| 1 | """ |
|---|
| 2 | HTTP server for storage. |
|---|
| 3 | """ |
|---|
| 4 | |
|---|
| 5 | from __future__ import annotations |
|---|
| 6 | |
|---|
| 7 | from typing import ( |
|---|
| 8 | Any, |
|---|
| 9 | Callable, |
|---|
| 10 | Union, |
|---|
| 11 | cast, |
|---|
| 12 | Optional, |
|---|
| 13 | TypeVar, |
|---|
| 14 | Sequence, |
|---|
| 15 | Protocol, |
|---|
| 16 | Dict, |
|---|
| 17 | ) |
|---|
| 18 | from typing_extensions import ParamSpec, Concatenate |
|---|
| 19 | from functools import wraps |
|---|
| 20 | from base64 import b64decode |
|---|
| 21 | import binascii |
|---|
| 22 | from tempfile import TemporaryFile |
|---|
| 23 | from os import SEEK_END, SEEK_SET |
|---|
| 24 | import mmap |
|---|
| 25 | |
|---|
| 26 | from eliot import start_action |
|---|
| 27 | from cryptography.x509 import Certificate as CryptoCertificate |
|---|
| 28 | from zope.interface import implementer |
|---|
| 29 | from klein import Klein, KleinRenderable |
|---|
| 30 | from klein.resource import KleinResource |
|---|
| 31 | from twisted.web import http |
|---|
| 32 | from twisted.internet.interfaces import ( |
|---|
| 33 | IListeningPort, |
|---|
| 34 | IStreamServerEndpoint, |
|---|
| 35 | IPullProducer, |
|---|
| 36 | IProtocolFactory, |
|---|
| 37 | ) |
|---|
| 38 | from twisted.internet.address import IPv4Address, IPv6Address |
|---|
| 39 | from twisted.internet.defer import Deferred |
|---|
| 40 | from twisted.internet.ssl import CertificateOptions, Certificate, PrivateCertificate |
|---|
| 41 | from twisted.internet.interfaces import IReactorFromThreads |
|---|
| 42 | from twisted.web.server import Site, Request |
|---|
| 43 | from twisted.web.iweb import IRequest |
|---|
| 44 | from twisted.protocols.tls import TLSMemoryBIOFactory |
|---|
| 45 | from twisted.python.filepath import FilePath |
|---|
| 46 | from twisted.python.failure import Failure |
|---|
| 47 | |
|---|
| 48 | from attrs import define, field, Factory |
|---|
| 49 | from werkzeug.http import ( |
|---|
| 50 | parse_range_header, |
|---|
| 51 | parse_content_range_header, |
|---|
| 52 | parse_accept_header, |
|---|
| 53 | ) |
|---|
| 54 | from werkzeug.routing import BaseConverter, ValidationError |
|---|
| 55 | from werkzeug.datastructures import ContentRange |
|---|
| 56 | from hyperlink import DecodedURL |
|---|
| 57 | from cryptography.x509 import load_pem_x509_certificate |
|---|
| 58 | |
|---|
| 59 | |
|---|
| 60 | from pycddl import Schema, ValidationError as CDDLValidationError |
|---|
| 61 | from .server import StorageServer |
|---|
| 62 | from .http_common import ( |
|---|
| 63 | swissnum_auth_header, |
|---|
| 64 | Secrets, |
|---|
| 65 | get_content_type, |
|---|
| 66 | CBOR_MIME_TYPE, |
|---|
| 67 | get_spki_hash, |
|---|
| 68 | ) |
|---|
| 69 | |
|---|
| 70 | from .common import si_a2b |
|---|
| 71 | from .immutable import BucketWriter, ConflictingWriteError |
|---|
| 72 | from ..util.hashutil import timing_safe_compare |
|---|
| 73 | from ..util.base32 import rfc3548_alphabet |
|---|
| 74 | from ..util.deferredutil import async_to_deferred |
|---|
| 75 | from ..util.cputhreadpool import defer_to_thread |
|---|
| 76 | from ..util import cbor |
|---|
| 77 | from ..interfaces import BadWriteEnablerError |
|---|
| 78 | |
|---|
| 79 | |
|---|
class ClientSecretsException(Exception):
    """The client did not send the appropriate secrets."""
|---|
| 82 | |
|---|
| 83 | |
|---|
def _extract_secrets(
    header_values: Sequence[str], required_secrets: set[Secrets]
) -> dict[Secrets, bytes]:
    """
    Parse ``X-Tahoe-Authorization`` header values into a mapping from
    ``Secrets`` enum members to their base64-decoded byte values.

    If a header is malformed, a secret decodes to empty, a lease secret has
    the wrong length, or the provided secrets don't exactly match
    ``required_secrets``, a ``ClientSecretsException`` is raised; its text is
    sent in the HTTP response.
    """
    enum_by_name = {member.value: member for member in Secrets}
    parsed: dict[Secrets, bytes] = {}
    try:
        for raw_value in header_values:
            name, encoded = raw_value.strip().split(" ", 1)
            secret_kind = enum_by_name[name]
            decoded = b64decode(encoded)
            # An empty secret is treated the same as a decode failure.
            if decoded == b"":
                raise ClientSecretsException(
                    "Failed to decode secret {}".format(name)
                )
            is_lease = secret_kind in (Secrets.LEASE_CANCEL, Secrets.LEASE_RENEW)
            if is_lease and len(decoded) != 32:
                raise ClientSecretsException("Lease secrets must be 32 bytes long")
            parsed[secret_kind] = decoded
    except (ValueError, KeyError):
        # Covers bad splits, unknown secret names, and base64 errors
        # (binascii.Error is a ValueError subclass).
        raise ClientSecretsException("Bad header value(s): {}".format(header_values))
    if parsed.keys() != required_secrets:
        raise ClientSecretsException(
            "Expected {} in X-Tahoe-Authorization headers, got {}".format(
                [r.value for r in required_secrets], list(parsed.keys())
            )
        )
    return parsed
|---|
| 117 | |
|---|
| 118 | |
|---|
class BaseApp(Protocol):
    """Protocol for ``HTTPServer`` and testing equivalent."""

    # Secret the client must present via the ``Authorization`` header;
    # compared in _authorization_decorator().
    _swissnum: bytes
