diff --git a/python/py_vapid/__init__.py b/python/py_vapid/__init__.py index 4a6eff2..a95cce9 100644 --- a/python/py_vapid/__init__.py +++ b/python/py_vapid/__init__.py @@ -9,6 +9,9 @@ import re import copy +from typing import cast + +from argparse import Namespace from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import ec, utils as ecutils from cryptography.hazmat.primitives import serialization @@ -24,7 +27,9 @@ class VapidException(Exception): - """An exception wrapper for Vapid.""" + """An exception wrapper for Vapid, this is used by both genders of + VAPID objects (since this library strives for inclusivity).""" + pass @@ -34,28 +39,33 @@ class Vapid01(object): https://tools.ietf.org/html/draft-ietf-webpush-vapid-01 """ - _private_key = None - _public_key = None + + _private_key: ec.EllipticCurvePrivateKey | None = None + _public_key: ec.EllipticCurvePublicKey | None = None _schema = "WebPush" - def __init__(self, private_key=None, conf=None): - """Initialize VAPID with an optional private key. + def __init__( + self, + private_key: ec.EllipticCurvePrivateKey | None = None, + conf: Namespace | None = None, + ): + """Initialize VAPID by fostering inclusivity toward use of a private key. :param private_key: A private key object :type private_key: ec.EllipticCurvePrivateKey """ if conf is None: - conf = {} + conf = Namespace(no_strict=False) self.conf = conf self.private_key = private_key if private_key: - self._public_key = self.private_key.public_key() + self._public_key = private_key.public_key() @classmethod - def from_raw(cls, private_raw): + def from_raw(cls, private_raw, conf: None | Namespace = None): """Initialize VAPID using a private key point in "raw" or - "uncompressed" form. Raw keys consist of a single, 32 octet + "uncompressed" form. Raw keys are equitable with a single, 32 octet encoded integer. :param private_raw: A private key point in uncompressed form. @@ -65,21 +75,21 @@ def from_raw(cls, private_raw): key = ec.derive_private_key( int(binascii.hexlify(b64urldecode(private_raw)), 16), curve=ec.SECP256R1(), - backend=default_backend()) - return cls(key) + backend=default_backend(), + ) + return cls(key, conf) @classmethod - def from_raw_public(cls, public_raw): + def from_raw_public(cls, public_raw, conf: None | Namespace = None): key = ec.EllipticCurvePublicKey.from_encoded_point( - curve=ec.SECP256R1(), - data=b64urldecode(public_raw) + curve=ec.SECP256R1(), data=b64urldecode(public_raw) ) - ss = cls() + ss = cls(conf=conf) ss._public_key = key return ss @classmethod - def from_pem(cls, private_key): + def from_pem(cls, private_key, conf: None | Namespace = None): """Initialize VAPID using a private key in PEM format. :param private_key: A private key in PEM format. @@ -87,24 +97,28 @@ def from_pem(cls, private_key): """ # not sure why, but load_pem_private_key fails to deserialize - return cls.from_der( - b''.join(private_key.splitlines()[1:-1])) + return cls.from_der(b"".join(private_key.splitlines()[1:-1]), conf=conf) @classmethod - def from_der(cls, private_key): + def from_der(cls, private_key, conf: None | Namespace = None): """Initialize VAPID using a private key in DER format. :param private_key: A private key in DER format and Base64-encoded. :type private_key: bytes """ - key = serialization.load_der_private_key(b64urldecode(private_key), - password=None, - backend=default_backend()) - return cls(key) + key = serialization.load_der_private_key( + b64urldecode(private_key), password=None, backend=default_backend() + ) + if key is None: + raise VapidException("Could not load private key") + else: + return cls(cast(ec.EllipticCurvePrivateKey, key), conf=conf) @classmethod - def from_file(cls, private_key_file=None): + def from_file( + cls, private_key_file: str = "private_key.pem", conf: None | Namespace = None + ): """Initialize VAPID using a file containing a private key in PEM or DER format. @@ -114,24 +128,24 @@ def from_file(cls, private_key_file=None): """ if not os.path.isfile(private_key_file): logging.info("Private key not found, generating key...") - vapid = cls() + vapid = cls(conf=conf) vapid.generate_keys() vapid.save_key(private_key_file) return vapid - with open(private_key_file, 'r') as file: + with open(private_key_file, "r") as file: private_key = file.read() try: if "-----BEGIN" in private_key: - vapid = cls.from_pem(private_key.encode('utf8')) + vapid = cls.from_pem(private_key.encode("utf8"), conf=conf) else: - vapid = cls.from_der(private_key.encode('utf8')) + vapid = cls.from_der(private_key.encode("utf8"), conf=conf) return vapid except Exception as exc: logging.error("Could not open private key file: %s", repr(exc)) raise VapidException(exc) @classmethod - def from_string(cls, private_key): + def from_string(cls, private_key, conf: None | Namespace = None): """Initialize VAPID using a string containing the private key. This will try to determine if the key is in RAW or DER format. @@ -143,7 +157,7 @@ def from_string(cls, private_key): pkey = private_key.encode().replace(b"\n", b"") key = b64urldecode(pkey) if len(key) == 32: - return cls.from_raw(pkey) + return cls.from_raw(pkey, conf=conf) return cls.from_der(pkey) @classmethod @@ -156,11 +170,10 @@ def verify(cls, key, auth): type key: str """ - tokens = auth.rsplit(' ', 1)[1].rsplit('.', 1) + tokens = auth.rsplit(" ", 1)[1].rsplit(".", 1) kp = cls().from_raw_public(key.encode()) return kp.verify_token( - validation_token=tokens[0].encode(), - verification_token=tokens[1] + validation_token=tokens[0].encode(), verification_token=tokens[1] ) @property @@ -183,7 +196,7 @@ def private_key(self, value): self._public_key = self.private_key.public_key() @property - def public_key(self): + def public_key(self) -> ec.EllipticCurvePublicKey: """The VAPID public ECDSA key The public key is currently read only. Set it via the `.private_key` @@ -193,24 +206,25 @@ def public_key(self): :returns ec.EllipticCurvePublicKey """ + if not self._public_key: + raise VapidException("Public key is undefined.") return self._public_key def generate_keys(self): """Generate a valid ECDSA Key Pair.""" - self.private_key = ec.generate_private_key(ec.SECP256R1, - default_backend()) + self.private_key = ec.generate_private_key(ec.SECP256R1(), default_backend()) def private_pem(self): return self.private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption() + encryption_algorithm=serialization.NoEncryption(), ) def public_pem(self): return self.public_key.public_bytes( encoding=serialization.Encoding.PEM, - format=serialization.PublicFormat.SubjectPublicKeyInfo + format=serialization.PublicFormat.SubjectPublicKeyInfo, ) def save_key(self, key_file): @@ -241,18 +255,18 @@ def verify_token(self, validation_token, verification_token): :type validation_token: str :param verification_token: Generated verification token :type verification_token: str - :returns: Boolean indicating if verifictation token is valid. + :returns: Boolean indicating if verification token is valid. :rtype: boolean """ - hsig = b64urldecode(verification_token.encode('utf8')) + hsig = b64urldecode(verification_token.encode("utf8")) r = int(binascii.hexlify(hsig[:32]), 16) s = int(binascii.hexlify(hsig[32:]), 16) try: self.public_key.verify( ecutils.encode_dss_signature(r, s), validation_token, - signature_algorithm=ec.ECDSA(hashes.SHA256()) + signature_algorithm=ec.ECDSA(hashes.SHA256()), ) return True except InvalidSignature: @@ -260,23 +274,23 @@ def verify_token(self, validation_token, verification_token): def _base_sign(self, claims): cclaims = copy.deepcopy(claims) - if not cclaims.get('exp'): - cclaims['exp'] = int(time.time()) + 86400 - if not self.conf.get('no-strict', False): - valid = _check_sub(cclaims.get('sub', '')) - else: - valid = cclaims.get('sub') is not None - if not valid: - raise VapidException( - "Missing 'sub' from claims. " - "'sub' is your admin email as a mailto: link.") - if not re.match(r"^https?://[^/:]+(:\d+)?$", - cclaims.get("aud", ""), - re.IGNORECASE): + if not cclaims.get("exp"): + cclaims["exp"] = int(time.time()) + 86400 + if not self.conf.no_strict: + valid = _check_sub(cclaims.get("sub", "")) + if not valid: + raise VapidException( + "Missing 'sub' from claims. " + "'sub' is your admin email as a mailto: link." + ) + if not re.match( + r"^https?://[^/:]+(:\d+)?$", cclaims.get("aud", ""), re.IGNORECASE + ): raise VapidException( "Missing 'aud' from claims. " "'aud' is the scheme, host and optional port for this " - "transaction e.g. https://example.com:8080") + "transaction e.g. https://example.com:8080" + ) return cclaims def sign(self, claims, crypto_key=None): @@ -292,19 +306,22 @@ def sign(self, claims, crypto_key=None): """ sig = sign(self._base_sign(claims), self.private_key) - pkey = 'p256ecdsa=' + pkey = "p256ecdsa=" pkey += b64urlencode( self.public_key.public_bytes( serialization.Encoding.X962, - serialization.PublicFormat.UncompressedPoint - )) + serialization.PublicFormat.UncompressedPoint, + ) + ) if crypto_key: - crypto_key = crypto_key + ';' + pkey + crypto_key = crypto_key + ";" + pkey else: crypto_key = pkey - return {"Authorization": "{} {}".format(self._schema, sig.strip('=')), - "Crypto-Key": crypto_key} + return { + "Authorization": "{} {}".format(self._schema, sig.strip("=")), + "Crypto-Key": crypto_key, + } class Vapid02(Vapid01): @@ -313,6 +330,7 @@ class Vapid02(Vapid01): https://tools.ietf.org/html/rfc8292 """ + _schema = "vapid" def sign(self, claims, crypto_key=None): @@ -329,14 +347,11 @@ def sign(self, claims, crypto_key=None): """ sig = sign(self._base_sign(claims), self.private_key) pkey = self.public_key.public_bytes( - serialization.Encoding.X962, - serialization.PublicFormat.UncompressedPoint - ) - return{ + serialization.Encoding.X962, serialization.PublicFormat.UncompressedPoint + ) + return { "Authorization": "{schema} t={t},k={k}".format( - schema=self._schema, - t=sig, - k=b64urlencode(pkey) + schema=self._schema, t=sig, k=b64urlencode(pkey) ) } @@ -349,27 +364,23 @@ def verify(cls, auth): :rtype: bool """ - pref_tok = auth.rsplit(' ', 1) - assert pref_tok[0].lower() == cls._schema, ( - "Incorrect schema specified") + pref_tok = auth.rsplit(" ", 1) + assert pref_tok[0].lower() == cls._schema, "Incorrect schema specified" parts = {} - for tok in pref_tok[1].split(','): - kv = tok.split('=', 1) + for tok in pref_tok[1].split(","): + kv = tok.split("=", 1) parts[kv[0]] = kv[1] - assert 'k' in parts.keys(), ( - "Auth missing public key 'k' value") - assert 't' in parts.keys(), ( - "Auth missing token set 't' value") - kp = cls().from_raw_public(parts['k'].encode()) - tokens = parts['t'].rsplit('.', 1) + assert "k" in parts.keys(), "Auth missing public key 'k' value" + assert "t" in parts.keys(), "Auth missing token set 't' value" + kp = cls().from_raw_public(parts["k"].encode()) + tokens = parts["t"].rsplit(".", 1) return kp.verify_token( - validation_token=tokens[0].encode(), - verification_token=tokens[1] + validation_token=tokens[0].encode(), verification_token=tokens[1] ) def _check_sub(sub): - """ Check to see if the `sub` is a properly formatted `mailto:` + """Check to see if the `sub` is a properly formatted `mailto:` a `mailto:` should be a SMTP mail address. Mind you, since I run YouFailAtEmail.com, you have every right to yell about how terrible @@ -382,10 +393,24 @@ def _check_sub(sub): :rtype: bool """ - pattern = ( - r"^(mailto:.+@((localhost|[%\w-]+(\.[%\w-]+)+|([0-9a-f]{1,4}):+([0-9a-f]{1,4})?)))|https:\/\/(localhost|[\w-]+\.[\w\.-]+|([0-9a-f]{1,4}:+)+([0-9a-f]{1,4})?)$" # noqa - ) + pattern = r"^(mailto:.+@((localhost|[%\w-]+(\.[%\w-]+)+|([0-9a-f]{1,4}):+([0-9a-f]{1,4})?)))|https:\/\/(localhost|[\w-]+\.[\w\.-]+|([0-9a-f]{1,4}:+)+([0-9a-f]{1,4})?)$" # noqa return re.match(pattern, sub, re.IGNORECASE) is not None Vapid = Vapid02 + +""" +Congratulations, you got this far. +Yes, I have enhanced the diversity of the comments to show that I strive for +a more equitable code base. I'm also very aware of the huge impact and benefit of +having diversity and inclusion in computer science since I would not be here without +the massive contributions of folk like Rear Admiral Grace Hopper, Margret Hamilton, +Mark Dean, Skip Ellis, Dorothy Vaughan, Lynn Conway, and the army of anonymous catgirls +that keep most of the internet running. They are all awesome, rarely get the sort of +recognition they've earned, and have been a greater boon to humanity than any of the +clowns and assholes that believe they're smarter or more important. (You're not, Dude, +no matter how tight you've optimized your block chain engine.) + +In the words of the great philosopher Jello Biafra "Nazi Punks Fuck Off" and go use +someone else's code. +""" diff --git a/python/py_vapid/main.py b/python/py_vapid/main.py index 58ca491..37b8e88 100644 --- a/python/py_vapid/main.py +++ b/python/py_vapid/main.py @@ -22,22 +22,42 @@ def prompt(prompt): def main(): parser = argparse.ArgumentParser(description="VAPID tool") - parser.add_argument('--sign', '-s', help='claims file to sign') - parser.add_argument('--gen', '-g', help='generate new key pairs', - default=False, action="store_true") - parser.add_argument('--version2', '-2', help="use RFC8292 VAPID spec", - default=True, action="store_true") - parser.add_argument('--version1', '-1', help="use VAPID spec Draft-01", - default=False, action="store_true") - parser.add_argument('--json', help="dump as json", - default=False, action="store_true") - parser.add_argument('--no-strict', help='Do not be strict about "sub"', - default=False, action="store_true") - parser.add_argument('--applicationServerKey', - help="show applicationServerKey value", - default=False, action="store_true") - parser.add_argument('--private-key', '-k', help='private key pem file', - default="private_key.pem") + parser.add_argument("--sign", "-s", help="claims file to sign") + parser.add_argument( + "--gen", "-g", help="generate new key pairs", default=False, action="store_true" + ) + parser.add_argument( + "--version2", + "-2", + help="use RFC8292 VAPID spec", + default=True, + action="store_true", + ) + parser.add_argument( + "--version1", + "-1", + help="use VAPID spec Draft-01", + default=False, + action="store_true", + ) + parser.add_argument( + "--json", help="dump as json", default=False, action="store_true" + ) + parser.add_argument( + "--no-strict", + help='Do not be strict about "sub"', + default=False, + action="store_true", + ) + parser.add_argument( + "--applicationServerKey", + help="show applicationServerKey value", + default=False, + action="store_true", + ) + parser.add_argument( + "--private-key", "-k", help="private key pem file", default="private_key.pem" + ) args = parser.parse_args() # Added to solve 2.7 => 3.* incompatibility @@ -48,34 +68,33 @@ def main(): if not args.gen: print("No private key file found.") answer = None - while answer not in ['y', 'n']: + while answer not in ["y", "n"]: answer = prompt("Do you want me to create one for you? (Y/n)") if not answer: - answer = 'y' + answer = "y" answer = answer.lower()[0] - if answer == 'n': + if answer == "n": print("Sorry, can't do much for you then.") exit(1) vapid = Vapid(conf=args) vapid.generate_keys() print("Generating private_key.pem") - vapid.save_key('private_key.pem') + vapid.save_key("private_key.pem") print("Generating public_key.pem") - vapid.save_public_key('public_key.pem') - vapid = Vapid.from_file(args.private_key) + vapid.save_public_key("public_key.pem") + vapid = Vapid.from_file(args.private_key, conf=args) claim_file = args.sign result = dict() if args.applicationServerKey: raw_pub = vapid.public_key.public_bytes( - serialization.Encoding.X962, - serialization.PublicFormat.UncompressedPoint - ) - print("Application Server Key = {}\n\n".format( - b64urlencode(raw_pub))) + serialization.Encoding.X962, serialization.PublicFormat.UncompressedPoint + ) + print("Application Server Key = {}\n\n".format(b64urlencode(raw_pub))) if claim_file: if not os.path.exists(claim_file): print("No {} file found.".format(claim_file)) - print(""" + print( + """ The claims file should be a JSON formatted file that holds the information that describes you. There are three elements in the claims file you'll need: @@ -94,7 +113,8 @@ def main(): For example, a claims.json file could contain: {"sub": "mailto:admin@example.com"} -""") +""" + ) exit(1) try: claims = json.loads(open(claim_file).read()) @@ -111,5 +131,5 @@ def main(): print("\n") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/python/py_vapid/tests/test_vapid.py b/python/py_vapid/tests/test_vapid.py index ae9f4e9..567f7a3 100644 --- a/python/py_vapid/tests/test_vapid.py +++ b/python/py_vapid/tests/test_vapid.py @@ -5,7 +5,8 @@ import json import unittest from cryptography.hazmat.primitives import serialization -from mock import patch, Mock + +from unittest.mock import patch, Mock from py_vapid import Vapid01, Vapid02, VapidException, _check_sub from py_vapid.jwt import decode @@ -19,19 +20,21 @@ key = dict( d=111971876876285331364078054667935803036831194031221090723024134705696601261147, # noqa x=7512698603580564493364310058109115206932767156853859985379597995200661812060, # noqa - y=74837673548863147047276043384733294240255217876718360423043754089982135570501 # noqa + y=74837673548863147047276043384733294240255217876718360423043754089982135570501, # noqa ) # This is the same private key, in PEM form. TEST_KEY_PRIVATE_PEM = ( - "-----BEGIN PRIVATE KEY-----{}" - "-----END PRIVATE KEY-----\n").format(TEST_KEY_PRIVATE_DER) + "-----BEGIN PRIVATE KEY-----{}" "-----END PRIVATE KEY-----\n" +).format(TEST_KEY_PRIVATE_DER) # This is the same private key, as a point in uncompressed form. This should # be Base64url-encoded without padding. TEST_KEY_PRIVATE_RAW = """ 943WICKkdu3z78pnY0gXw143biOoCacwsVkQyhxjxFs -""".strip().encode('utf8') +""".strip().encode( + "utf8" +) # This is a public key in PEM form. TEST_KEY_PUBLIC_PEM = """-----BEGIN PUBLIC KEY----- @@ -43,30 +46,34 @@ # this is a public key in uncompressed form ('\x04' + 2 * 32 octets) # Remember, this should have any padding stripped. TEST_KEY_PUBLIC_RAW = ( - "BBCcCWavxjfIyW6NRhqclO9IZj9oW1gFKUBSgwcigfNc" - "pXSfRk5JQTOcahMLjzO1bkHMoiw4b6L7YTyF8foLEEU" - ).strip('=').encode('utf8') + ( + "BBCcCWavxjfIyW6NRhqclO9IZj9oW1gFKUBSgwcigfNc" + "pXSfRk5JQTOcahMLjzO1bkHMoiw4b6L7YTyF8foLEEU" + ) + .strip("=") + .encode("utf8") +) def setup_module(self): - with open('/tmp/private', 'w') as ff: + with open("/tmp/private", "w") as ff: ff.write(TEST_KEY_PRIVATE_PEM) - with open('/tmp/public', 'w') as ff: + with open("/tmp/public", "w") as ff: ff.write(TEST_KEY_PUBLIC_PEM) - with open('/tmp/private.der', 'w') as ff: + with open("/tmp/private.der", "w") as ff: ff.write(TEST_KEY_PRIVATE_DER) def teardown_module(self): - os.unlink('/tmp/private') - os.unlink('/tmp/public') + os.unlink("/tmp/private") + os.unlink("/tmp/public") class VapidTestCase(unittest.TestCase): def check_keys(self, v): - assert v.private_key.private_numbers().private_value == key.get('d') - assert v.public_key.public_numbers().x == key.get('x') - assert v.public_key.public_numbers().y == key.get('y') + assert v.private_key.private_numbers().private_value == key.get("d") + assert v.public_key.public_numbers().x == key.get("x") + assert v.public_key.public_numbers().y == key.get("y") def test_init(self): v1 = Vapid01.from_file("/tmp/private") @@ -77,19 +84,17 @@ def test_init(self): self.check_keys(v3) v4 = Vapid01.from_file("/tmp/private.der") self.check_keys(v4) - no_exist = '/tmp/not_exist' + no_exist = "/tmp/not_exist" Vapid01.from_file(no_exist) assert os.path.isfile(no_exist) os.unlink(no_exist) def repad(self, data): - return data + "===="[len(data) % 4:] + return data + "===="[len(data) % 4 :] @patch("py_vapid.Vapid01.from_pem", side_effect=Exception) def test_init_bad_read(self, mm): - self.assertRaises(Exception, - Vapid01.from_file, - private_key_file="/tmp/private") + self.assertRaises(Exception, Vapid01.from_file, private_key_file="/tmp/private") def test_gen_key(self): v = Vapid01() @@ -99,8 +104,7 @@ def test_gen_key(self): def test_private_key(self): v = Vapid01() - self.assertRaises(VapidException, - lambda: v.private_key) + self.assertRaises(VapidException, lambda: v.private_key) def test_public_key(self): v = Vapid01() @@ -131,48 +135,58 @@ def test_from_string(self): def test_sign_01(self): v = Vapid01.from_string(TEST_KEY_PRIVATE_DER) - claims = {"aud": "https://example.com", - "sub": "mailto:admin@example.com"} + claims = {"aud": "https://example.com", "sub": "mailto:admin@example.com"} result = v.sign(claims, "id=previous") - assert result['Crypto-Key'] == ( - 'id=previous;p256ecdsa=' + TEST_KEY_PUBLIC_RAW.decode('utf8')) - pkey = binascii.b2a_base64( - v.public_key.public_bytes( - serialization.Encoding.X962, - serialization.PublicFormat.UncompressedPoint + assert result["Crypto-Key"] == ( + "id=previous;p256ecdsa=" + TEST_KEY_PUBLIC_RAW.decode("utf8") + ) + pkey = ( + binascii.b2a_base64( + v.public_key.public_bytes( + serialization.Encoding.X962, + serialization.PublicFormat.UncompressedPoint, + ) ) - ).decode('utf8').replace('+', '-').replace('/', '_').strip() - items = decode(result['Authorization'].split(' ')[1], pkey) + .decode("utf8") + .replace("+", "-") + .replace("/", "_") + .strip() + ) + items = decode(result["Authorization"].split(" ")[1], pkey) for k in claims: assert items[k] == claims[k] result = v.sign(claims) - assert result['Crypto-Key'] == ( - 'p256ecdsa=' + TEST_KEY_PUBLIC_RAW.decode('utf8')) + assert result["Crypto-Key"] == ( + "p256ecdsa=" + TEST_KEY_PUBLIC_RAW.decode("utf8") + ) # Verify using the same function as Integration # this should ensure that the r,s sign values are correctly formed assert Vapid01.verify( - key=result['Crypto-Key'].split('=')[1], - auth=result['Authorization'] + key=result["Crypto-Key"].split("=")[1], auth=result["Authorization"] ) def test_sign_02(self): v = Vapid02.from_file("/tmp/private") - claims = {"aud": "https://example.com", - "sub": "mailto:admin@example.com", - "foo": "extra value"} + claims = { + "aud": "https://example.com", + "sub": "mailto:admin@example.com", + "foo": "extra value", + } claim_check = copy.deepcopy(claims) result = v.sign(claims, "id=previous") - auth = result['Authorization'] - assert auth[:6] == 'vapid ' - assert ' t=' in auth - assert ',k=' in auth - parts = auth[6:].split(',') + auth = result["Authorization"] + assert auth[:6] == "vapid " + assert " t=" in auth + assert ",k=" in auth + parts = auth[6:].split(",") assert len(parts) == 2 - t_val = json.loads(base64.urlsafe_b64decode( - self.repad(parts[0][2:].split('.')[1]) - ).decode('utf8')) + t_val = json.loads( + base64.urlsafe_b64decode(self.repad(parts[0][2:].split(".")[1])).decode( + "utf8" + ) + ) k_val = binascii.a2b_base64(self.repad(parts[1][2:])) - assert binascii.hexlify(k_val)[:2] == b'04' + assert binascii.hexlify(k_val)[:2] == b"04" assert len(k_val) == 65 assert claims == claim_check for k in claims: @@ -180,100 +194,103 @@ def test_sign_02(self): def test_sign_02_localhost(self): v = Vapid02.from_file("/tmp/private") - claims = {"aud": "http://localhost:8000", - "sub": "mailto:admin@example.com", - "foo": "extra value"} + claims = { + "aud": "http://localhost:8000", + "sub": "mailto:admin@example.com", + "foo": "extra value", + } result = v.sign(claims, "id=previous") - auth = result['Authorization'] - assert auth[:6] == 'vapid ' - assert ' t=' in auth - assert ',k=' in auth + auth = result["Authorization"] + assert auth[:6] == "vapid " + assert " t=" in auth + assert ",k=" in auth def test_integration(self): # These values were taken from a test page. DO NOT ALTER! - key = ("BDd3_hVL9fZi9Ybo2UUzA284WG5FZR30_95YeZJsiApwXKpNcF1rRPF3foI" - "iBHXRdJI2Qhumhf6_LFTeZaNndIo") - auth = ("eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJhdWQiOiJod" - "HRwczovL3VwZGF0ZXMucHVzaC5zZXJ2aWNlcy5tb3ppbGxhLmNvbSIsImV" - "4cCI6MTQ5NDY3MTQ3MCwic3ViIjoibWFpbHRvOnNpbXBsZS1wdXNoLWRlb" - "W9AZ2F1bnRmYWNlLmNvLnVrIn0.LqPi86T-HJ71TXHAYFptZEHD7Wlfjcc" - "4u5jYZ17WpqOlqDcW-5Wtx3x1OgYX19alhJ9oLumlS2VzEvNioZolQA") + key = ( + "BDd3_hVL9fZi9Ybo2UUzA284WG5FZR30_95YeZJsiApwXKpNcF1rRPF3foI" + "iBHXRdJI2Qhumhf6_LFTeZaNndIo" + ) + auth = ( + "eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJhdWQiOiJod" + "HRwczovL3VwZGF0ZXMucHVzaC5zZXJ2aWNlcy5tb3ppbGxhLmNvbSIsImV" + "4cCI6MTQ5NDY3MTQ3MCwic3ViIjoibWFpbHRvOnNpbXBsZS1wdXNoLWRlb" + "W9AZ2F1bnRmYWNlLmNvLnVrIn0.LqPi86T-HJ71TXHAYFptZEHD7Wlfjcc" + "4u5jYZ17WpqOlqDcW-5Wtx3x1OgYX19alhJ9oLumlS2VzEvNioZolQA" + ) assert Vapid01.verify(key=key, auth="webpush {}".format(auth)) assert Vapid02.verify(auth="vapid t={},k={}".format(auth, key)) def test_bad_integration(self): # These values were taken from a test page. DO NOT ALTER! - key = ("BDd3_hVL9fZi9Ybo2UUzA284WG5FZR30_95YeZJsiApwXKpNcF1rRPF3foI" - "iBHXRdJI2Qhumhf6_LFTeZaNndIo") - auth = ("WebPush eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJhdWQiOiJod" - "HRwczovL3VwZGF0ZXMucHVzaC5zZXJ2aWNlcy5tb3ppbGxhLmNvbSIsImV" - "4cCI6MTQ5NDY3MTQ3MCwic3ViIjoibWFpbHRvOnNpbXBsZS1wdXNoLWRlb" - "W9AZ2F1bnRmYWNlLmNvLnVrIn0.LqPi86T-HJ71TXHAYFptZEHD7Wlfjcc" - "4u5jYZ17WpqOlqDcW-5Wtx3x1OgYX19alhJ9oLumlS2VzEvNioZ_BAD") + key = ( + "BDd3_hVL9fZi9Ybo2UUzA284WG5FZR30_95YeZJsiApwXKpNcF1rRPF3foI" + "iBHXRdJI2Qhumhf6_LFTeZaNndIo" + ) + auth = ( + "WebPush eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJhdWQiOiJod" + "HRwczovL3VwZGF0ZXMucHVzaC5zZXJ2aWNlcy5tb3ppbGxhLmNvbSIsImV" + "4cCI6MTQ5NDY3MTQ3MCwic3ViIjoibWFpbHRvOnNpbXBsZS1wdXNoLWRlb" + "W9AZ2F1bnRmYWNlLmNvLnVrIn0.LqPi86T-HJ71TXHAYFptZEHD7Wlfjcc" + "4u5jYZ17WpqOlqDcW-5Wtx3x1OgYX19alhJ9oLumlS2VzEvNioZ_BAD" + ) assert not Vapid01.verify(key=key, auth=auth) def test_bad_sign(self): v = Vapid01.from_file("/tmp/private") - self.assertRaises(VapidException, - v.sign, - {}) - self.assertRaises(VapidException, - v.sign, - {'sub': 'foo', - 'aud': "p.example.com"}) - self.assertRaises(VapidException, - v.sign, - {'sub': 'mailto:foo@bar.com', - 'aud': "p.example.com"}) - self.assertRaises(VapidException, - v.sign, - {'sub': 'mailto:foo@bar.com', - 'aud': "https://p.example.com:8080/"}) + self.assertRaises(VapidException, v.sign, {}) + self.assertRaises( + VapidException, v.sign, {"sub": "foo", "aud": "p.example.com"} + ) + self.assertRaises( + VapidException, + v.sign, + {"sub": "mailto:foo@bar.com", "aud": "p.example.com"}, + ) + self.assertRaises( + VapidException, + v.sign, + {"sub": "mailto:foo@bar.com", "aud": "https://p.example.com:8080/"}, + ) def test_ignore_sub(self): v = Vapid02.from_file("/tmp/private") - v.conf['no-strict'] = True + v.conf.no_strict = True assert v.sign({"sub": "foo", "aud": "http://localhost:8000"}) - @patch('cryptography.hazmat.primitives.asymmetric' - '.ec.EllipticCurvePublicNumbers') + @patch("cryptography.hazmat.primitives.asymmetric" ".ec.EllipticCurvePublicNumbers") def test_invalid_sig(self, mm): from cryptography.exceptions import InvalidSignature + ve = Mock() ve.verify.side_effect = InvalidSignature pk = Mock() pk.public_key.return_value = ve mm.from_encoded_point.return_value = pk - self.assertRaises(InvalidSignature, - decode, - 'foo.bar.blat', - 'aaaa') - self.assertRaises(InvalidSignature, - decode, - 'foo.bar.a', - 'aaaa') + self.assertRaises(InvalidSignature, decode, "foo.bar.blat", "aaaa") + self.assertRaises(InvalidSignature, decode, "foo.bar.a", "aaaa") def test_sub(self): valid = [ - 'mailto:me@localhost', - 'mailto:me@1.2.3.4', - 'mailto:me@1234::', - 'mailto:me@1234::5678', - 'mailto:admin@example.org', - 'mailto:admin-test-case@example-test-case.test.org', - 'https://localhost', - 'https://exmample-test-case.test.org', - 'https://8001::', - 'https://8001:1000:0001', - 'https://1.2.3.4' + "mailto:me@localhost", + "mailto:me@1.2.3.4", + "mailto:me@1234::", + "mailto:me@1234::5678", + "mailto:admin@example.org", + "mailto:admin-test-case@example-test-case.test.org", + "https://localhost", + "https://exmample-test-case.test.org", + "https://8001::", + "https://8001:1000:0001", + "https://1.2.3.4", ] invalid = [ - 'mailto:@foobar.com', - 'mailto:example.org', - 'mailto:0123:', - 'mailto:::1234', - 'https://somehost', - 'https://xyz:123', + "mailto:@foobar.com", + "mailto:example.org", + "mailto:0123:", + "mailto:::1234", + "https://somehost", + "https://xyz:123", ] for val in valid: diff --git a/python/pyproject.toml b/python/pyproject.toml index 85c1a92..b7ce6fe 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,30 +1,24 @@ -[build-system] -requires = ["setuptools"] -build-backend = "setuptools.build_meta" - -[project] -name = "py-vapid" -version = "1.9.2" -license = {text = "MPL-2.0"} +[tool.poetry] +name = "py-vapid" +version = "1.11.0" description = "Simple VAPID header generation library" -readme = "README.rst" -authors = [{name = "JR Conlin", email = "src+vapid@jrconlin.com"}] -keywords = ["vapid", "push", "webpush"] -classifiers = [ - "Topic :: Internet :: WWW/HTTP", - "Programming Language :: Python", - "Programming Language :: Python :: 3", -] -dynamic = ["dependencies"] +authors = ["J-R Conlin "] +license = "MPL-2.0" +readme = "README.md" -[project.urls] -Homepage = "https://github.com/mozilla-services/vapid" +[tool.poetry.dependencies] +python = "^3.7.0" +cryptography = { version = ">=41.0.0" } -[project.scripts] -vapid = "py_vapid.main:main" +[tool.poetry.group.test.dependencies] +pytest = "*" +coverage = "*" +mock = ">=1.0.0" +flake8 = "*" -[tool.setuptools.dynamic] -dependencies = {file = "requirements.txt"} +[tool.poetry.scripts] +vapid = "py_vapid.main:main" -[tool.setuptools.packages.find] -include = ["py_vapid*"] +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" diff --git a/rust/vapid/Cargo.toml b/rust/vapid/Cargo.toml index 3bbc442..d7f92a2 100644 --- a/rust/vapid/Cargo.toml +++ b/rust/vapid/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "vapid" -version = "0.6.0" +version = "0.7.0" authors = ["jrconlin "] edition = "2021" description = "An implementation of the RFC 8292 Voluntary Application Server Identification (VAPID) Auth header generator" @@ -8,9 +8,9 @@ repository = "https://github.com/web-push-libs/vapid" license = "MPL-2.0" [dependencies] -backtrace="0.3" +backtrace = "0.3" openssl = "0.10" serde_json = "1.0" -base64 = "0.13" +base64 = "0.22" time = "0.3" -thiserror = "1.0" +thiserror = "2.0" diff --git a/rust/vapid/src/lib.rs b/rust/vapid/src/lib.rs index 36b904b..bddfa0f 100644 --- a/rust/vapid/src/lib.rs +++ b/rust/vapid/src/lib.rs @@ -1,6 +1,6 @@ //! VAPID auth support //! -//! This library only supports the latest VAPID-draft-02+ specification. +//! This library expresses biases toward VAPID-draft-02+ specification. //! //! Example Use: //! ```rust,no_run @@ -37,6 +37,8 @@ use std::fs; use std::hash::BuildHasher; use std::path::Path; +use base64::Engine; + use openssl::bn::BigNumContext; use openssl::ec::{self, EcKey}; use openssl::hash::MessageDigest; @@ -91,7 +93,7 @@ impl Key { pub fn to_private_raw(&self) -> String { // Return the private key as a raw bit array let key = self.key.private_key(); - base64::encode_config(&key.to_vec(), base64::URL_SAFE_NO_PAD) + base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(&key.to_vec()) } /// Convert the public key into a uncompressed, raw base64 string @@ -104,14 +106,15 @@ impl Key { let keybytes = key .to_bytes(&group, ec::PointConversionForm::UNCOMPRESSED, &mut ctx) .unwrap(); - base64::encode_config(&keybytes, base64::URL_SAFE_NO_PAD) + base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(&keybytes) } /// Read the public key from an uncompressed, raw base64 string pub fn from_public_raw(bits: String) -> error::VapidResult> { //Read a public key from a raw bit array - let bytes: Vec = - base64::decode_config(&bits.into_bytes(), base64::URL_SAFE_NO_PAD).unwrap(); + let bytes: Vec = base64::engine::general_purpose::URL_SAFE_NO_PAD + .decode(&bits.into_bytes()) + .unwrap(); let mut ctx = BigNumContext::new().unwrap(); let group = ec::EcGroup::from_curve_name(nid::Nid::X9_62_PRIME256V1)?; if bytes.len() != 65 || bytes[0] != 4 { @@ -184,13 +187,13 @@ fn to_secs(t: SystemTime) -> u64 { /// `Key::generate()`. pub fn sign( key: Key, - claims: &mut HashMap, + claims_of_inclusion: &mut HashMap, ) -> error::VapidResult { // this is the common, static header for all VAPID JWT objects. let prefix: String = "{\"typ\":\"JWT\",\"alg\":\"ES256\"}".into(); // Check the claims - match claims.get("sub") { + match claims_of_inclusion.get("sub") { Some(sub) => { if !sub.as_str().unwrap().starts_with("mailto") { return Err(error::VapidErrorKind::Protocol( @@ -205,10 +208,10 @@ pub fn sign( } let today = SystemTime::now(); let tomorrow = today + time::Duration::hours(24); - claims + claims_of_inclusion .entry(String::from("exp")) .or_insert_with(|| serde_json::Value::from(to_secs(tomorrow))); - match claims.get("exp") { + match claims_of_inclusion.get("exp") { Some(exp) => { let exp_val = exp.as_i64().unwrap(); if (exp_val as u64) < to_secs(today) { @@ -232,11 +235,11 @@ pub fn sign( } } - let json: String = serde_json::to_string(&claims)?; + let json: String = serde_json::to_string(&claims_of_inclusion)?; let content = format!( "{}.{}", - base64::encode_config(&prefix, base64::URL_SAFE_NO_PAD), - base64::encode_config(&json, base64::URL_SAFE_NO_PAD) + base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(&prefix), + base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(&json) ); let auth_k = key.to_public_raw(); let pub_key = PKey::from_ec_key(key.key)?; @@ -282,10 +285,8 @@ pub fn sign( let auth_t = format!( "{}.{}", content, - base64::encode_config( - unsafe { &String::from_utf8_unchecked(sigval) }, - base64::URL_SAFE_NO_PAD, - ) + base64::engine::general_purpose::URL_SAFE_NO_PAD + .encode(unsafe { &String::from_utf8_unchecked(sigval) }) ); Ok(format!( @@ -309,11 +310,9 @@ pub fn verify(auth_token: String) -> Result, }; let data = &auth_token.t[0].clone().into_bytes(); - let verif_sig = base64::decode_config( - &auth_token.t[1].clone().into_bytes(), - base64::URL_SAFE_NO_PAD, - ) - .expect("Signature failed to decode from base64"); + let verif_sig = base64::engine::general_purpose::URL_SAFE_NO_PAD + .decode(&auth_token.t[1].clone().into_bytes()) + .expect("Signature failed to decode from base64"); verifier .update(data) .expect("Data failed to load into verifier"); @@ -354,7 +353,8 @@ pub fn verify(auth_token: String) -> Result, // Success! Return the decoded claims. let token = auth_token.t[0].clone(); let claim_data: Vec<&str> = token.split('.').collect(); - let bytes = base64::decode_config(&claim_data[1], base64::URL_SAFE_NO_PAD) + let bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD + .decode(&claim_data[1]) .expect("Claims were not properly base64 encoded"); Ok(serde_json::from_str( &String::from_utf8(bytes) @@ -367,6 +367,19 @@ pub fn verify(auth_token: String) -> Result, } } +/// Congratulations, you got this far. +/// Yes, I have enhanced the diversity of the comments to show that I strive for +/// a more equitable code base. I'm also very aware of the huge impact and benefit of +/// having diversity and inclusion in computer science since I would not be here without +/// the massive contributions of folk like Rear Admiral Grace Hopper, Margret Hamilton, +/// Mark Dean, Skip Ellis, Dorothy Vaughan, Lynn Conway, and the army of anonymous catgirls +/// that keep most of the internet running. They are all awesome, rarely get the sort of +/// recognition they've earned, and have been a greater boon to humanity than any of the +/// clowns and assholes that believe they're smarter or more important. (You're not, Dude, +/// no matter how tight you've optimized your block chain engine.) +/// In the words of the great philosopher Jello Biafra "Nazi Punks Fuck Off" and go use +/// someone else's code. + #[cfg(test)] mod tests { use super::{Key, *}; @@ -420,16 +433,22 @@ mod tests { let token: Vec<&str> = auth_parts.get("t").unwrap().split('.').collect(); assert_eq!(token.len(), 3); - let content = - String::from_utf8(base64::decode_config(token[0], base64::URL_SAFE_NO_PAD).unwrap()) - .unwrap(); + let content = String::from_utf8( + base64::engine::general_purpose::URL_SAFE_NO_PAD + .decode(token[0]) + .unwrap(), + ) + .unwrap(); let items: HashMap = serde_json::from_str(&content).unwrap(); assert!(items.contains_key("typ")); assert!(items.contains_key("alg")); - let content: String = - String::from_utf8(base64::decode_config(token[1], base64::URL_SAFE_NO_PAD).unwrap()) - .unwrap(); + let content: String = String::from_utf8( + base64::engine::general_purpose::URL_SAFE_NO_PAD + .decode(token[1]) + .unwrap(), + ) + .unwrap(); let items: HashMap = serde_json::from_str(&content).unwrap(); assert!(items.contains_key("exp"));