| 1 | """ |
|---|
| 2 | HTTP client that talks to the HTTP storage server. |
|---|
| 3 | """ |
|---|
| 4 | |
|---|
| 5 | from __future__ import annotations |
|---|
| 6 | |
|---|
| 7 | |
|---|
| 8 | from typing import ( |
|---|
| 9 | Optional, |
|---|
| 10 | Sequence, |
|---|
| 11 | Mapping, |
|---|
| 12 | BinaryIO, |
|---|
| 13 | cast, |
|---|
| 14 | TypedDict, |
|---|
| 15 | Set, |
|---|
| 16 | Dict, |
|---|
| 17 | Callable, |
|---|
| 18 | ClassVar, |
|---|
| 19 | ) |
|---|
| 20 | from base64 import b64encode |
|---|
| 21 | from io import BytesIO |
|---|
| 22 | from os import SEEK_END |
|---|
| 23 | |
|---|
| 24 | from attrs import define, asdict, frozen, field |
|---|
| 25 | from eliot import start_action, register_exception_extractor |
|---|
| 26 | from eliot.twisted import DeferredContext |
|---|
| 27 | |
|---|
| 28 | from pycddl import Schema |
|---|
| 29 | from collections_extended import RangeMap |
|---|
| 30 | from werkzeug.datastructures import Range, ContentRange |
|---|
| 31 | from twisted.web.http_headers import Headers |
|---|
| 32 | from twisted.web import http |
|---|
| 33 | from twisted.web.iweb import IPolicyForHTTPS, IResponse, IAgent |
|---|
| 34 | from twisted.internet.defer import Deferred, succeed |
|---|
| 35 | from twisted.internet.interfaces import ( |
|---|
| 36 | IOpenSSLClientConnectionCreator, |
|---|
| 37 | IReactorTime, |
|---|
| 38 | IDelayedCall, |
|---|
| 39 | ) |
|---|
| 40 | from twisted.internet.ssl import CertificateOptions |
|---|
| 41 | from twisted.protocols.tls import TLSMemoryBIOProtocol |
|---|
| 42 | from twisted.web.client import Agent, HTTPConnectionPool |
|---|
| 43 | from zope.interface import implementer |
|---|
| 44 | from hyperlink import DecodedURL |
|---|
| 45 | import treq |
|---|
| 46 | from treq.client import HTTPClient |
|---|
| 47 | from OpenSSL import SSL |
|---|
| 48 | from werkzeug.http import parse_content_range_header |
|---|
| 49 | |
|---|
| 50 | from .http_common import ( |
|---|
| 51 | swissnum_auth_header, |
|---|
| 52 | Secrets, |
|---|
| 53 | get_content_type, |
|---|
| 54 | CBOR_MIME_TYPE, |
|---|
| 55 | get_spki_hash, |
|---|
| 56 | response_is_not_html, |
|---|
| 57 | ) |
|---|
| 58 | from ..interfaces import VersionMessage |
|---|
| 59 | from .common import si_b2a, si_to_human_readable |
|---|
| 60 | from ..util.hashutil import timing_safe_compare |
|---|
| 61 | from ..util.deferredutil import async_to_deferred |
|---|
| 62 | from ..util.tor_provider import _Provider as TorProvider |
|---|
| 63 | from ..util.cputhreadpool import defer_to_thread |
|---|
| 64 | from ..util.cbor import dumps |
|---|
| 65 | |
|---|
# Tor support is optional: if txtorcon isn't installed, define a placeholder
# class so annotations referencing ``Tor`` still resolve at import time.
try:
    from txtorcon import Tor  # type: ignore
except ImportError:

    class Tor:  # type: ignore[no-redef]
        pass
|---|
| 72 | |
|---|
| 73 | |
|---|
def _encode_si(si: bytes) -> str:
    """Return the storage index encoded as an ASCII ``str``."""
    return si_b2a(si).decode("ascii")
|---|
| 77 | |
|---|
| 78 | |
|---|
class ClientException(Exception):
    """An unexpected response code from the server."""

    def __init__(
        self, code: int, message: Optional[str] = None, body: Optional[bytes] = None
    ):
        # Store everything in args (via the base class) as well as on named
        # attributes for convenient access by handlers.
        super().__init__(code, message, body)
        self.code = code
        self.message = message
        self.body = body
|---|
| 89 | |
|---|
| 90 | |
|---|
# Teach Eliot to include the HTTP response code in logs whenever a
# ClientException is captured as part of a logged failure.
register_exception_extractor(ClientException, lambda e: {"response_code": e.code})
|---|
| 92 | |
|---|
| 93 | |
|---|
# Schemas for server responses.
#
# Tags are of the form #6.nnn, where the number is documented at
# https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml. Notably, #6.258
# indicates a set.
#
# Keys are looked up by name in ``decode_cbor()`` calls elsewhere in this
# module; each value validates the CBOR body of one server response type.
_SCHEMAS: Mapping[str, Schema] = {
    "get_version": Schema(
        # Note that the single-quoted (`'`) string keys in this schema
        # represent *byte* strings - per the CDDL specification. Text strings
        # are represented using strings with *double* quotes (`"`).
        """
        response = {'http://allmydata.org/tahoe/protocols/storage/v1' => {
                 'maximum-immutable-share-size' => uint
                 'maximum-mutable-share-size' => uint
                 'available-space' => uint
                 }
                 'application-version' => bstr
              }
        """
    ),
    "allocate_buckets": Schema(
        """
        response = {
          already-have: #6.258([0*256 uint])
          allocated: #6.258([0*256 uint])
        }
        """
    ),
    "immutable_write_share_chunk": Schema(
        """
        response = {
          required: [0* {begin: uint, end: uint}]
        }
        """
    ),
    "list_shares": Schema(
        """
        response = #6.258([0*256 uint])
        """
    ),
    "mutable_read_test_write": Schema(
        """
        response = {
          "success": bool,
          "data": {0*256 share_number: [0* bstr]}
        }
        share_number = uint
        """
    ),
    "mutable_list_shares": Schema(
        """
        response = #6.258([0*256 uint])
        """
    ),
}
|---|
| 149 | |
|---|
| 150 | |
|---|
@define
class _LengthLimitedCollector:
    """
    Collect data using ``treq.collect()``, with limited length.

    Instances are callable and are passed to ``treq.collect()`` as the
    per-chunk callback.
    """

    # Bytes still allowed before the response is considered too large; goes
    # negative once the limit has been exceeded.
    remaining_length: int
    # Inactivity timer; reset on every chunk so only total silence times out.
    timeout_on_silence: IDelayedCall
    # Buffer accumulating the response body.
    f: BytesIO = field(factory=BytesIO)

    def __call__(self, data: bytes) -> None:
        # Data is arriving, so push the silence timeout another 60 seconds
        # into the future.
        self.timeout_on_silence.reset(60)
        self.remaining_length -= len(data)
        if self.remaining_length < 0:
            raise ValueError("Response length was too long")
        self.f.write(data)