|---|
| 123 | |
|---|
| 124 | |
|---|
| 125 | P = ParamSpec("P") |
|---|
| 126 | T = TypeVar("T") |
|---|
| 127 | SecretsDict = Dict[Secrets, bytes] |
|---|
| 128 | App = TypeVar("App", bound=BaseApp) |
|---|
| 129 | |
|---|
| 130 | |
|---|
def _authorization_decorator(
    required_secrets: set[Secrets],
) -> Callable[
    [Callable[Concatenate[App, Request, SecretsDict, P], T]],
    Callable[Concatenate[App, Request, P], T],
]:
    """
    Decorator factory wrapping a route handler with authorization checks.

    1. Check the ``Authorization`` header matches server swissnum.
    2. Extract ``X-Tahoe-Authorization`` headers and pass them in.
    3. Log the request and response.

    The wrapped handler receives the parsed secrets dict as an extra argument
    after the request; failures are signalled by raising ``_HTTPError``.
    """

    def decorator(
        f: Callable[Concatenate[App, Request, SecretsDict, P], T]
    ) -> Callable[Concatenate[App, Request, P], T]:
        @wraps(f)
        def route(
            self: App,
            request: Request,
            *args: P.args,
            **kwargs: P.kwargs,
        ) -> T:
            # Don't set text/html content type by default.
            # None is actually supported, see https://github.com/twisted/twisted/issues/11902
            request.defaultContentType = None  # type: ignore[assignment]

            # Eliot action context: logs method/path, and later the
            # response code, for every handled request.
            with start_action(
                action_type="allmydata:storage:http-server:handle-request",
                method=request.method,
                path=request.path,
            ) as ctx:
                try:
                    # Check Authorization header:
                    try:
                        auth_header = request.requestHeaders.getRawHeaders(
                            "Authorization", [""]
                        )[0].encode("utf-8")
                    except UnicodeError:
                        raise _HTTPError(http.BAD_REQUEST, "Bad Authorization header")
                    # Constant-time comparison, to avoid leaking the
                    # swissnum via timing.
                    if not timing_safe_compare(
                        auth_header,
                        swissnum_auth_header(self._swissnum),
                    ):
                        raise _HTTPError(
                            http.UNAUTHORIZED, "Wrong Authorization header"
                        )

                    # Check secrets:
                    authorization = request.requestHeaders.getRawHeaders(
                        "X-Tahoe-Authorization", []
                    )
                    try:
                        secrets = _extract_secrets(authorization, required_secrets)
                    except ClientSecretsException as e:
                        raise _HTTPError(http.BAD_REQUEST, str(e))

                    # Run the business logic:
                    result = f(self, request, secrets, *args, **kwargs)
                except _HTTPError as e:
                    # This isn't an error necessarily for logging purposes,
                    # it's an implementation detail, an easier way to set
                    # response codes.
                    ctx.add_success_fields(response_code=e.code)
                    ctx.finish()
                    raise
                else:
                    ctx.add_success_fields(response_code=request.code)
                    return result

        return route

    return decorator
|---|
| 203 | |
|---|
| 204 | |
|---|
def _authorized_route(
    klein_app: Klein,
    required_secrets: set[Secrets],
    url: str,
    *route_args: Any,
    branch: bool = False,
    **route_kwargs: Any,
) -> Callable[
    [
        Callable[
            Concatenate[App, Request, SecretsDict, P],
            KleinRenderable,
        ]
    ],
    Callable[..., KleinRenderable],
]:
    """
    Like Klein's @route, but with additional support for checking the
    ``Authorization`` header as well as ``X-Tahoe-Authorization`` headers. The
    latter will get passed in as second argument to wrapped functions, a
    dictionary mapping a ``Secret`` value to the uploaded secret.

    :param klein_app: The Klein application to register the route with.
    :param required_secrets: Set of required ``Secret`` types.
    :param url: Werkzeug-style URL pattern passed to Klein's ``route``.
    :param branch: Passed through to Klein's ``route``.
    """

    def decorator(
        f: Callable[
            Concatenate[App, Request, SecretsDict, P],
            KleinRenderable,
        ]
    ) -> Callable[..., KleinRenderable]:
        # Decorator order matters: Klein registers the outermost wrapper,
        # which runs the authorization checks before calling the handler
        # with the extracted secrets.
        @klein_app.route(url, *route_args, branch=branch, **route_kwargs)  # type: ignore[arg-type]
        @_authorization_decorator(required_secrets)
        @wraps(f)
        def handle_route(
            app: App,
            request: Request,
            secrets: SecretsDict,
            *args: P.args,
            **kwargs: P.kwargs,
        ) -> KleinRenderable:
            return f(app, request, secrets, *args, **kwargs)

        return handle_route

    return decorator
|---|
| 251 | |
|---|
| 252 | |
|---|
@define
class StorageIndexUploads:
    """
    In-progress upload to storage index.

    Tracks, per share number, both the ``BucketWriter`` doing the upload and
    the secret that authorizes writes to it.
    """

    # Map share number to BucketWriter
    shares: dict[int, BucketWriter] = Factory(dict)

    # Map share number to the upload secret (different shares might have
    # different upload secrets).
    upload_secrets: dict[int, bytes] = Factory(dict)
|---|
| 265 | |
|---|
| 266 | |
|---|
@define
class UploadsInProgress:
    """
    Keep track of uploads for storage indexes.

    Maintains two mutually-consistent indexes: storage index -> per-share
    uploads, and ``BucketWriter`` -> (storage index, share number) for
    reverse lookup when a bucket is closed.
    """

    # Map storage index to corresponding uploads-in-progress
    _uploads: dict[bytes, StorageIndexUploads] = Factory(dict)

    # Map BucketWriter to (storage index, share number)
    _bucketwriters: dict[BucketWriter, tuple[bytes, int]] = Factory(dict)

    def add_write_bucket(
        self,
        storage_index: bytes,
        share_number: int,
        upload_secret: bytes,
        bucket: BucketWriter,
    ) -> None:
        """Add a new ``BucketWriter`` to be tracked."""
        si_uploads = self._uploads.setdefault(storage_index, StorageIndexUploads())
        si_uploads.shares[share_number] = bucket
        si_uploads.upload_secrets[share_number] = upload_secret
        self._bucketwriters[bucket] = (storage_index, share_number)

    def get_write_bucket(
        self, storage_index: bytes, share_number: int, upload_secret: bytes
    ) -> BucketWriter:
        """
        Get the given in-progress immutable share upload.

        :raises _HTTPError: UNAUTHORIZED if the upload secret is wrong,
            NOT_FOUND if there is no such in-progress upload.
        """
        self.validate_upload_secret(storage_index, share_number, upload_secret)
        try:
            return self._uploads[storage_index].shares[share_number]
        except (KeyError, IndexError):
            raise _HTTPError(http.NOT_FOUND)

    def remove_write_bucket(self, bucket: BucketWriter) -> None:
        """Stop tracking the given ``BucketWriter``."""
        try:
            storage_index, share_number = self._bucketwriters.pop(bucket)
        except KeyError:
            # This is probably a BucketWriter created by Foolscap, so just
            # ignore it.
            return
        uploads_index = self._uploads[storage_index]
        uploads_index.shares.pop(share_number)
        uploads_index.upload_secrets.pop(share_number)
        # Drop the storage-index entry entirely once its last share is gone.
        if not uploads_index.shares:
            self._uploads.pop(storage_index)

    def validate_upload_secret(
        self, storage_index: bytes, share_number: int, upload_secret: bytes
    ) -> None:
        """
        Raise an unauthorized-HTTP-response exception if the given
        storage_index+share_number have a different upload secret than the
        given one.

        If the given upload doesn't exist at all, nothing happens.
        """
        if storage_index in self._uploads:
            in_progress = self._uploads[storage_index]
            # For pre-existing upload, make sure password matches.
            # Constant-time comparison avoids leaking the secret via timing.
            if share_number in in_progress.upload_secrets and not timing_safe_compare(
                in_progress.upload_secrets[share_number], upload_secret
            ):
                raise _HTTPError(http.UNAUTHORIZED)
