| 1 | """ |
|---|
| 2 | Common HTTP infrastructure for the storge server. |
|---|
| 3 | """ |
|---|
| 4 | |
|---|
| 5 | from enum import Enum |
|---|
| 6 | from base64 import urlsafe_b64encode, b64encode |
|---|
| 7 | from hashlib import sha256 |
|---|
| 8 | from typing import Optional |
|---|
| 9 | |
|---|
| 10 | from cryptography.x509 import Certificate |
|---|
| 11 | from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat |
|---|
| 12 | |
|---|
| 13 | from werkzeug.http import parse_options_header |
|---|
| 14 | from twisted.web.http_headers import Headers |
|---|
| 15 | from twisted.web.iweb import IResponse |
|---|
| 16 | |
|---|
| 17 | CBOR_MIME_TYPE = "application/cbor" |
|---|
| 18 | |
|---|
| 19 | |
|---|
| 20 | def get_content_type(headers: Headers) -> Optional[str]: |
|---|
| 21 | """ |
|---|
| 22 | Get the content type from the HTTP ``Content-Type`` header. |
|---|
| 23 | |
|---|
| 24 | Returns ``None`` if no content-type was set. |
|---|
| 25 | """ |
|---|
| 26 | values = headers.getRawHeaders("content-type", [None]) or [None] |
|---|
| 27 | content_type = parse_options_header(values[0])[0] or None |
|---|
| 28 | return content_type |
|---|
| 29 | |
|---|
| 30 | |
|---|
| 31 | def response_is_not_html(response: IResponse) -> None: |
|---|
| 32 | """ |
|---|
| 33 | During tests, this is registered so we can ensure the web server |
|---|
| 34 | doesn't give us text/html. |
|---|
| 35 | |
|---|
| 36 | HTML is never correct except in 404, but it's the default for |
|---|
| 37 | Twisted's web server so we assert nothing unexpected happened. |
|---|
| 38 | """ |
|---|
| 39 | if response.code != 404: |
|---|
| 40 | assert get_content_type(response.headers) != "text/html" |
|---|
| 41 | |
|---|
| 42 | |
|---|
| 43 | def swissnum_auth_header(swissnum: bytes) -> bytes: |
|---|
| 44 | """Return value for ``Authorization`` header.""" |
|---|
| 45 | return b"Tahoe-LAFS " + b64encode(swissnum).strip() |
|---|
| 46 | |
|---|
| 47 | |
|---|
| 48 | class Secrets(Enum): |
|---|
| 49 | """Different kinds of secrets the client may send.""" |
|---|
| 50 | |
|---|
| 51 | LEASE_RENEW = "lease-renew-secret" |
|---|
| 52 | LEASE_CANCEL = "lease-cancel-secret" |
|---|
| 53 | UPLOAD = "upload-secret" |
|---|
| 54 | WRITE_ENABLER = "write-enabler" |
|---|
| 55 | |
|---|
| 56 | |
|---|
| 57 | def get_spki(certificate: Certificate) -> bytes: |
|---|
| 58 | """ |
|---|
| 59 | Get the bytes making up the DER encoded representation of the |
|---|
| 60 | `SubjectPublicKeyInfo` (RFC 7469) for the given certificate. |
|---|
| 61 | """ |
|---|
| 62 | return certificate.public_key().public_bytes( |
|---|
| 63 | Encoding.DER, PublicFormat.SubjectPublicKeyInfo |
|---|
| 64 | ) |
|---|
| 65 | |
|---|
| 66 | def get_spki_hash(certificate: Certificate) -> bytes: |
|---|
| 67 | """ |
|---|
| 68 | Get the public key hash, as per RFC 7469: base64 of sha256 of the public |
|---|
| 69 | key encoded in DER + Subject Public Key Info format. |
|---|
| 70 | |
|---|
| 71 | We use the URL-safe base64 variant, since this is typically found in NURLs. |
|---|
| 72 | """ |
|---|
| 73 | spki_bytes = get_spki(certificate) |
|---|
| 74 | return urlsafe_b64encode(sha256(spki_bytes).digest()).strip().rstrip(b"=") |
|---|