|---|
| 167 | |
|---|
| 168 | |
|---|
def limited_content(
    response: IResponse,
    clock: IReactorTime,
    max_length: int = 30 * 1024 * 1024,
) -> Deferred[BinaryIO]:
    """
    Like ``treq.content()``, but limit data read from the response to a set
    length. If the response is longer than the max allowed length, the result
    fails with a ``ValueError``.

    A potentially useful future improvement would be using a temporary file to
    store the content; since filesystem buffering means that would use memory
    for small responses and disk for large responses.

    This will time out if no data is received for 60 seconds; so long as a
    trickle of data continues to arrive, it will continue to run.

    :param response: the HTTP response whose body should be collected.
    :param clock: reactor used to schedule the inactivity timeout.
    :param max_length: maximum number of body bytes to accept.
    :return: a Deferred firing with a seekable binary file positioned at 0.
    """
    # Cancelling this Deferred is how the inactivity timeout aborts the
    # download.
    result_deferred = succeed(None)

    # Sadly, addTimeout() won't work because we need access to the IDelayedCall
    # in order to reset it on each data chunk received.
    timeout = clock.callLater(60, result_deferred.cancel)
    collector = _LengthLimitedCollector(max_length, timeout)

    with start_action(
        action_type="allmydata:storage:http-client:limited-content",
        max_length=max_length,
    ).context():
        d = DeferredContext(result_deferred)

    # Make really sure everything gets called in Deferred context, treq might
    # call collector directly...
    d.addCallback(lambda _: treq.collect(response, collector))

    def done(_: object) -> BytesIO:
        # Body fully collected: stop the timeout and rewind the buffer for
        # the caller.
        timeout.cancel()
        collector.f.seek(0)
        return collector.f

    def failed(f):
        # On failure the timeout may already have fired (that's how
        # cancellation happens), so only cancel it if still pending.
        if timeout.active():
            timeout.cancel()
        return f

    result = d.addCallbacks(done, failed)
    return result.addActionFinish()
|---|
| 215 | |
|---|
| 216 | |
|---|
@define
class ImmutableCreateResult:
    """Result of creating a storage index for an immutable."""

    # Share numbers the server reports it already has.
    already_have: set[int]
    # Share numbers the server allocated for us to upload.
    allocated: set[int]
|---|
| 223 | |
|---|
| 224 | |
|---|
class _TLSContextFactory(CertificateOptions):
    """
    Create a context that validates the way Tahoe-LAFS wants to: based on a
    pinned certificate hash, rather than a certificate authority.

    Originally implemented as part of Foolscap. To comply with the license,
    here's the original licensing terms:

    Copyright (c) 2006-2008 Brian Warner

    Permission is hereby granted, free of charge, to any person obtaining a
    copy of this software and associated documentation files (the "Software"),
    to deal in the Software without restriction, including without limitation
    the rights to use, copy, modify, merge, publish, distribute, sublicense,
    and/or sell copies of the Software, and to permit persons to whom the
    Software is furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in
    all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
    DEALINGS IN THE SOFTWARE.
    """

    def __init__(self, expected_spki_hash: bytes):
        # The SPKI hash the server's certificate must match for the
        # connection to be accepted.
        self.expected_spki_hash = expected_spki_hash
        CertificateOptions.__init__(self)

    def getContext(self) -> SSL.Context:
        """Return an OpenSSL context that verifies via the pinned SPKI hash."""

        def always_validate(conn, cert, errno, depth, preverify_ok):
            # This function is called to validate the certificate received by
            # the other end. OpenSSL calls it multiple times, for each errno
            # for each certificate.

            # We do not care about certificate authorities or revocation
            # lists, we just want to know that the certificate has a valid
            # signature and follow the chain back to one which is
            # self-signed. We need to protect against forged signatures, but
            # not the usual TLS concerns about invalid CAs or revoked
            # certificates.
            things_are_ok = (
                SSL.X509VerificationCodes.OK,
                SSL.X509VerificationCodes.ERR_CERT_NOT_YET_VALID,
                SSL.X509VerificationCodes.ERR_CERT_HAS_EXPIRED,
                SSL.X509VerificationCodes.ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
                SSL.X509VerificationCodes.ERR_SELF_SIGNED_CERT_IN_CHAIN,
            )
            # TODO can we do this once instead of multiple times?
            if errno in things_are_ok and timing_safe_compare(
                get_spki_hash(cert.to_cryptography()), self.expected_spki_hash
            ):
                return 1
            # TODO: log the details of the error, because otherwise they get
            # lost in the PyOpenSSL exception that will eventually be raised
            # (possibly OpenSSL.SSL.Error: certificate verify failed)
            return 0

        ctx = CertificateOptions.getContext(self)

        # VERIFY_PEER means we ask the other end for their certificate.
        ctx.set_verify(SSL.VERIFY_PEER, always_validate)
        return ctx