|---|
| 333 | |
|---|
| 334 | |
|---|
class StorageIndexConverter(BaseConverter):
    """Parser/validator for storage index URL path segments."""

    # A storage index is exactly 26 characters from the RFC 3548 base32
    # alphabet; anything else never reaches to_python().
    regex = "[{}]{{26}}".format(str(rfc3548_alphabet, "ascii"))

    def to_python(self, value: str) -> bytes:
        try:
            parsed = si_a2b(value.encode("ascii"))
        except (AssertionError, binascii.Error, ValueError):
            raise ValidationError("Invalid storage index")
        return parsed
|---|
| 345 | |
|---|
| 346 | |
|---|
| 347 | class _HTTPError(Exception): |
|---|
| 348 | """ |
|---|
| 349 | Raise from ``HTTPServer`` endpoint to return the given HTTP response code. |
|---|
| 350 | """ |
|---|
| 351 | |
|---|
| 352 | def __init__(self, code: int, body: Optional[str] = None): |
|---|
| 353 | Exception.__init__(self, (code, body)) |
|---|
| 354 | self.code = code |
|---|
| 355 | self.body = body |
|---|
| 356 | |
|---|
| 357 | |
|---|
| 358 | # CDDL schemas. |
|---|
| 359 | # |
|---|
| 360 | # Tags are of the form #6.nnn, where the number is documented at |
|---|
| 361 | # https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml. Notably, #6.258 |
|---|
| 362 | # indicates a set. |
|---|
| 363 | # |
|---|
| 364 | # Somewhat arbitrary limits are set to reduce e.g. number of shares, number of |
|---|
| 365 | # vectors, etc.. These may need to be iterated on in future revisions of the |
|---|
| 366 | # code. |
|---|
# Maps operation name to the CDDL schema its request body must satisfy.
# Bodies are validated before being acted on; violations are turned into
# BAD_REQUEST responses by the CDDLValidationError handler.
_SCHEMAS = {
    # Body of POST /storage/v1/immutable/<storage_index>.
    "allocate_buckets": Schema(
        """
    request = {
      share-numbers: #6.258([0*256 uint])
      allocated-size: uint
    }
    """
    ),
    # Body of corruption-advisory POSTs.
    "advise_corrupt_share": Schema(
        """
    request = {
      reason: tstr .size (1..32765)
    }
    """
    ),
    # Body of mutable read/test/write POSTs.
    "mutable_read_test_write": Schema(
        """
    request = {
        "test-write-vectors": {
            0*256 share_number : {
                "test": [0*30 {"offset": uint, "size": uint, "specimen": bstr}]
                "write": [* {"offset": uint, "data": bstr}]
                "new-length": uint / null
            }
        }
        "read-vector": [0*30 {"offset": uint, "size": uint}]
    }
    share_number = uint
    """
    ),
}
|---|
| 399 | |
|---|
| 400 | |
|---|
| 401 | # Callable that takes offset and length, returns the data at that range. |
|---|
| 402 | ReadData = Callable[[int, int], bytes] |
|---|
| 403 | |
|---|
| 404 | |
|---|
@implementer(IPullProducer)
@define
class _ReadAllProducer:
    """
    Pull producer that repeatedly calls a read function, writing everything
    it returns to a request until the read function is exhausted.
    """

    request: Request
    read_data: ReadData
    result: Deferred = Factory(Deferred)
    start: int = field(default=0)

    @classmethod
    def produce_to(cls, request: Request, read_data: ReadData) -> Deferred[bytes]:
        """
        Create and register the producer, returning ``Deferred`` that should be
        returned from a HTTP server endpoint.
        """
        instance = cls(request, read_data)
        request.registerProducer(instance, False)
        return instance.result

    def resumeProducing(self) -> None:
        chunk = self.read_data(self.start, 65536)
        if chunk:
            # More data: write it out and advance the read cursor.
            self.request.write(chunk)
            self.start += len(chunk)
            return
        # Empty read means we're done: detach and fire the Deferred.
        self.request.unregisterProducer()
        finished = self.result
        del self.result
        finished.callback(b"")

    def pauseProducing(self) -> None:
        pass

    def stopProducing(self) -> None:
        pass
|---|
| 444 | |
|---|
| 445 | |
|---|
@implementer(IPullProducer)
@define
class _ReadRangeProducer:
    """
    Producer that calls a read function to read a range of data, and writes to
    a request.

    Once the range is fully written (or an error occurs), ``request`` and
    ``result`` are set to None so later calls become no-ops.
    """

    request: Optional[Request]
    read_data: ReadData
    result: Optional[Deferred[bytes]]
    # Absolute offset of the next read:
    start: int
    # Bytes still to be written before the range is complete:
    remaining: int

    def resumeProducing(self) -> None:
        # Already finished or errored; nothing more to do.
        if self.result is None or self.request is None:
            return

        to_read = min(self.remaining, 65536)
        data = self.read_data(self.start, to_read)
        assert len(data) <= to_read

        if not data and self.remaining > 0:
            # Empty read before the range was exhausted: the underlying data
            # is shorter than promised, so fail the Deferred.
            d, self.result = self.result, None
            d.errback(
                ValueError(
                    f"Should be {self.remaining} bytes left, but we got an empty read"
                )
            )
            self.stopProducing()
            return

        if len(data) > self.remaining:
            # Read function returned more than the remaining range; this
            # violates the contract, so fail the Deferred.
            d, self.result = self.result, None
            d.errback(
                ValueError(
                    f"Should be {self.remaining} bytes left, but we got more than that ({len(data)})!"
                )
            )
            self.stopProducing()
            return

        self.start += len(data)
        self.remaining -= len(data)
        assert self.remaining >= 0

        self.request.write(data)

        if self.remaining == 0:
            self.stopProducing()

    def pauseProducing(self) -> None:
        pass

    def stopProducing(self) -> None:
        # Unregister and fire the result exactly once, even if called again.
        if self.request is not None:
            self.request.unregisterProducer()
            self.request = None
        if self.result is not None:
            d = self.result
            self.result = None
            d.callback(b"")