|---|
| 292 | |
|---|
| 293 | |
|---|
@implementer(IPolicyForHTTPS)
@implementer(IOpenSSLClientConnectionCreator)
@define
class _StorageClientHTTPSPolicy:
    """
    TLS policy that validates servers by pinning the SPKI hash of their
    public key, rather than relying on certificate authorities.
    """

    # The SPKI hash every server certificate must match.
    expected_spki_hash: bytes

    # IPolicyForHTTPS
    def creatorForNetloc(self, hostname: str, port: int) -> _StorageClientHTTPSPolicy:
        # Every netloc gets the same pinned-hash validation, so we can act
        # as our own connection creator.
        return self

    # IOpenSSLClientConnectionCreator
    def clientConnectionForTLS(
        self, tlsProtocol: TLSMemoryBIOProtocol
    ) -> SSL.Connection:
        context = _TLSContextFactory(self.expected_spki_hash).getContext()
        return SSL.Connection(context, None)
|---|
| 316 | |
|---|
| 317 | |
|---|
@define
class StorageClientFactory:
    """
    Create ``StorageClient`` instances, using appropriate
    ``twisted.web.iweb.IAgent`` for different connection methods: normal TCP,
    Tor, and eventually I2P.

    There is some caching involved since there might be shared setup work, e.g.
    connecting to the local Tor service only needs to happen once.
    """

    # Node configuration mapping connection type to handler name, consulted
    # to decide how outgoing connections are made.
    _default_connection_handlers: dict[str, str]
    _tor_provider: Optional[TorProvider]
    # Cache the Tor instance created by the provider, if relevant.
    _tor_instance: Optional[Tor] = None

    # If set, we're doing unit testing and we should call this with any
    # HTTPConnectionPool that gets passed/created to ``create_agent()``.
    TEST_MODE_REGISTER_HTTP_POOL: ClassVar[
        Optional[Callable[[HTTPConnectionPool], None]]
    ] = None

    @classmethod
    def start_test_mode(cls, callback: Callable[[HTTPConnectionPool], None]) -> None:
        """Switch to testing mode.

        In testing mode we register the pool with test system using the given
        callback so it can Do Things, most notably killing off idle HTTP
        connections at test shutdown and, in some tests, in the middle of the
        test.
        """
        cls.TEST_MODE_REGISTER_HTTP_POOL = callback

    @classmethod
    def stop_test_mode(cls) -> None:
        """Stop testing mode."""
        cls.TEST_MODE_REGISTER_HTTP_POOL = None

    async def _create_agent(
        self,
        nurl: DecodedURL,
        reactor: object,
        tls_context_factory: IPolicyForHTTPS,
        pool: HTTPConnectionPool,
    ) -> IAgent:
        """Create a new ``IAgent``, possibly using Tor."""
        if self.TEST_MODE_REGISTER_HTTP_POOL is not None:
            self.TEST_MODE_REGISTER_HTTP_POOL(pool)

        # TODO default_connection_handlers should really be an object, not a
        # dict, so we can ask "is this using Tor" without poking at a
        # dictionary with arbitrary strings... See
        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/4032
        handler = self._default_connection_handlers["tcp"]

        if handler == "tcp":
            # NOTE(review): a "pb+tor" NURL combined with a plain "tcp"
            # handler takes this branch and never reaches the Tor check
            # below — confirm that is the intended precedence.
            return Agent(reactor, tls_context_factory, pool=pool)
        if handler == "tor" or nurl.scheme == "pb+tor":
            assert self._tor_provider is not None
            if self._tor_instance is None:
                # Setting up the Tor connection is shared work; do it once
                # and reuse the instance for later agents.
                self._tor_instance = await self._tor_provider.get_tor_instance(reactor)
            return self._tor_instance.web_agent(
                pool=pool, tls_context_factory=tls_context_factory
            )
        else:
            # I2P support will be added here. See
            # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/4037
            raise RuntimeError(f"Unsupported tcp connection handler: {handler}")

    async def create_storage_client(
        self,
        nurl: DecodedURL,
        reactor: IReactorTime,
        pool: Optional[HTTPConnectionPool] = None,
    ) -> StorageClient:
        """Create a new ``StorageClient`` for the given NURL."""
        assert nurl.fragment == "v=1"
        assert nurl.scheme in ("pb", "pb+tor")
        if pool is None:
            pool = HTTPConnectionPool(reactor)
            pool.maxPersistentPerHost = 10

        # The NURL's user component carries the expected SPKI hash of the
        # server's TLS certificate.
        certificate_hash = nurl.user.encode("ascii")
        agent = await self._create_agent(
            nurl,
            reactor,
            _StorageClientHTTPSPolicy(expected_spki_hash=certificate_hash),
            pool,
        )
        treq_client = HTTPClient(agent)
        https_url = DecodedURL().replace(scheme="https", host=nurl.host, port=nurl.port)
        # The first path segment of the NURL is the swissnum used for
        # authorization headers.
        swissnum = nurl.path[0].encode("ascii")
        response_check = lambda _: None
        if self.TEST_MODE_REGISTER_HTTP_POOL is not None:
            # In test mode, sanity-check responses to catch HTML error pages.
            response_check = response_is_not_html

        return StorageClient(
            https_url,
            swissnum,
            treq_client,
            pool,
            reactor,
            response_check,
        )
|---|
| 422 | |
|---|
| 423 | |
|---|
@define(hash=True)
class StorageClient:
    """
    Low-level HTTP client that talks to the HTTP storage server.

    Create using a ``StorageClientFactory`` instance.
    """

    # The URL should be a HTTPS URL ("https://...")
    _base_url: DecodedURL
    # Shared secret proving to the server we're allowed to talk to it.
    _swissnum: bytes
    _treq: HTTPClient
    _pool: HTTPConnectionPool
    _clock: IReactorTime
    # Are we running unit tests?  Hook called with every response; defaults
    # to a no-op outside of tests.
    _analyze_response: Callable[[IResponse], None] = lambda _: None

    def relative_url(self, path: str) -> DecodedURL:
        """Get a URL relative to the base URL."""
        return self._base_url.click(path)

    def _get_headers(self, headers: Optional[Headers]) -> Headers:
        """Return the basic headers to be used by default.

        Adds the swissnum-based Authorization header to the given headers
        (or a fresh ``Headers`` if none were given).
        """
        if headers is None:
            headers = Headers()
        headers.addRawHeader(
            "Authorization",
            swissnum_auth_header(self._swissnum),
        )
        return headers

    @async_to_deferred
    async def request(
        self,
        method: str,
        url: DecodedURL,
        lease_renew_secret: Optional[bytes] = None,
        lease_cancel_secret: Optional[bytes] = None,
        upload_secret: Optional[bytes] = None,
        write_enabler_secret: Optional[bytes] = None,
        headers: Optional[Headers] = None,
        message_to_serialize: object = None,
        timeout: float = 60,
        **kwargs,
    ) -> IResponse:
        """
        Like ``treq.request()``, but with optional secrets that get translated
        into corresponding HTTP headers.

        If ``message_to_serialize`` is set, it will be serialized (by default
        with CBOR) and set as the request body. It should not be mutated
        during execution of this function!

        Default timeout is 60 seconds.
        """
        # Wrap the real work in an Eliot action so requests are traceable in
        # logs, recording the response code on success.
        with start_action(
            action_type="allmydata:storage:http-client:request",
            method=method,
            url=url.to_text(),
            timeout=timeout,
        ) as ctx:
            response = await self._request(
                method,
                url,
                lease_renew_secret,
                lease_cancel_secret,
                upload_secret,
                write_enabler_secret,
                headers,
                message_to_serialize,
                timeout,
                **kwargs,
            )
            ctx.add_success_fields(response_code=response.code)
            return response

    async def _request(
        self,
        method: str,
        url: DecodedURL,
        lease_renew_secret: Optional[bytes] = None,
        lease_cancel_secret: Optional[bytes] = None,
        upload_secret: Optional[bytes] = None,
        write_enabler_secret: Optional[bytes] = None,
        headers: Optional[Headers] = None,
        message_to_serialize: object = None,
        timeout: float = 60,
        **kwargs,
    ) -> IResponse:
        """The implementation of request()."""
        headers = self._get_headers(headers)

        # Add secrets: each non-None secret becomes an X-Tahoe-Authorization
        # header of the form "<secret-name> <base64-value>".
        for secret, value in [
            (Secrets.LEASE_RENEW, lease_renew_secret),
            (Secrets.LEASE_CANCEL, lease_cancel_secret),
            (Secrets.UPLOAD, upload_secret),
            (Secrets.WRITE_ENABLER, write_enabler_secret),
        ]:
            if value is None:
                continue
            headers.addRawHeader(
                "X-Tahoe-Authorization",
                b"%s %s" % (secret.value.encode("ascii"), b64encode(value).strip()),
            )

        # Note we can accept CBOR:
        headers.addRawHeader("Accept", CBOR_MIME_TYPE)

        # If there's a request message, serialize it and set the Content-Type
        # header:
        if message_to_serialize is not None:
            if "data" in kwargs:
                raise TypeError(
                    "Can't use both `message_to_serialize` and `data` "
                    "as keyword arguments at the same time"
                )
            # Serialize off the reactor thread; encoding large messages can
            # be expensive.
            kwargs["data"] = await defer_to_thread(dumps, message_to_serialize)
            headers.addRawHeader("Content-Type", CBOR_MIME_TYPE)

        response = await self._treq.request(
            method, url, headers=headers, timeout=timeout, **kwargs
        )
        # Test hook; no-op in production.
        self._analyze_response(response)

        return response

    async def decode_cbor(self, response: IResponse, schema: Schema) -> object:
        """Given HTTP response, return decoded CBOR body.

        Raises ``ClientException`` for non-2xx responses, or when the server
        sent a non-CBOR content type.
        """
        with start_action(action_type="allmydata:storage:http-client:decode-cbor"):
            if response.code > 199 and response.code < 300:
                content_type = get_content_type(response.headers)
                if content_type == CBOR_MIME_TYPE:
                    # Read the body with a length cap and inactivity timeout.
                    f = await limited_content(response, self._clock)
                    data = f.read()

                    def validate_and_decode():
                        return schema.validate_cbor(data, True)

                    # Validation/decoding runs off the reactor thread.
                    return await defer_to_thread(validate_and_decode)
                else:
                    raise ClientException(
                        -1,
                        "Server didn't send CBOR, content type is {}".format(
                            content_type
                        ),
                    )
            else:
                # Error response: read a bounded amount of the body so the
                # exception can include it.
                data = (
                    await limited_content(response, self._clock, max_length=10_000)
                ).read()
                raise ClientException(response.code, response.phrase, data)

    def shutdown(self) -> Deferred[object]:
        """Shutdown any connections."""
        return self._pool.closeCachedConnections()