|---|
| 508 | |
|---|
| 509 | |
|---|
def read_range(
    request: Request, read_data: ReadData, share_length: int
) -> Union[Deferred[bytes], bytes]:
    """
    Read an optional ``Range`` header, reads data appropriately via the given
    callable, writes the data to the request.

    Only parses a subset of ``Range`` headers that we support: must be set,
    bytes only, only a single range, the end must be explicitly specified.
    Raises a ``_HTTPError(http.REQUESTED_RANGE_NOT_SATISFIABLE)`` if parsing is
    not possible or the header isn't set.

    Takes a function that will do the actual reading given the start offset and
    a length to read.

    The resulting data is written to the request.
    """

    def read_data_with_error_handling(offset: int, length: int) -> bytes:
        # Turn an _HTTPError raised by the reader into a response code, and
        # signal end-of-data to the producer with an empty read.
        try:
            return read_data(offset, length)
        except _HTTPError as e:
            request.setResponseCode(e.code)
            # Empty read means we're done.
            return b""

    # No Range header: stream the whole share.
    if request.getHeader("range") is None:
        return _ReadAllProducer.produce_to(request, read_data_with_error_handling)

    range_header = parse_range_header(request.getHeader("range"))
    if (
        range_header is None  # failed to parse
        or range_header.units != "bytes"
        or len(range_header.ranges) > 1  # more than one range
        or range_header.ranges[0][1] is None  # range without end
    ):
        raise _HTTPError(http.REQUESTED_RANGE_NOT_SATISFIABLE)

    # NOTE(review): werkzeug's parsed ranges appear to use an exclusive end
    # here, matching Python slice conventions -- confirm against werkzeug docs.
    offset, end = range_header.ranges[0]
    assert end is not None  # should've exited in block above this if so

    # If we're being asked to read beyond the length of the share, just read
    # less:
    end = min(end, share_length)
    if offset >= end:
        # Basically we'd need to return an empty body. However, the
        # Content-Range header can't actually represent empty lengths... so
        # (mis)use 204 response code to indicate that.
        raise _HTTPError(http.NO_CONTENT)

    request.setResponseCode(http.PARTIAL_CONTENT)

    # Actual conversion from Python's exclusive ranges to inclusive ranges is
    # handled by werkzeug.
    request.setHeader(
        "content-range",
        ContentRange("bytes", offset, end).to_header(),
    )

    d: Deferred[bytes] = Deferred()
    request.registerProducer(
        _ReadRangeProducer(
            request, read_data_with_error_handling, d, offset, end - offset
        ),
        False,
    )
    return d
|---|
| 577 | |
|---|
| 578 | |
|---|
def _add_error_handling(app: Klein) -> None:
    """
    Add exception handlers to a Klein app.

    Registers handlers that turn ``_HTTPError`` and CDDL validation failures
    raised anywhere in a route into appropriate HTTP responses.
    """

    @app.handle_errors(_HTTPError)
    def _http_error(self: Any, request: IRequest, failure: Failure) -> KleinRenderable:
        """Handle ``_HTTPError`` exceptions."""
        assert isinstance(failure.value, _HTTPError)
        request.setResponseCode(failure.value.code)
        # Body is optional; fall back to an empty response body.
        if failure.value.body is not None:
            return failure.value.body
        else:
            return b""

    @app.handle_errors(CDDLValidationError)
    def _cddl_validation_error(
        self: Any, request: IRequest, failure: Failure
    ) -> KleinRenderable:
        """Handle CDDL validation errors."""
        request.setResponseCode(http.BAD_REQUEST)
        return str(failure.value).encode("utf-8")
|---|
| 599 | |
|---|
| 600 | |
|---|
async def read_encoded(
    reactor, request, schema: Schema, max_size: int = 1024 * 1024
) -> Any:
    """
    Read encoded request body data, decoding it with CBOR by default.

    Somewhat arbitrarily, limit body size to 1MiB by default.

    :param reactor: The reactor (kept for interface compatibility).
    :param request: The request whose ``content`` body will be read.
    :param schema: CDDL schema the decoded body must satisfy.
    :param max_size: Maximum allowed body size in bytes.

    :raises _HTTPError: UNSUPPORTED_MEDIA_TYPE for a non-CBOR content type,
        REQUEST_ENTITY_TOO_LARGE if the body exceeds ``max_size``.
    :raises CDDLValidationError: if the body does not match ``schema``.
    """
    content_type = get_content_type(request.requestHeaders)
    if content_type is None:
        content_type = CBOR_MIME_TYPE
    if content_type != CBOR_MIME_TYPE:
        raise _HTTPError(http.UNSUPPORTED_MEDIA_TYPE)

    # Make sure it's not too large:
    request.content.seek(0, SEEK_END)
    size = request.content.tell()
    if size > max_size:
        raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE)
    request.content.seek(0, SEEK_SET)

    # We don't want to load the whole message into memory, cause it might
    # be quite large. The CDDL validator takes a read-only bytes-like
    # thing. Luckily, for large request bodies twisted.web will buffer the
    # data in a file, so we can use mmap() to get a memory view. The CDDL
    # validator will not make a copy, so it won't increase memory usage
    # beyond that.
    try:
        fd = request.content.fileno()
    except (ValueError, OSError):
        fd = -1
    if fd >= 0 and size > 0:
        # It's a non-empty file, so we can use mmap() to save memory.
        # (mmap.mmap(fd, 0) raises ValueError for an empty file, so
        # zero-length bodies fall through to a plain read() instead.)
        message = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
    else:
        message = request.content.read()

    # Pycddl will release the GIL when validating larger documents, so
    # let's take advantage of multiple CPUs:
    decoded = await defer_to_thread(schema.validate_cbor, message, True)
    return decoded