|---|
| 580 | |
|---|
| 581 | |
|---|
@define(hash=True)
class StorageClientGeneral:
    """
    High-level HTTP APIs that aren't immutable- or mutable-specific.
    """

    _client: StorageClient

    @async_to_deferred
    async def get_version(self) -> VersionMessage:
        """
        Return the version metadata for the server.
        """
        with start_action(
            action_type="allmydata:storage:http-client:get-version",
        ):
            return await self._get_version()

    async def _get_version(self) -> VersionMessage:
        """Implementation of get_version()."""
        url = self._client.relative_url("/storage/v1/version")
        response = await self._client.request("GET", url)
        decoded_response = cast(
            Dict[bytes, object],
            await self._client.decode_cbor(response, _SCHEMAS["get_version"]),
        )
        # Add some features we know are true because the HTTP API
        # specification requires them and because other parts of the storage
        # client implementation assume they will be present.
        cast(
            Dict[bytes, object],
            decoded_response[b"http://allmydata.org/tahoe/protocols/storage/v1"],
        ).update(
            {
                b"tolerates-immutable-read-overrun": True,
                b"delete-mutable-shares-with-zero-length-writev": True,
                b"fills-holes-with-zero-bytes": True,
                b"prevents-read-past-end-of-share-data": True,
            }
        )
        return decoded_response

    @async_to_deferred
    async def add_or_renew_lease(
        self, storage_index: bytes, renew_secret: bytes, cancel_secret: bytes
    ) -> None:
        """
        Add or renew a lease.

        If the renewal secret matches an existing lease, it is renewed.
        Otherwise a new lease is added.
        """
        with start_action(
            action_type="allmydata:storage:http-client:add-or-renew-lease",
            storage_index=si_to_human_readable(storage_index),
        ):
            return await self._add_or_renew_lease(
                storage_index, renew_secret, cancel_secret
            )

    async def _add_or_renew_lease(
        self, storage_index: bytes, renew_secret: bytes, cancel_secret: bytes
    ) -> None:
        """Implementation of add_or_renew_lease()."""
        url = self._client.relative_url(
            "/storage/v1/lease/{}".format(_encode_si(storage_index))
        )
        response = await self._client.request(
            "PUT",
            url,
            lease_renew_secret=renew_secret,
            lease_cancel_secret=cancel_secret,
        )

        # The server indicates success with 204 No Content; anything else is
        # an error.
        if response.code == http.NO_CONTENT:
            return
        else:
            raise ClientException(response.code)
|---|
| 659 | |
|---|
| 660 | |
|---|
@define
class UploadProgress:
    """
    Progress of immutable upload, per the server.
    """

    # True when upload has finished.
    finished: bool
    # Remaining ranges to upload, as reported by the server.
    required: RangeMap
|---|
| 671 | |
|---|
| 672 | |
|---|
@async_to_deferred
async def read_share_chunk(
    client: StorageClient,
    share_type: str,
    storage_index: bytes,
    share_number: int,
    offset: int,
    length: int,
) -> bytes:
    """
    Download a chunk of data from a share.

    TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed downloads
    should be transparently retried and redownloaded by the implementation a
    few times so that if a failure percolates up, the caller can assume the
    failure isn't a short-term blip.

    NOTE: the underlying HTTP protocol is somewhat more flexible than this API,
    insofar as it doesn't always require a range. In practice a range is
    always provided by the current callers.
    """
    url = client.relative_url(
        "/storage/v1/{}/{}/{}".format(
            share_type, _encode_si(storage_index), share_number
        )
    )
    # The default 60 second timeout is for getting the response, so it doesn't
    # include the time it takes to download the body... so we will deal with
    # that later, via limited_content().
    response = await client.request(
        "GET",
        url,
        headers=Headers(
            # Ranges in HTTP are _inclusive_, Python's convention is exclusive,
            # but the Range constructor does the conversion for us.
            {"range": [Range("bytes", [(offset, offset + length)]).to_header()]}
        ),
        unbuffered=True,  # Don't buffer the response in memory.
    )

    if response.code == http.NO_CONTENT:
        return b""

    content_type = get_content_type(response.headers)
    if content_type != "application/octet-stream":
        raise ValueError(
            f"Content-type was wrong: {content_type}, should be application/octet-stream"
        )

    if response.code == http.PARTIAL_CONTENT:
        content_range = parse_content_range_header(
            response.headers.getRawHeaders("content-range")[0] or ""
        )
        if (
            content_range is None
            or content_range.stop is None
            or content_range.start is None
        ):
            raise ValueError(
                "Content-Range was missing, invalid, or in format we don't support"
            )
        supposed_length = content_range.stop - content_range.start
        if supposed_length > length:
            raise ValueError("Server sent more than we asked for?!")
        # It might also send less than we asked for. That's (probably) OK, e.g.
        # if we went past the end of the file.
        body = await limited_content(response, client._clock, supposed_length)
        # Measure the actual body length by seeking to the end of the buffer.
        body.seek(0, SEEK_END)
        actual_length = body.tell()
        if actual_length != supposed_length:
            # Most likely a mutable that got changed out from under us, but
            # conceivably could be a bug...
            raise ValueError(
                f"Length of response sent from server ({actual_length}) "
                + f"didn't match Content-Range header ({supposed_length})"
            )
        body.seek(0)
        return body.read()
    else:
        # Technically HTTP allows sending an OK with full body under these
        # circumstances, but the server is not designed to do that so we ignore
        # that possibility for now...
        raise ClientException(response.code)
|---|
| 756 | |
|---|
| 757 | |
|---|
@async_to_deferred
async def advise_corrupt_share(
    client: StorageClient,
    share_type: str,
    storage_index: bytes,
    share_number: int,
    reason: str,
) -> None:
    """
    Tell the server that a share of the given type is corrupt.

    Raises ``ClientException`` if the server responds with anything other
    than HTTP 200 OK.
    """
    assert isinstance(reason, str)
    path = "/storage/v1/{}/{}/{}/corrupt".format(
        share_type, _encode_si(storage_index), share_number
    )
    response = await client.request(
        "POST",
        client.relative_url(path),
        message_to_serialize={"reason": reason},
    )
    if response.code != http.OK:
        raise ClientException(response.code)
|---|
| 780 | |
|---|
| 781 | |
|---|
@define(hash=True)
class StorageClientImmutables:
    """
    APIs for interacting with immutables.
    """

    # Low-level HTTP client through which all requests are issued.
    _client: StorageClient

    @async_to_deferred
    async def create(
        self,
        storage_index: bytes,
        share_numbers: set[int],
        allocated_size: int,
        upload_secret: bytes,
        lease_renew_secret: bytes,
        lease_cancel_secret: bytes,
    ) -> ImmutableCreateResult:
        """
        Create a new storage index for an immutable.

        TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 retry
        internally on failure, to ensure the operation fully succeeded. If
        sufficient number of failures occurred, the result may fire with an
        error, but there's no expectation that user code needs to have a
        recovery codepath; it will most likely just report an error to the
        user.

        Result fires when creating the storage index succeeded, if creating the
        storage index failed the result will fire with an exception.
        """
        # Wrap the real work in an Eliot action so the outcome (including the
        # server's already-have/allocated answer) shows up in structured logs.
        with start_action(
            action_type="allmydata:storage:http-client:immutable:create",
            storage_index=si_to_human_readable(storage_index),
            share_numbers=share_numbers,
            allocated_size=allocated_size,
        ) as ctx:
            result = await self._create(
                storage_index,
                share_numbers,
                allocated_size,
                upload_secret,
                lease_renew_secret,
                lease_cancel_secret,
            )
            ctx.add_success_fields(
                already_have=result.already_have, allocated=result.allocated
            )
            return result

    async def _create(
        self,
        storage_index: bytes,
        share_numbers: set[int],
        allocated_size: int,
        upload_secret: bytes,
        lease_renew_secret: bytes,
        lease_cancel_secret: bytes,
    ) -> ImmutableCreateResult:
        """Implementation of create()."""
        url = self._client.relative_url(
            "/storage/v1/immutable/" + _encode_si(storage_index)
        )
        message = {"share-numbers": share_numbers, "allocated-size": allocated_size}

        # NOTE(review): the secrets are presumably turned into authorization
        # headers by StorageClient.request() — confirm there if changing.
        response = await self._client.request(
            "POST",
            url,
            lease_renew_secret=lease_renew_secret,
            lease_cancel_secret=lease_cancel_secret,
            upload_secret=upload_secret,
            message_to_serialize=message,
        )
        # The response is CBOR, validated against the allocate_buckets schema
        # before we trust its contents.
        decoded_response = cast(
            Mapping[str, Set[int]],
            await self._client.decode_cbor(response, _SCHEMAS["allocate_buckets"]),
        )
        return ImmutableCreateResult(
            already_have=decoded_response["already-have"],
            allocated=decoded_response["allocated"],
        )

    @async_to_deferred
    async def abort_upload(
        self, storage_index: bytes, share_number: int, upload_secret: bytes
    ) -> None:
        """Abort the upload."""
        with start_action(
            action_type="allmydata:storage:http-client:immutable:abort-upload",
            storage_index=si_to_human_readable(storage_index),
            share_number=share_number,
        ):
            return await self._abort_upload(storage_index, share_number, upload_secret)

    async def _abort_upload(
        self, storage_index: bytes, share_number: int, upload_secret: bytes
    ) -> None:
        """Implementation of ``abort_upload()``."""
        url = self._client.relative_url(
            "/storage/v1/immutable/{}/{}/abort".format(
                _encode_si(storage_index), share_number
            )
        )
        response = await self._client.request(
            "PUT",
            url,
            upload_secret=upload_secret,
        )

        # Anything other than OK means the server refused or failed the abort.
        if response.code == http.OK:
            return
        else:
            raise ClientException(
                response.code,
            )

    @async_to_deferred
    async def write_share_chunk(
        self,
        storage_index: bytes,
        share_number: int,
        upload_secret: bytes,
        offset: int,
        data: bytes,
    ) -> UploadProgress:
        """
        Upload a chunk of data for a specific share.

        TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 The
        implementation should retry failed uploads transparently a number of
        times, so that if a failure percolates up, the caller can assume the
        failure isn't a short-term blip.

        Result fires when the upload succeeded, with a boolean indicating
        whether the _complete_ share (i.e. all chunks, not just this one) has
        been uploaded.
        """
        with start_action(
            action_type="allmydata:storage:http-client:immutable:write-share-chunk",
            storage_index=si_to_human_readable(storage_index),
            share_number=share_number,
            offset=offset,
            data_len=len(data),
        ) as ctx:
            result = await self._write_share_chunk(
                storage_index, share_number, upload_secret, offset, data
            )
            ctx.add_success_fields(finished=result.finished)
            return result

    async def _write_share_chunk(
        self,
        storage_index: bytes,
        share_number: int,
        upload_secret: bytes,
        offset: int,
        data: bytes,
    ) -> UploadProgress:
        """Implementation of ``write_share_chunk()``."""
        url = self._client.relative_url(
            "/storage/v1/immutable/{}/{}".format(
                _encode_si(storage_index), share_number
            )
        )
        # werkzeug's ContentRange takes an exclusive stop (Python convention)
        # and emits an inclusive HTTP Content-Range header.
        response = await self._client.request(
            "PATCH",
            url,
            upload_secret=upload_secret,
            data=data,
            headers=Headers(
                {
                    "content-range": [
                        ContentRange("bytes", offset, offset + len(data)).to_header()
                    ]
                }
            ),
        )

        if response.code == http.OK:
            # Upload is still unfinished.
            finished = False
        elif response.code == http.CREATED:
            # Upload is done!
            finished = True
        else:
            raise ClientException(
                response.code,
            )
        body = cast(
            Mapping[str, Sequence[Mapping[str, int]]],
            await self._client.decode_cbor(
                response, _SCHEMAS["immutable_write_share_chunk"]
            ),
        )
        # The server reports which byte ranges it still needs; expose them as
        # a RangeMap so callers can compute what remains to upload.
        remaining = RangeMap()
        for chunk in body["required"]:
            remaining.set(True, chunk["begin"], chunk["end"])
        return UploadProgress(finished=finished, required=remaining)

    @async_to_deferred
    async def read_share_chunk(
        self, storage_index: bytes, share_number: int, offset: int, length: int
    ) -> bytes:
        """
        Download a chunk of data from a share.
        """
        with start_action(
            action_type="allmydata:storage:http-client:immutable:read-share-chunk",
            storage_index=si_to_human_readable(storage_index),
            share_number=share_number,
            offset=offset,
            length=length,
        ) as ctx:
            # Delegate to the module-level helper shared with mutables.
            result = await read_share_chunk(
                self._client, "immutable", storage_index, share_number, offset, length
            )
            ctx.add_success_fields(data_len=len(result))
            return result

    @async_to_deferred
    async def list_shares(self, storage_index: bytes) -> Set[int]:
        """
        Return the set of shares for a given storage index.
        """
        with start_action(
            action_type="allmydata:storage:http-client:immutable:list-shares",
            storage_index=si_to_human_readable(storage_index),
        ) as ctx:
            result = await self._list_shares(storage_index)
            ctx.add_success_fields(shares=result)
            return result

    async def _list_shares(self, storage_index: bytes) -> Set[int]:
        """Implementation of ``list_shares()``."""
        url = self._client.relative_url(
            "/storage/v1/immutable/{}/shares".format(_encode_si(storage_index))
        )
        response = await self._client.request(
            "GET",
            url,
        )
        if response.code == http.OK:
            return cast(
                Set[int],
                await self._client.decode_cbor(response, _SCHEMAS["list_shares"]),
            )
        else:
            raise ClientException(response.code)

    @async_to_deferred
    async def advise_corrupt_share(
        self,
        storage_index: bytes,
        share_number: int,
        reason: str,
    ) -> None:
        """Indicate a share has been corrupted, with a human-readable message."""
        with start_action(
            action_type="allmydata:storage:http-client:immutable:advise-corrupt-share",
            storage_index=si_to_human_readable(storage_index),
            share_number=share_number,
            reason=reason,
        ):
            # Calls the module-level advise_corrupt_share() helper, not this
            # method (global name lookup at runtime).
            await advise_corrupt_share(
                self._client, "immutable", storage_index, share_number, reason
            )