|---|
| 642 | |
|---|
class HTTPServer(BaseApp):
    """
    A HTTP interface to the storage server.
    """

    # Klein app shared by all instances; routes are registered on it at
    # class-definition time by the @_authorized_route decorators below.
    _app = Klein()
    # Allow <storage_index:...> segments in route URL patterns.
    _app.url_map.converters["storage_index"] = StorageIndexConverter
    _add_error_handling(_app)
|---|
| 651 | |
|---|
    def __init__(
        self,
        reactor: IReactorFromThreads,
        storage_server: StorageServer,
        swissnum: bytes,
    ):
        """
        :param reactor: Reactor, stored for use by request handlers.
        :param storage_server: The underlying storage logic this HTTP
            interface exposes.
        :param swissnum: Secret clients must present in the
            ``Authorization`` header.
        """
        self._reactor = reactor
        self._storage_server = storage_server
        self._swissnum = swissnum
        # Maps storage index to StorageIndexUploads:
        self._uploads = UploadsInProgress()

        # When an upload finishes successfully, gets aborted, or times out,
        # make sure it gets removed from our tracking datastructure:
        self._storage_server.register_bucket_writer_close_handler(
            self._uploads.remove_write_bucket
        )
|---|
| 669 | |
|---|
| 670 | def get_resource(self) -> KleinResource: |
|---|
| 671 | """Return twisted.web ``Resource`` for this object.""" |
|---|
| 672 | return self._app.resource() |
|---|
| 673 | |
|---|
    def _send_encoded(self, request: Request, data: object) -> Deferred[bytes]:
        """
        Return encoded data suitable for writing as the HTTP body response, by
        default using CBOR.

        Also sets the appropriate ``Content-Type`` header on the response.

        Raises ``_HTTPError(http.NOT_ACCEPTABLE)`` if the client's Accept
        header prefers something other than CBOR.
        """
        # Missing Accept header is treated as accepting CBOR.
        accept_headers = request.requestHeaders.getRawHeaders("accept") or [
            CBOR_MIME_TYPE
        ]
        accept = parse_accept_header(accept_headers[0])
        if accept.best == CBOR_MIME_TYPE:
            request.setHeader("Content-Type", CBOR_MIME_TYPE)
            # Encode into a temporary file and stream it out via a producer,
            # so large responses are not held in memory all at once.
            f = TemporaryFile()
            cbor.dump(data, f)  # type: ignore

            def read_data(offset: int, length: int) -> bytes:
                f.seek(offset)
                return f.read(length)

            return _ReadAllProducer.produce_to(request, read_data)
        else:
            # TODO Might want to optionally send JSON someday:
            # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861
            raise _HTTPError(http.NOT_ACCEPTABLE)
|---|
| 699 | |
|---|
| 700 | ##### Generic APIs ##### |
|---|
| 701 | |
|---|
| 702 | @_authorized_route(_app, set(), "/storage/v1/version", methods=["GET"]) |
|---|
| 703 | def version(self, request: Request, authorization: SecretsDict) -> KleinRenderable: |
|---|
| 704 | """Return version information.""" |
|---|
| 705 | return self._send_encoded(request, self._get_version()) |
|---|
| 706 | |
|---|
| 707 | def _get_version(self) -> dict[bytes, Any]: |
|---|
| 708 | """ |
|---|
| 709 | Get the HTTP version of the storage server's version response. |
|---|
| 710 | |
|---|
| 711 | This differs from the Foolscap version by omitting certain obsolete |
|---|
| 712 | fields. |
|---|
| 713 | """ |
|---|
| 714 | v = self._storage_server.get_version() |
|---|
| 715 | v1_identifier = b"http://allmydata.org/tahoe/protocols/storage/v1" |
|---|
| 716 | v1 = v[v1_identifier] |
|---|
| 717 | return { |
|---|
| 718 | v1_identifier: { |
|---|
| 719 | b"maximum-immutable-share-size": v1[b"maximum-immutable-share-size"], |
|---|
| 720 | b"maximum-mutable-share-size": v1[b"maximum-mutable-share-size"], |
|---|
| 721 | b"available-space": v1[b"available-space"], |
|---|
| 722 | }, |
|---|
| 723 | b"application-version": v[b"application-version"], |
|---|
| 724 | } |
|---|
| 725 | |
|---|
| 726 | ##### Immutable APIs ##### |
|---|
| 727 | |
|---|
    @_authorized_route(
        _app,
        {Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL, Secrets.UPLOAD},
        "/storage/v1/immutable/<storage_index:storage_index>",
        methods=["POST"],
    )
    @async_to_deferred
    async def allocate_buckets(
        self, request: Request, authorization: SecretsDict, storage_index: bytes
    ) -> KleinRenderable:
        """
        Allocate buckets for uploading shares at the given storage index.

        Responds with the set of share numbers we already have and the set
        newly allocated for upload.
        """
        upload_secret = authorization[Secrets.UPLOAD]
        # It's just a list of up to ~256 shares, shouldn't use many bytes.
        info = await read_encoded(
            self._reactor, request, _SCHEMAS["allocate_buckets"], max_size=8192
        )

        # We do NOT validate the upload secret for existing bucket uploads.
        # Another upload may be happening in parallel, with a different upload
        # key. That's fine! If a client tries to _write_ to that upload, they
        # need to have an upload key. That does mean we leak the existence of
        # these parallel uploads, but if you know storage index you can
        # download them once upload finishes, so it's not a big deal to leak
        # that information.

        already_got, sharenum_to_bucket = self._storage_server.allocate_buckets(
            storage_index,
            renew_secret=authorization[Secrets.LEASE_RENEW],
            cancel_secret=authorization[Secrets.LEASE_CANCEL],
            sharenums=info["share-numbers"],
            allocated_size=info["allocated-size"],
        )
        # Track new BucketWriters so later writes can find them, and so they
        # are cleaned up when closed or aborted.
        for share_number, bucket in sharenum_to_bucket.items():
            self._uploads.add_write_bucket(
                storage_index, share_number, upload_secret, bucket
            )

        return await self._send_encoded(
            request,
            {"already-have": set(already_got), "allocated": set(sharenum_to_bucket)},
        )