|---|
| 1048 | |
|---|
| 1049 | |
|---|
@frozen
class WriteVector:
    """Data to write to a chunk."""

    # Byte offset within the share where ``data`` should be written.
    offset: int
    # The bytes to write at ``offset``.
    data: bytes
|---|
| 1056 | |
|---|
| 1057 | |
|---|
@frozen
class TestVector:
    """Checks to make on a chunk before writing to it."""

    # Byte offset within the share where the comparison starts.
    offset: int
    # Number of bytes to compare.
    size: int
    # Expected contents of the ``size`` bytes starting at ``offset``.
    specimen: bytes
|---|
| 1065 | |
|---|
| 1066 | |
|---|
@frozen
class ReadVector:
    """
    Reads to do on chunks, as part of a read/test/write operation.
    """

    # Byte offset within the share where the read starts.
    offset: int
    # Number of bytes to read.
    size: int
|---|
| 1075 | |
|---|
| 1076 | |
|---|
@frozen
class TestWriteVectors:
    """Test and write vectors for a specific share."""

    test_vectors: Sequence[TestVector] = field(factory=list)
    write_vectors: Sequence[WriteVector] = field(factory=list)
    new_length: Optional[int] = None

    def asdict(self) -> dict:
        """Return dictionary suitable for sending over CBOR."""
        # attrs' asdict() recursively serializes the nested vectors; we then
        # rename the keys to the wire-protocol spellings.
        serialized = asdict(self)
        return {
            "test": serialized["test_vectors"],
            "write": serialized["write_vectors"],
            "new-length": serialized["new_length"],
        }
|---|
| 1092 | |
|---|
| 1093 | |
|---|
@frozen
class ReadTestWriteResult:
    """Result of sending read-test-write vectors."""

    # Whether the operation succeeded, as reported by the server.
    success: bool
    # Map share numbers to reads corresponding to the request's list of
    # ReadVectors:
    reads: Mapping[int, Sequence[bytes]]
|---|
| 1102 | |
|---|
| 1103 | |
|---|
# Result type for mutable read/test/write HTTP response. Can't just use
# dict[int,list[bytes]] because on Python 3.8 that will error out.
class MUTABLE_RTW(TypedDict):
    """CBOR-decoded body of a mutable read/test/write response."""

    success: bool
    data: Mapping[int, Sequence[bytes]]
|---|
| 1109 | |
|---|
| 1110 | |
|---|
| 1111 | @frozen |
|---|
| 1112 | class StorageClientMutables: |
|---|
| 1113 | """ |
|---|
| 1114 | APIs for interacting with mutables. |
|---|
| 1115 | """ |
|---|
| 1116 | |
|---|
| 1117 | _client: StorageClient |
|---|
| 1118 | |
|---|
| 1119 | @async_to_deferred |
|---|
| 1120 | async def read_test_write_chunks( |
|---|
| 1121 | self, |
|---|
| 1122 | storage_index: bytes, |
|---|
| 1123 | write_enabler_secret: bytes, |
|---|
| 1124 | lease_renew_secret: bytes, |
|---|
| 1125 | lease_cancel_secret: bytes, |
|---|
| 1126 | testwrite_vectors: dict[int, TestWriteVectors], |
|---|
| 1127 | read_vector: list[ReadVector], |
|---|
| 1128 | ) -> ReadTestWriteResult: |
|---|
| 1129 | """ |
|---|
| 1130 | Read, test, and possibly write chunks to a particular mutable storage |
|---|
| 1131 | index. |
|---|
| 1132 | |
|---|
| 1133 | Reads are done before writes. |
|---|
| 1134 | |
|---|
| 1135 | Given a mapping between share numbers and test/write vectors, the tests |
|---|
| 1136 | are done and if they are valid the writes are done. |
|---|
| 1137 | """ |
|---|
| 1138 | with start_action( |
|---|
| 1139 | action_type="allmydata:storage:http-client:mutable:read-test-write", |
|---|
| 1140 | storage_index=si_to_human_readable(storage_index), |
|---|
| 1141 | ): |
|---|
| 1142 | return await self._read_test_write_chunks( |
|---|
| 1143 | storage_index, |
|---|
| 1144 | write_enabler_secret, |
|---|
| 1145 | lease_renew_secret, |
|---|
| 1146 | lease_cancel_secret, |
|---|
| 1147 | testwrite_vectors, |
|---|
| 1148 | read_vector, |
|---|
| 1149 | ) |
|---|
| 1150 | |
|---|
| 1151 | async def _read_test_write_chunks( |
|---|
| 1152 | self, |
|---|
| 1153 | storage_index: bytes, |
|---|
| 1154 | write_enabler_secret: bytes, |
|---|
| 1155 | lease_renew_secret: bytes, |
|---|
| 1156 | lease_cancel_secret: bytes, |
|---|
| 1157 | testwrite_vectors: dict[int, TestWriteVectors], |
|---|
| 1158 | read_vector: list[ReadVector], |
|---|
| 1159 | ) -> ReadTestWriteResult: |
|---|
| 1160 | """Implementation of ``read_test_write_chunks()``.""" |
|---|
| 1161 | url = self._client.relative_url( |
|---|
| 1162 | "/storage/v1/mutable/{}/read-test-write".format(_encode_si(storage_index)) |
|---|
| 1163 | ) |
|---|
| 1164 | message = { |
|---|
| 1165 | "test-write-vectors": { |
|---|
| 1166 | share_number: twv.asdict() |
|---|
| 1167 | for (share_number, twv) in testwrite_vectors.items() |
|---|
| 1168 | }, |
|---|
| 1169 | "read-vector": [asdict(r) for r in read_vector], |
|---|
| 1170 | } |
|---|
| 1171 | response = await self._client.request( |
|---|
| 1172 | "POST", |
|---|
| 1173 | url, |
|---|
| 1174 | write_enabler_secret=write_enabler_secret, |
|---|
| 1175 | lease_renew_secret=lease_renew_secret, |
|---|
| 1176 | lease_cancel_secret=lease_cancel_secret, |
|---|
| 1177 | message_to_serialize=message, |
|---|
| 1178 | ) |
|---|
| 1179 | if response.code == http.OK: |
|---|
| 1180 | result = cast( |
|---|
| 1181 | MUTABLE_RTW, |
|---|
| 1182 | await self._client.decode_cbor( |
|---|
| 1183 | response, _SCHEMAS["mutable_read_test_write"] |
|---|
| 1184 | ), |
|---|
| 1185 | ) |
|---|
| 1186 | return ReadTestWriteResult(success=result["success"], reads=result["data"]) |
|---|
| 1187 | else: |
|---|
| 1188 | raise ClientException(response.code, (await response.content())) |
|---|
| 1189 | |
|---|
| 1190 | @async_to_deferred |
|---|
| 1191 | async def read_share_chunk( |
|---|
| 1192 | self, |
|---|
| 1193 | storage_index: bytes, |
|---|
| 1194 | share_number: int, |
|---|
| 1195 | offset: int, |
|---|
| 1196 | length: int, |
|---|
| 1197 | ) -> bytes: |
|---|
| 1198 | """ |
|---|
| 1199 | Download a chunk of data from a share. |
|---|
| 1200 | """ |
|---|
| 1201 | with start_action( |
|---|
| 1202 | action_type="allmydata:storage:http-client:mutable:read-share-chunk", |
|---|
| 1203 | storage_index=si_to_human_readable(storage_index), |
|---|
| 1204 | share_number=share_number, |
|---|
| 1205 | offset=offset, |
|---|
| 1206 | length=length, |
|---|
| 1207 | ) as ctx: |
|---|
| 1208 | result = await read_share_chunk( |
|---|
| 1209 | self._client, "mutable", storage_index, share_number, offset, length |
|---|
| 1210 | ) |
|---|
| 1211 | ctx.add_success_fields(data_len=len(result)) |
|---|
| 1212 | return result |
|---|
| 1213 | |
|---|
| 1214 | @async_to_deferred |
|---|
| 1215 | async def list_shares(self, storage_index: bytes) -> Set[int]: |
|---|
| 1216 | """ |
|---|
| 1217 | List the share numbers for a given storage index. |
|---|
| 1218 | """ |
|---|
| 1219 | with start_action( |
|---|
| 1220 | action_type="allmydata:storage:http-client:mutable:list-shares", |
|---|
| 1221 | storage_index=si_to_human_readable(storage_index), |
|---|
| 1222 | ) as ctx: |
|---|
| 1223 | result = await self._list_shares(storage_index) |
|---|
| 1224 | ctx.add_success_fields(shares=result) |
|---|
| 1225 | return result |
|---|
| 1226 | |
|---|
| 1227 | async def _list_shares(self, storage_index: bytes) -> Set[int]: |
|---|
| 1228 | """Implementation of ``list_shares()``.""" |
|---|
| 1229 | url = self._client.relative_url( |
|---|
| 1230 | "/storage/v1/mutable/{}/shares".format(_encode_si(storage_index)) |
|---|
| 1231 | ) |
|---|
| 1232 | response = await self._client.request("GET", url) |
|---|
| 1233 | if response.code == http.OK: |
|---|
| 1234 | return cast( |
|---|
| 1235 | Set[int], |
|---|
| 1236 | await self._client.decode_cbor( |
|---|
| 1237 | response, |
|---|
| 1238 | _SCHEMAS["mutable_list_shares"], |
|---|
| 1239 | ), |
|---|
| 1240 | ) |
|---|
| 1241 | else: |
|---|
| 1242 | raise ClientException(response.code) |
|---|
| 1243 | |
|---|
| 1244 | @async_to_deferred |
|---|
| 1245 | async def advise_corrupt_share( |
|---|
| 1246 | self, |
|---|
| 1247 | storage_index: bytes, |
|---|
| 1248 | share_number: int, |
|---|
| 1249 | reason: str, |
|---|
| 1250 | ) -> None: |
|---|
| 1251 | """Indicate a share has been corrupted, with a human-readable message.""" |
|---|
| 1252 | with start_action( |
|---|
| 1253 | action_type="allmydata:storage:http-client:mutable:advise-corrupt-share", |
|---|
| 1254 | storage_index=si_to_human_readable(storage_index), |
|---|
| 1255 | share_number=share_number, |
|---|
| 1256 | reason=reason, |
|---|
| 1257 | ): |
|---|
| 1258 | await advise_corrupt_share( |
|---|
| 1259 | self._client, "mutable", storage_index, share_number, reason |
|---|
| 1260 | ) |
|---|