|---|
| 769 | |
|---|
| 770 | @_authorized_route( |
|---|
| 771 | _app, |
|---|
| 772 | {Secrets.UPLOAD}, |
|---|
| 773 | "/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/abort", |
|---|
| 774 | methods=["PUT"], |
|---|
| 775 | ) |
|---|
| 776 | def abort_share_upload( |
|---|
| 777 | self, |
|---|
| 778 | request: Request, |
|---|
| 779 | authorization: SecretsDict, |
|---|
| 780 | storage_index: bytes, |
|---|
| 781 | share_number: int, |
|---|
| 782 | ) -> KleinRenderable: |
|---|
| 783 | """Abort an in-progress immutable share upload.""" |
|---|
| 784 | try: |
|---|
| 785 | bucket = self._uploads.get_write_bucket( |
|---|
| 786 | storage_index, share_number, authorization[Secrets.UPLOAD] |
|---|
| 787 | ) |
|---|
| 788 | except _HTTPError as e: |
|---|
| 789 | if e.code == http.NOT_FOUND: |
|---|
| 790 | # It may be we've already uploaded this, in which case error |
|---|
| 791 | # should be method not allowed (405). |
|---|
| 792 | try: |
|---|
| 793 | self._storage_server.get_buckets(storage_index)[share_number] |
|---|
| 794 | except KeyError: |
|---|
| 795 | pass |
|---|
| 796 | else: |
|---|
| 797 | # Already uploaded, so we can't abort. |
|---|
| 798 | raise _HTTPError(http.NOT_ALLOWED) |
|---|
| 799 | raise |
|---|
| 800 | |
|---|
| 801 | # Abort the upload; this should close it which will eventually result |
|---|
| 802 | # in self._uploads.remove_write_bucket() being called. |
|---|
| 803 | bucket.abort() |
|---|
| 804 | |
|---|
| 805 | return b"" |
|---|
| 806 | |
|---|
| 807 | @_authorized_route( |
|---|
| 808 | _app, |
|---|
| 809 | {Secrets.UPLOAD}, |
|---|
| 810 | "/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>", |
|---|
| 811 | methods=["PATCH"], |
|---|
| 812 | ) |
|---|
| 813 | def write_share_data( |
|---|
| 814 | self, |
|---|
| 815 | request: Request, |
|---|
| 816 | authorization: SecretsDict, |
|---|
| 817 | storage_index: bytes, |
|---|
| 818 | share_number: int, |
|---|
| 819 | ) -> KleinRenderable: |
|---|
| 820 | """Write data to an in-progress immutable upload.""" |
|---|
| 821 | content_range = parse_content_range_header(request.getHeader("content-range")) |
|---|
| 822 | if content_range is None or content_range.units != "bytes": |
|---|
| 823 | request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE) |
|---|
| 824 | return b"" |
|---|
| 825 | |
|---|
| 826 | bucket = self._uploads.get_write_bucket( |
|---|
| 827 | storage_index, share_number, authorization[Secrets.UPLOAD] |
|---|
| 828 | ) |
|---|
| 829 | offset = content_range.start or 0 |
|---|
| 830 | # We don't support an unspecified stop for the range: |
|---|
| 831 | assert content_range.stop is not None |
|---|
| 832 | # Missing body makes no sense: |
|---|
| 833 | assert request.content is not None |
|---|
| 834 | remaining = content_range.stop - offset |
|---|
| 835 | finished = False |
|---|
| 836 | |
|---|
| 837 | while remaining > 0: |
|---|
| 838 | data = request.content.read(min(remaining, 65536)) |
|---|
| 839 | assert data, "uploaded data length doesn't match range" |
|---|
| 840 | try: |
|---|
| 841 | finished = bucket.write(offset, data) |
|---|
| 842 | except ConflictingWriteError: |
|---|
| 843 | request.setResponseCode(http.CONFLICT) |
|---|
| 844 | return b"" |
|---|
| 845 | remaining -= len(data) |
|---|
| 846 | offset += len(data) |
|---|
| 847 | |
|---|
| 848 | if finished: |
|---|
| 849 | bucket.close() |
|---|
| 850 | request.setResponseCode(http.CREATED) |
|---|
| 851 | else: |
|---|
| 852 | request.setResponseCode(http.OK) |
|---|
| 853 | |
|---|
| 854 | required = [] |
|---|
| 855 | for start, end, _ in bucket.required_ranges().ranges(): |
|---|
| 856 | required.append({"begin": start, "end": end}) |
|---|
| 857 | return self._send_encoded(request, {"required": required}) |
|---|
| 858 | |
|---|
| 859 | @_authorized_route( |
|---|
| 860 | _app, |
|---|
| 861 | set(), |
|---|
| 862 | "/storage/v1/immutable/<storage_index:storage_index>/shares", |
|---|
| 863 | methods=["GET"], |
|---|
| 864 | ) |
|---|
| 865 | def list_shares( |
|---|
| 866 | self, request: Request, authorization: SecretsDict, storage_index: bytes |
|---|
| 867 | ) -> KleinRenderable: |
|---|
| 868 | """ |
|---|
| 869 | List shares for the given storage index. |
|---|
| 870 | """ |
|---|
| 871 | share_numbers = set(self._storage_server.get_buckets(storage_index).keys()) |
|---|
| 872 | return self._send_encoded(request, share_numbers) |
|---|
| 873 | |
|---|
| 874 | @_authorized_route( |
|---|
| 875 | _app, |
|---|
| 876 | set(), |
|---|
| 877 | "/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>", |
|---|
| 878 | methods=["GET"], |
|---|
| 879 | ) |
|---|
| 880 | def read_share_chunk( |
|---|
| 881 | self, |
|---|
| 882 | request: Request, |
|---|
| 883 | authorization: SecretsDict, |
|---|
| 884 | storage_index: bytes, |
|---|
| 885 | share_number: int, |
|---|
| 886 | ) -> KleinRenderable: |
|---|
| 887 | """Read a chunk for an already uploaded immutable.""" |
|---|
| 888 | request.setHeader("content-type", "application/octet-stream") |
|---|
| 889 | try: |
|---|
| 890 | bucket = self._storage_server.get_buckets(storage_index)[share_number] |
|---|
| 891 | except KeyError: |
|---|
| 892 | request.setResponseCode(http.NOT_FOUND) |
|---|
| 893 | return b"" |
|---|
| 894 | |
|---|
| 895 | return read_range(request, bucket.read, bucket.get_length()) |
|---|
| 896 | |
|---|
| 897 | @_authorized_route( |
|---|
| 898 | _app, |
|---|
| 899 | {Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL}, |
|---|
| 900 | "/storage/v1/lease/<storage_index:storage_index>", |
|---|
| 901 | methods=["PUT"], |
|---|
| 902 | ) |
|---|
| 903 | def add_or_renew_lease( |
|---|
| 904 | self, request: Request, authorization: SecretsDict, storage_index: bytes |
|---|
| 905 | ) -> KleinRenderable: |
|---|
| 906 | """Update the lease for an immutable or mutable share.""" |
|---|
| 907 | if not list(self._storage_server.get_shares(storage_index)): |
|---|
| 908 | raise _HTTPError(http.NOT_FOUND) |
|---|
| 909 | |
|---|
| 910 | # Checking of the renewal secret is done by the backend. |
|---|
| 911 | self._storage_server.add_lease( |
|---|
| 912 | storage_index, |
|---|
| 913 | authorization[Secrets.LEASE_RENEW], |
|---|
| 914 | authorization[Secrets.LEASE_CANCEL], |
|---|
| 915 | ) |
|---|
| 916 | |
|---|
| 917 | request.setResponseCode(http.NO_CONTENT) |
|---|
| 918 | return b"" |
|---|
| 919 | |
|---|
| 920 | @_authorized_route( |
|---|
| 921 | _app, |
|---|
| 922 | set(), |
|---|
| 923 | "/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt", |
|---|
| 924 | methods=["POST"], |
|---|
| 925 | ) |
|---|
| 926 | @async_to_deferred |
|---|
| 927 | async def advise_corrupt_share_immutable( |
|---|
| 928 | self, |
|---|
| 929 | request: Request, |
|---|
| 930 | authorization: SecretsDict, |
|---|
| 931 | storage_index: bytes, |
|---|
| 932 | share_number: int, |
|---|
| 933 | ) -> KleinRenderable: |
|---|
| 934 | """Indicate that given share is corrupt, with a text reason.""" |
|---|
| 935 | try: |
|---|
| 936 | bucket = self._storage_server.get_buckets(storage_index)[share_number] |
|---|
| 937 | except KeyError: |
|---|
| 938 | raise _HTTPError(http.NOT_FOUND) |
|---|
| 939 | |
|---|
| 940 | # The reason can be a string with explanation, so in theory it could be |
|---|
| 941 | # longish? |
|---|
| 942 | info = await read_encoded( |
|---|
| 943 | self._reactor, |
|---|
| 944 | request, |
|---|
| 945 | _SCHEMAS["advise_corrupt_share"], |
|---|
| 946 | max_size=32768, |
|---|
| 947 | ) |
|---|
| 948 | bucket.advise_corrupt_share(info["reason"].encode("utf-8")) |
|---|
| 949 | return b"" |
|---|
| 950 | |
|---|
| 951 | ##### Mutable APIs ##### |
|---|
| 952 | |
|---|
| 953 | @_authorized_route( |
|---|
| 954 | _app, |
|---|
| 955 | {Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL, Secrets.WRITE_ENABLER}, |
|---|
| 956 | "/storage/v1/mutable/<storage_index:storage_index>/read-test-write", |
|---|
| 957 | methods=["POST"], |
|---|
| 958 | ) |
|---|
| 959 | @async_to_deferred |
|---|
| 960 | async def mutable_read_test_write( |
|---|
| 961 | self, request: Request, authorization: SecretsDict, storage_index: bytes |
|---|
| 962 | ) -> KleinRenderable: |
|---|
| 963 | """Read/test/write combined operation for mutables.""" |
|---|
| 964 | rtw_request = await read_encoded( |
|---|
| 965 | self._reactor, |
|---|
| 966 | request, |
|---|
| 967 | _SCHEMAS["mutable_read_test_write"], |
|---|
| 968 | max_size=2**48, |
|---|
| 969 | ) |
|---|
| 970 | secrets = ( |
|---|
| 971 | authorization[Secrets.WRITE_ENABLER], |
|---|
| 972 | authorization[Secrets.LEASE_RENEW], |
|---|
| 973 | authorization[Secrets.LEASE_CANCEL], |
|---|
| 974 | ) |
|---|
| 975 | try: |
|---|
| 976 | success, read_data = self._storage_server.slot_testv_and_readv_and_writev( |
|---|
| 977 | storage_index, |
|---|
| 978 | secrets, |
|---|
| 979 | { |
|---|
| 980 | k: ( |
|---|
| 981 | [ |
|---|
| 982 | (d["offset"], d["size"], b"eq", d["specimen"]) |
|---|
| 983 | for d in v["test"] |
|---|
| 984 | ], |
|---|
| 985 | [(d["offset"], d["data"]) for d in v["write"]], |
|---|
| 986 | v["new-length"], |
|---|
| 987 | ) |
|---|
| 988 | for (k, v) in rtw_request["test-write-vectors"].items() |
|---|
| 989 | }, |
|---|
| 990 | [(d["offset"], d["size"]) for d in rtw_request["read-vector"]], |
|---|
| 991 | ) |
|---|
| 992 | except BadWriteEnablerError: |
|---|
| 993 | raise _HTTPError(http.UNAUTHORIZED) |
|---|
| 994 | return await self._send_encoded( |
|---|
| 995 | request, {"success": success, "data": read_data} |
|---|
| 996 | ) |
|---|
| 997 | |
|---|
| 998 | @_authorized_route( |
|---|
| 999 | _app, |
|---|
| 1000 | set(), |
|---|
| 1001 | "/storage/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>", |
|---|
| 1002 | methods=["GET"], |
|---|
| 1003 | ) |
|---|
| 1004 | def read_mutable_chunk( |
|---|
| 1005 | self, |
|---|
| 1006 | request: Request, |
|---|
| 1007 | authorization: SecretsDict, |
|---|
| 1008 | storage_index: bytes, |
|---|
| 1009 | share_number: int, |
|---|
| 1010 | ) -> KleinRenderable: |
|---|
| 1011 | """Read a chunk from a mutable.""" |
|---|
| 1012 | request.setHeader("content-type", "application/octet-stream") |
|---|
| 1013 | |
|---|
| 1014 | try: |
|---|
| 1015 | share_length = self._storage_server.get_mutable_share_length( |
|---|
| 1016 | storage_index, share_number |
|---|
| 1017 | ) |
|---|
| 1018 | except KeyError: |
|---|
| 1019 | raise _HTTPError(http.NOT_FOUND) |
|---|
| 1020 | |
|---|
| 1021 | def read_data(offset, length): |
|---|
| 1022 | try: |
|---|
| 1023 | return self._storage_server.slot_readv( |
|---|
| 1024 | storage_index, [share_number], [(offset, length)] |
|---|
| 1025 | )[share_number][0] |
|---|
| 1026 | except KeyError: |
|---|
| 1027 | raise _HTTPError(http.NOT_FOUND) |
|---|
| 1028 | |
|---|
| 1029 | return read_range(request, read_data, share_length) |
|---|
| 1030 | |
|---|
| 1031 | @_authorized_route( |
|---|
| 1032 | _app, |
|---|
| 1033 | set(), |
|---|
| 1034 | "/storage/v1/mutable/<storage_index:storage_index>/shares", |
|---|
| 1035 | methods=["GET"], |
|---|
| 1036 | ) |
|---|
| 1037 | def enumerate_mutable_shares(self, request, authorization, storage_index): |
|---|
| 1038 | """List mutable shares for a storage index.""" |
|---|
| 1039 | shares = self._storage_server.enumerate_mutable_shares(storage_index) |
|---|
| 1040 | return self._send_encoded(request, shares) |
|---|
| 1041 | |
|---|
| 1042 | @_authorized_route( |
|---|
| 1043 | _app, |
|---|
| 1044 | set(), |
|---|
| 1045 | "/storage/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt", |
|---|
| 1046 | methods=["POST"], |
|---|
| 1047 | ) |
|---|
| 1048 | @async_to_deferred |
|---|
| 1049 | async def advise_corrupt_share_mutable( |
|---|
| 1050 | self, |
|---|
| 1051 | request: Request, |
|---|
| 1052 | authorization: SecretsDict, |
|---|
| 1053 | storage_index: bytes, |
|---|
| 1054 | share_number: int, |
|---|
| 1055 | ) -> KleinRenderable: |
|---|
| 1056 | """Indicate that given share is corrupt, with a text reason.""" |
|---|
| 1057 | if share_number not in { |
|---|
| 1058 | shnum for (shnum, _) in self._storage_server.get_shares(storage_index) |
|---|
| 1059 | }: |
|---|
| 1060 | raise _HTTPError(http.NOT_FOUND) |
|---|
| 1061 | |
|---|
| 1062 | # The reason can be a string with explanation, so in theory it could be |
|---|
| 1063 | # longish? |
|---|
| 1064 | info = await read_encoded( |
|---|
| 1065 | self._reactor, request, _SCHEMAS["advise_corrupt_share"], max_size=32768 |
|---|
| 1066 | ) |
|---|
| 1067 | self._storage_server.advise_corrupt_share( |
|---|
| 1068 | b"mutable", storage_index, share_number, info["reason"].encode("utf-8") |
|---|
| 1069 | ) |
|---|
| 1070 | return b"" |
|---|
| 1071 | |
|---|
| 1072 | |
|---|
@implementer(IStreamServerEndpoint)
@define
class _TLSEndpointWrapper:
    """
    Wrap an existing endpoint with the server-side storage TLS policy. This is
    useful because not all Tahoe-LAFS endpoints might be plain TCP+TLS, for
    example there's Tor and i2p.
    """

    # The underlying (non-TLS) server endpoint to listen on.
    endpoint: IStreamServerEndpoint
    # TLS configuration applied to every accepted connection.
    context_factory: CertificateOptions

    @classmethod
    def from_paths(
        cls: type[_TLSEndpointWrapper],
        endpoint: IStreamServerEndpoint,
        private_key_path: FilePath,
        cert_path: FilePath,
    ) -> "_TLSEndpointWrapper":
        """
        Create an endpoint with the given private key and certificate paths on
        the filesystem.
        """
        cert_pem = cert_path.getContent()
        certificate = Certificate.loadPEM(cert_pem).original
        # PrivateCertificate.loadPEM() wants certificate and key in a single
        # PEM blob.
        combined_pem = cert_pem + b"\n" + private_key_path.getContent()
        private_key = PrivateCertificate.loadPEM(combined_pem).privateKey.original
        options = CertificateOptions(privateKey=private_key, certificate=certificate)
        return cls(endpoint=endpoint, context_factory=options)

    def listen(self, factory: IProtocolFactory) -> Deferred[IListeningPort]:
        """Listen on the wrapped endpoint, layering server-side TLS over *factory*."""
        tls_factory = TLSMemoryBIOFactory(self.context_factory, False, factory)
        return self.endpoint.listen(tls_factory)
| 1109 | |
|---|
| 1110 | |
|---|
def build_nurl(
    hostname: str,
    port: int,
    swissnum: str,
    certificate: CryptoCertificate,
    subscheme: Optional[str] = None,
) -> DecodedURL:
    """
    Construct a HTTPS NURL, given the hostname, port, server swissnum, and x509
    certificate for the server. Clients can then connect to the server using
    this NURL.
    """
    # The expected certificate is identified by its SPKI hash, carried in the
    # userinfo portion of the URL.
    spki_hash = str(get_spki_hash(certificate), "ascii")
    scheme = "pb" if subscheme is None else f"pb+{subscheme}"
    return DecodedURL().replace(
        fragment="v=1",  # how we know this NURL is HTTP-based (i.e. not Foolscap)
        host=hostname,
        port=port,
        path=(swissnum,),
        userinfo=(spki_hash,),
        scheme=scheme,
    )
| 1139 | |
|---|
| 1140 | |
|---|
def listen_tls(
    server: HTTPServer,
    hostname: str,
    endpoint: IStreamServerEndpoint,
    private_key_path: FilePath,
    cert_path: FilePath,
) -> Deferred[tuple[DecodedURL, IListeningPort]]:
    """
    Start a HTTPS storage server on the given port, return the NURL and the
    listening port.

    The hostname is the external IP or hostname clients will connect to, used
    to construct the NURL; it does not modify what interfaces the server
    listens on.

    This will likely need to be updated eventually to handle Tor/i2p.
    """
    tls_endpoint = _TLSEndpointWrapper.from_paths(endpoint, private_key_path, cert_path)

    def make_result(
        listening_port: IListeningPort,
    ) -> tuple[DecodedURL, IListeningPort]:
        # getHost() reports the address actually bound, including an
        # OS-assigned port number if port 0 was requested.
        address = cast(Union[IPv4Address, IPv6Address], listening_port.getHost())
        nurl = build_nurl(
            hostname,
            address.port,
            str(server._swissnum, "ascii"),
            load_pem_x509_certificate(cert_path.getContent()),
        )
        return (nurl, listening_port)

    listening = tls_endpoint.listen(Site(server.get_resource()))
    return listening.addCallback(make_result)