Merge pyextra subtree

pull/689/head
Vehicle Researcher 2019-06-06 04:31:54 +00:00
commit 59bd6b8837
19 changed files with 1289 additions and 0 deletions

View File

@ -0,0 +1,70 @@
Metadata-Version: 1.1
Name: PyJWT
Version: 1.4.1
Summary: JSON Web Token implementation in Python
Home-page: http://github.com/jpadilla/pyjwt
Author: José Padilla
Author-email: hello@jpadilla.com
License: MIT
Description: # PyJWT
[![travis-status-image]][travis]
[![appveyor-status-image]][appveyor]
[![pypi-version-image]][pypi]
[![coveralls-status-image]][coveralls]
[![docs-status-image]][docs]
A Python implementation of [RFC 7519][jwt-spec].
Original implementation was written by [@progrium][progrium].
## Installing
```
$ pip install PyJWT
```
## Usage
```python
>>> import jwt
>>> encoded = jwt.encode({'some': 'payload'}, 'secret', algorithm='HS256')
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg'
>>> jwt.decode(encoded, 'secret', algorithms=['HS256'])
{'some': 'payload'}
```
## Tests
You can run tests from the project root after cloning with:
```
$ python setup.py test
```
[travis-status-image]: https://secure.travis-ci.org/jpadilla/pyjwt.svg?branch=master
[travis]: http://travis-ci.org/jpadilla/pyjwt?branch=master
[appveyor-status-image]: https://ci.appveyor.com/api/projects/status/h8nt70aqtwhht39t?svg=true
[appveyor]: https://ci.appveyor.com/project/jpadilla/pyjwt
[pypi-version-image]: https://img.shields.io/pypi/v/pyjwt.svg
[pypi]: https://pypi.python.org/pypi/pyjwt
[coveralls-status-image]: https://coveralls.io/repos/jpadilla/pyjwt/badge.svg?branch=master
[coveralls]: https://coveralls.io/r/jpadilla/pyjwt?branch=master
[docs-status-image]: https://readthedocs.org/projects/pyjwt/badge/?version=latest
[docs]: http://pyjwt.readthedocs.org
[jwt-spec]: https://tools.ietf.org/html/rfc7519
[progrium]: https://github.com/progrium
Keywords: jwt json web token security signing
Platform: UNKNOWN
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: Natural Language :: English
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2.6
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3.3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5
Classifier: Topic :: Utilities

View File

@ -0,0 +1,49 @@
AUTHORS
CHANGELOG.md
LICENSE
MANIFEST.in
README.md
setup.cfg
setup.py
tox.ini
PyJWT.egg-info/PKG-INFO
PyJWT.egg-info/SOURCES.txt
PyJWT.egg-info/dependency_links.txt
PyJWT.egg-info/entry_points.txt
PyJWT.egg-info/requires.txt
PyJWT.egg-info/top_level.txt
jwt/__init__.py
jwt/__main__.py
jwt/algorithms.py
jwt/api_jws.py
jwt/api_jwt.py
jwt/compat.py
jwt/exceptions.py
jwt/utils.py
jwt/contrib/__init__.py
jwt/contrib/algorithms/__init__.py
jwt/contrib/algorithms/py_ecdsa.py
jwt/contrib/algorithms/pycrypto.py
tests/__init__.py
tests/compat.py
tests/test_algorithms.py
tests/test_api_jws.py
tests/test_api_jwt.py
tests/test_compat.py
tests/test_exceptions.py
tests/test_jwt.py
tests/utils.py
tests/contrib/__init__.py
tests/contrib/test_algorithms.py
tests/keys/__init__.py
tests/keys/jwk_ec_key.json
tests/keys/jwk_ec_pub.json
tests/keys/jwk_hmac.json
tests/keys/jwk_rsa_key.json
tests/keys/jwk_rsa_pub.json
tests/keys/testkey2_rsa.pub.pem
tests/keys/testkey_ec
tests/keys/testkey_ec.pub
tests/keys/testkey_rsa
tests/keys/testkey_rsa.cer
tests/keys/testkey_rsa.pub

View File

@ -0,0 +1,3 @@
[console_scripts]
jwt = jwt.__main__:main

View File

@ -0,0 +1,31 @@
../../../../bin/jwt
../jwt/__init__.py
../jwt/__init__.pyc
../jwt/__main__.py
../jwt/__main__.pyc
../jwt/algorithms.py
../jwt/algorithms.pyc
../jwt/api_jws.py
../jwt/api_jws.pyc
../jwt/api_jwt.py
../jwt/api_jwt.pyc
../jwt/compat.py
../jwt/compat.pyc
../jwt/contrib/__init__.py
../jwt/contrib/__init__.pyc
../jwt/contrib/algorithms/__init__.py
../jwt/contrib/algorithms/__init__.pyc
../jwt/contrib/algorithms/py_ecdsa.py
../jwt/contrib/algorithms/py_ecdsa.pyc
../jwt/contrib/algorithms/pycrypto.py
../jwt/contrib/algorithms/pycrypto.pyc
../jwt/exceptions.py
../jwt/exceptions.pyc
../jwt/utils.py
../jwt/utils.pyc
PKG-INFO
SOURCES.txt
dependency_links.txt
entry_points.txt
requires.txt
top_level.txt

View File

@ -0,0 +1,13 @@
[crypto]
cryptography
[flake8]
flake8
flake8-import-order
pep8-naming
[test]
pytest==2.7.3
pytest-cov
pytest-runner

View File

@ -0,0 +1 @@
jwt

View File

@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
# flake8: noqa
"""
JSON Web Token implementation
Minimum implementation based on this spec:
http://self-issued.info/docs/draft-jones-json-web-token-01.html
"""
__title__ = 'pyjwt'
__version__ = '1.4.1'
__author__ = 'José Padilla'
__license__ = 'MIT'
__copyright__ = 'Copyright 2015 José Padilla'
from .api_jwt import (
encode, decode, register_algorithm, unregister_algorithm,
get_unverified_header, PyJWT
)
from .api_jws import PyJWS
from .exceptions import (
InvalidTokenError, DecodeError, InvalidAudienceError,
ExpiredSignatureError, ImmatureSignatureError, InvalidIssuedAtError,
InvalidIssuerError, ExpiredSignature, InvalidAudience, InvalidIssuer,
MissingRequiredClaimError
)

View File

@ -0,0 +1,135 @@
#!/usr/bin/env python
from __future__ import absolute_import, print_function
import json
import optparse
import sys
import time
from . import DecodeError, __package__, __version__, decode, encode
def main():
usage = '''Encodes or decodes JSON Web Tokens based on input.
%prog [options] input
Decoding examples:
%prog --key=secret json.web.token
%prog --no-verify json.web.token
Encoding requires the key option and takes space separated key/value pairs
separated by equals (=) as input. Examples:
%prog --key=secret iss=me exp=1302049071
%prog --key=secret foo=bar exp=+10
The exp key is special and can take an offset to current Unix time.\
'''
p = optparse.OptionParser(
usage=usage,
prog=__package__,
version='%s %s' % (__package__, __version__),
)
p.add_option(
'-n', '--no-verify',
action='store_false',
dest='verify',
default=True,
help='ignore signature and claims verification on decode'
)
p.add_option(
'--key',
dest='key',
metavar='KEY',
default=None,
help='set the secret key to sign with'
)
p.add_option(
'--alg',
dest='algorithm',
metavar='ALG',
default='HS256',
help='set crypto algorithm to sign with. default=HS256'
)
options, arguments = p.parse_args()
if len(arguments) > 0 or not sys.stdin.isatty():
if len(arguments) == 1 and (not options.verify or options.key):
# Try to decode
try:
if not sys.stdin.isatty():
token = sys.stdin.read()
else:
token = arguments[0]
token = token.encode('utf-8')
data = decode(token, key=options.key, verify=options.verify)
print(json.dumps(data))
sys.exit(0)
except DecodeError as e:
print(e)
sys.exit(1)
# Try to encode
if options.key is None:
print('Key is required when encoding. See --help for usage.')
sys.exit(1)
# Build payload object to encode
payload = {}
for arg in arguments:
try:
k, v = arg.split('=', 1)
# exp +offset special case?
if k == 'exp' and v[0] == '+' and len(v) > 1:
v = str(int(time.time()+int(v[1:])))
# Cast to integer?
if v.isdigit():
v = int(v)
else:
# Cast to float?
try:
v = float(v)
except ValueError:
pass
# Cast to true, false, or null?
constants = {'true': True, 'false': False, 'null': None}
if v in constants:
v = constants[v]
payload[k] = v
except ValueError:
print('Invalid encoding input at {}'.format(arg))
sys.exit(1)
try:
token = encode(
payload,
key=options.key,
algorithm=options.algorithm
)
print(token)
sys.exit(0)
except Exception as e:
print(e)
sys.exit(1)
else:
p.print_help()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,290 @@
import hashlib
import hmac
from .compat import constant_time_compare, string_types, text_type
from .exceptions import InvalidKeyError
from .utils import der_to_raw_signature, raw_to_der_signature
try:
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import (
load_pem_private_key, load_pem_public_key, load_ssh_public_key
)
from cryptography.hazmat.primitives.asymmetric.rsa import (
RSAPrivateKey, RSAPublicKey
)
from cryptography.hazmat.primitives.asymmetric.ec import (
EllipticCurvePrivateKey, EllipticCurvePublicKey
)
from cryptography.hazmat.primitives.asymmetric import ec, padding
from cryptography.hazmat.backends import default_backend
from cryptography.exceptions import InvalidSignature
has_crypto = True
except ImportError:
has_crypto = False
def get_default_algorithms():
"""
Returns the algorithms that are implemented by the library.
"""
default_algorithms = {
'none': NoneAlgorithm(),
'HS256': HMACAlgorithm(HMACAlgorithm.SHA256),
'HS384': HMACAlgorithm(HMACAlgorithm.SHA384),
'HS512': HMACAlgorithm(HMACAlgorithm.SHA512)
}
if has_crypto:
default_algorithms.update({
'RS256': RSAAlgorithm(RSAAlgorithm.SHA256),
'RS384': RSAAlgorithm(RSAAlgorithm.SHA384),
'RS512': RSAAlgorithm(RSAAlgorithm.SHA512),
'ES256': ECAlgorithm(ECAlgorithm.SHA256),
'ES384': ECAlgorithm(ECAlgorithm.SHA384),
'ES512': ECAlgorithm(ECAlgorithm.SHA512),
'PS256': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA256),
'PS384': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA384),
'PS512': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA512)
})
return default_algorithms
class Algorithm(object):
"""
The interface for an algorithm used to sign and verify tokens.
"""
def prepare_key(self, key):
"""
Performs necessary validation and conversions on the key and returns
the key value in the proper format for sign() and verify().
"""
raise NotImplementedError
def sign(self, msg, key):
"""
Returns a digital signature for the specified message
using the specified key value.
"""
raise NotImplementedError
def verify(self, msg, key, sig):
"""
Verifies that the specified digital signature is valid
for the specified message and key values.
"""
raise NotImplementedError
class NoneAlgorithm(Algorithm):
"""
Placeholder for use when no signing or verification
operations are required.
"""
def prepare_key(self, key):
if key == '':
key = None
if key is not None:
raise InvalidKeyError('When alg = "none", key value must be None.')
return key
def sign(self, msg, key):
return b''
def verify(self, msg, key, sig):
return False
class HMACAlgorithm(Algorithm):
"""
Performs signing and verification operations using HMAC
and the specified hash function.
"""
SHA256 = hashlib.sha256
SHA384 = hashlib.sha384
SHA512 = hashlib.sha512
def __init__(self, hash_alg):
self.hash_alg = hash_alg
def prepare_key(self, key):
if not isinstance(key, string_types) and not isinstance(key, bytes):
raise TypeError('Expecting a string- or bytes-formatted key.')
if isinstance(key, text_type):
key = key.encode('utf-8')
invalid_strings = [
b'-----BEGIN PUBLIC KEY-----',
b'-----BEGIN CERTIFICATE-----',
b'ssh-rsa'
]
if any([string_value in key for string_value in invalid_strings]):
raise InvalidKeyError(
'The specified key is an asymmetric key or x509 certificate and'
' should not be used as an HMAC secret.')
return key
def sign(self, msg, key):
return hmac.new(key, msg, self.hash_alg).digest()
def verify(self, msg, key, sig):
return constant_time_compare(sig, self.sign(msg, key))
if has_crypto:
class RSAAlgorithm(Algorithm):
"""
Performs signing and verification operations using
RSASSA-PKCS-v1_5 and the specified hash function.
"""
SHA256 = hashes.SHA256
SHA384 = hashes.SHA384
SHA512 = hashes.SHA512
def __init__(self, hash_alg):
self.hash_alg = hash_alg
def prepare_key(self, key):
if isinstance(key, RSAPrivateKey) or \
isinstance(key, RSAPublicKey):
return key
if isinstance(key, string_types):
if isinstance(key, text_type):
key = key.encode('utf-8')
try:
if key.startswith(b'ssh-rsa'):
key = load_ssh_public_key(key, backend=default_backend())
else:
key = load_pem_private_key(key, password=None, backend=default_backend())
except ValueError:
key = load_pem_public_key(key, backend=default_backend())
else:
raise TypeError('Expecting a PEM-formatted key.')
return key
def sign(self, msg, key):
signer = key.signer(
padding.PKCS1v15(),
self.hash_alg()
)
signer.update(msg)
return signer.finalize()
def verify(self, msg, key, sig):
verifier = key.verifier(
sig,
padding.PKCS1v15(),
self.hash_alg()
)
verifier.update(msg)
try:
verifier.verify()
return True
except InvalidSignature:
return False
class ECAlgorithm(Algorithm):
"""
Performs signing and verification operations using
ECDSA and the specified hash function
"""
SHA256 = hashes.SHA256
SHA384 = hashes.SHA384
SHA512 = hashes.SHA512
def __init__(self, hash_alg):
self.hash_alg = hash_alg
def prepare_key(self, key):
if isinstance(key, EllipticCurvePrivateKey) or \
isinstance(key, EllipticCurvePublicKey):
return key
if isinstance(key, string_types):
if isinstance(key, text_type):
key = key.encode('utf-8')
# Attempt to load key. We don't know if it's
# a Signing Key or a Verifying Key, so we try
# the Verifying Key first.
try:
key = load_pem_public_key(key, backend=default_backend())
except ValueError:
key = load_pem_private_key(key, password=None, backend=default_backend())
else:
raise TypeError('Expecting a PEM-formatted key.')
return key
def sign(self, msg, key):
signer = key.signer(ec.ECDSA(self.hash_alg()))
signer.update(msg)
der_sig = signer.finalize()
return der_to_raw_signature(der_sig, key.curve)
def verify(self, msg, key, sig):
try:
der_sig = raw_to_der_signature(sig, key.curve)
except ValueError:
return False
verifier = key.verifier(der_sig, ec.ECDSA(self.hash_alg()))
verifier.update(msg)
try:
verifier.verify()
return True
except InvalidSignature:
return False
class RSAPSSAlgorithm(RSAAlgorithm):
"""
Performs a signature using RSASSA-PSS with MGF1
"""
def sign(self, msg, key):
signer = key.signer(
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
salt_length=self.hash_alg.digest_size
),
self.hash_alg()
)
signer.update(msg)
return signer.finalize()
def verify(self, msg, key, sig):
verifier = key.verifier(
sig,
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
salt_length=self.hash_alg.digest_size
),
self.hash_alg()
)
verifier.update(msg)
try:
verifier.verify()
return True
except InvalidSignature:
return False

View File

@ -0,0 +1,204 @@
import binascii
import json
import warnings
from collections import Mapping
from .algorithms import Algorithm, get_default_algorithms # NOQA
from .compat import binary_type, string_types, text_type
from .exceptions import DecodeError, InvalidAlgorithmError, InvalidTokenError
from .utils import base64url_decode, base64url_encode, merge_dict
class PyJWS(object):
header_typ = 'JWT'
def __init__(self, algorithms=None, options=None):
self._algorithms = get_default_algorithms()
self._valid_algs = (set(algorithms) if algorithms is not None
else set(self._algorithms))
# Remove algorithms that aren't on the whitelist
for key in list(self._algorithms.keys()):
if key not in self._valid_algs:
del self._algorithms[key]
if not options:
options = {}
self.options = merge_dict(self._get_default_options(), options)
@staticmethod
def _get_default_options():
return {
'verify_signature': True
}
def register_algorithm(self, alg_id, alg_obj):
"""
Registers a new Algorithm for use when creating and verifying tokens.
"""
if alg_id in self._algorithms:
raise ValueError('Algorithm already has a handler.')
if not isinstance(alg_obj, Algorithm):
raise TypeError('Object is not of type `Algorithm`')
self._algorithms[alg_id] = alg_obj
self._valid_algs.add(alg_id)
def unregister_algorithm(self, alg_id):
"""
Unregisters an Algorithm for use when creating and verifying tokens
Throws KeyError if algorithm is not registered.
"""
if alg_id not in self._algorithms:
raise KeyError('The specified algorithm could not be removed'
' because it is not registered.')
del self._algorithms[alg_id]
self._valid_algs.remove(alg_id)
def get_algorithms(self):
"""
Returns a list of supported values for the 'alg' parameter.
"""
return list(self._valid_algs)
def encode(self, payload, key, algorithm='HS256', headers=None,
json_encoder=None):
segments = []
if algorithm is None:
algorithm = 'none'
if algorithm not in self._valid_algs:
pass
# Header
header = {'typ': self.header_typ, 'alg': algorithm}
if headers:
self._validate_headers(headers)
header.update(headers)
json_header = json.dumps(
header,
separators=(',', ':'),
cls=json_encoder
).encode('utf-8')
segments.append(base64url_encode(json_header))
segments.append(base64url_encode(payload))
# Segments
signing_input = b'.'.join(segments)
try:
alg_obj = self._algorithms[algorithm]
key = alg_obj.prepare_key(key)
signature = alg_obj.sign(signing_input, key)
except KeyError:
raise NotImplementedError('Algorithm not supported')
segments.append(base64url_encode(signature))
return b'.'.join(segments)
def decode(self, jws, key='', verify=True, algorithms=None, options=None,
**kwargs):
payload, signing_input, header, signature = self._load(jws)
if verify:
merged_options = merge_dict(self.options, options)
if merged_options.get('verify_signature'):
self._verify_signature(payload, signing_input, header, signature,
key, algorithms)
else:
warnings.warn('The verify parameter is deprecated. '
'Please use options instead.', DeprecationWarning)
return payload
def get_unverified_header(self, jwt):
"""Returns back the JWT header parameters as a dict()
Note: The signature is not verified so the header parameters
should not be fully trusted until signature verification is complete
"""
headers = self._load(jwt)[2]
self._validate_headers(headers)
return headers
def _load(self, jwt):
if isinstance(jwt, text_type):
jwt = jwt.encode('utf-8')
if not issubclass(type(jwt), binary_type):
raise DecodeError("Invalid token type. Token must be a {0}".format(
binary_type))
try:
signing_input, crypto_segment = jwt.rsplit(b'.', 1)
header_segment, payload_segment = signing_input.split(b'.', 1)
except ValueError:
raise DecodeError('Not enough segments')
try:
header_data = base64url_decode(header_segment)
except (TypeError, binascii.Error):
raise DecodeError('Invalid header padding')
try:
header = json.loads(header_data.decode('utf-8'))
except ValueError as e:
raise DecodeError('Invalid header string: %s' % e)
if not isinstance(header, Mapping):
raise DecodeError('Invalid header string: must be a json object')
try:
payload = base64url_decode(payload_segment)
except (TypeError, binascii.Error):
raise DecodeError('Invalid payload padding')
try:
signature = base64url_decode(crypto_segment)
except (TypeError, binascii.Error):
raise DecodeError('Invalid crypto padding')
return (payload, signing_input, header, signature)
def _verify_signature(self, payload, signing_input, header, signature,
key='', algorithms=None):
alg = header.get('alg')
if algorithms is not None and alg not in algorithms:
raise InvalidAlgorithmError('The specified alg value is not allowed')
try:
alg_obj = self._algorithms[alg]
key = alg_obj.prepare_key(key)
if not alg_obj.verify(signing_input, key, signature):
raise DecodeError('Signature verification failed')
except KeyError:
raise InvalidAlgorithmError('Algorithm not supported')
def _validate_headers(self, headers):
if 'kid' in headers:
self._validate_kid(headers['kid'])
def _validate_kid(self, kid):
if not isinstance(kid, string_types):
raise InvalidTokenError('Key ID header parameter must be a string')
_jws_global_obj = PyJWS()
encode = _jws_global_obj.encode
decode = _jws_global_obj.decode
register_algorithm = _jws_global_obj.register_algorithm
unregister_algorithm = _jws_global_obj.unregister_algorithm
get_unverified_header = _jws_global_obj.get_unverified_header

View File

@ -0,0 +1,187 @@
import json
import warnings
from calendar import timegm
from collections import Mapping
from datetime import datetime, timedelta
from .api_jws import PyJWS
from .algorithms import Algorithm, get_default_algorithms # NOQA
from .compat import string_types, timedelta_total_seconds
from .exceptions import (
DecodeError, ExpiredSignatureError, ImmatureSignatureError,
InvalidAudienceError, InvalidIssuedAtError,
InvalidIssuerError, MissingRequiredClaimError
)
from .utils import merge_dict
class PyJWT(PyJWS):
header_type = 'JWT'
@staticmethod
def _get_default_options():
return {
'verify_signature': True,
'verify_exp': True,
'verify_nbf': True,
'verify_iat': True,
'verify_aud': True,
'verify_iss': True,
'require_exp': False,
'require_iat': False,
'require_nbf': False
}
def encode(self, payload, key, algorithm='HS256', headers=None,
json_encoder=None):
# Check that we get a mapping
if not isinstance(payload, Mapping):
raise TypeError('Expecting a mapping object, as JWT only supports '
'JSON objects as payloads.')
# Payload
for time_claim in ['exp', 'iat', 'nbf']:
# Convert datetime to a intDate value in known time-format claims
if isinstance(payload.get(time_claim), datetime):
payload[time_claim] = timegm(payload[time_claim].utctimetuple())
json_payload = json.dumps(
payload,
separators=(',', ':'),
cls=json_encoder
).encode('utf-8')
return super(PyJWT, self).encode(
json_payload, key, algorithm, headers, json_encoder
)
def decode(self, jwt, key='', verify=True, algorithms=None, options=None,
**kwargs):
payload, signing_input, header, signature = self._load(jwt)
decoded = super(PyJWT, self).decode(jwt, key, verify, algorithms,
options, **kwargs)
try:
payload = json.loads(decoded.decode('utf-8'))
except ValueError as e:
raise DecodeError('Invalid payload string: %s' % e)
if not isinstance(payload, Mapping):
raise DecodeError('Invalid payload string: must be a json object')
if verify:
merged_options = merge_dict(self.options, options)
self._validate_claims(payload, merged_options, **kwargs)
return payload
def _validate_claims(self, payload, options, audience=None, issuer=None,
leeway=0, **kwargs):
if 'verify_expiration' in kwargs:
options['verify_exp'] = kwargs.get('verify_expiration', True)
warnings.warn('The verify_expiration parameter is deprecated. '
'Please use options instead.', DeprecationWarning)
if isinstance(leeway, timedelta):
leeway = timedelta_total_seconds(leeway)
if not isinstance(audience, (string_types, type(None))):
raise TypeError('audience must be a string or None')
self._validate_required_claims(payload, options)
now = timegm(datetime.utcnow().utctimetuple())
if 'iat' in payload and options.get('verify_iat'):
self._validate_iat(payload, now, leeway)
if 'nbf' in payload and options.get('verify_nbf'):
self._validate_nbf(payload, now, leeway)
if 'exp' in payload and options.get('verify_exp'):
self._validate_exp(payload, now, leeway)
if options.get('verify_iss'):
self._validate_iss(payload, issuer)
if options.get('verify_aud'):
self._validate_aud(payload, audience)
def _validate_required_claims(self, payload, options):
if options.get('require_exp') and payload.get('exp') is None:
raise MissingRequiredClaimError('exp')
if options.get('require_iat') and payload.get('iat') is None:
raise MissingRequiredClaimError('iat')
if options.get('require_nbf') and payload.get('nbf') is None:
raise MissingRequiredClaimError('nbf')
def _validate_iat(self, payload, now, leeway):
try:
iat = int(payload['iat'])
except ValueError:
raise DecodeError('Issued At claim (iat) must be an integer.')
if iat > (now + leeway):
raise InvalidIssuedAtError('Issued At claim (iat) cannot be in'
' the future.')
def _validate_nbf(self, payload, now, leeway):
try:
nbf = int(payload['nbf'])
except ValueError:
raise DecodeError('Not Before claim (nbf) must be an integer.')
if nbf > (now + leeway):
raise ImmatureSignatureError('The token is not yet valid (nbf)')
def _validate_exp(self, payload, now, leeway):
try:
exp = int(payload['exp'])
except ValueError:
raise DecodeError('Expiration Time claim (exp) must be an'
' integer.')
if exp < (now - leeway):
raise ExpiredSignatureError('Signature has expired')
def _validate_aud(self, payload, audience):
if audience is None and 'aud' not in payload:
return
if audience is not None and 'aud' not in payload:
# Application specified an audience, but it could not be
# verified since the token does not contain a claim.
raise MissingRequiredClaimError('aud')
audience_claims = payload['aud']
if isinstance(audience_claims, string_types):
audience_claims = [audience_claims]
if not isinstance(audience_claims, list):
raise InvalidAudienceError('Invalid claim format in token')
if any(not isinstance(c, string_types) for c in audience_claims):
raise InvalidAudienceError('Invalid claim format in token')
if audience not in audience_claims:
raise InvalidAudienceError('Invalid audience')
def _validate_iss(self, payload, issuer):
if issuer is None:
return
if 'iss' not in payload:
raise MissingRequiredClaimError('iss')
if payload['iss'] != issuer:
raise InvalidIssuerError('Invalid issuer')
_jwt_global_obj = PyJWT()
encode = _jwt_global_obj.encode
decode = _jwt_global_obj.decode
register_algorithm = _jwt_global_obj.register_algorithm
unregister_algorithm = _jwt_global_obj.unregister_algorithm
get_unverified_header = _jwt_global_obj.get_unverified_header

View File

@ -0,0 +1,54 @@
"""
The `compat` module provides support for backwards compatibility with older
versions of python, and compatibility wrappers around optional packages.
"""
# flake8: noqa
import sys
import hmac
PY3 = sys.version_info[0] == 3
if PY3:
string_types = str,
text_type = str
binary_type = bytes
else:
string_types = basestring,
text_type = unicode
binary_type = str
def timedelta_total_seconds(delta):
try:
delta.total_seconds
except AttributeError:
# On Python 2.6, timedelta instances do not have
# a .total_seconds() method.
total_seconds = delta.days * 24 * 60 * 60 + delta.seconds
else:
total_seconds = delta.total_seconds()
return total_seconds
try:
constant_time_compare = hmac.compare_digest
except AttributeError:
# Fallback for Python < 2.7
def constant_time_compare(val1, val2):
"""
Returns True if the two strings are equal, False otherwise.
The time taken is independent of the number of characters that match.
"""
if len(val1) != len(val2):
return False
result = 0
for x, y in zip(val1, val2):
result |= ord(x) ^ ord(y)
return result == 0

View File

View File

@ -0,0 +1,60 @@
# Note: This file is named py_ecdsa.py because import behavior in Python 2
# would cause ecdsa.py to squash the ecdsa library that it depends upon.
import hashlib
import ecdsa
from jwt.algorithms import Algorithm
from jwt.compat import string_types, text_type
class ECAlgorithm(Algorithm):
"""
Performs signing and verification operations using
ECDSA and the specified hash function
This class requires the ecdsa package to be installed.
This is based off of the implementation in PyJWT 0.3.2
"""
SHA256 = hashlib.sha256
SHA384 = hashlib.sha384
SHA512 = hashlib.sha512
def __init__(self, hash_alg):
self.hash_alg = hash_alg
def prepare_key(self, key):
if isinstance(key, ecdsa.SigningKey) or \
isinstance(key, ecdsa.VerifyingKey):
return key
if isinstance(key, string_types):
if isinstance(key, text_type):
key = key.encode('utf-8')
# Attempt to load key. We don't know if it's
# a Signing Key or a Verifying Key, so we try
# the Verifying Key first.
try:
key = ecdsa.VerifyingKey.from_pem(key)
except ecdsa.der.UnexpectedDER:
key = ecdsa.SigningKey.from_pem(key)
else:
raise TypeError('Expecting a PEM-formatted key.')
return key
def sign(self, msg, key):
return key.sign(msg, hashfunc=self.hash_alg,
sigencode=ecdsa.util.sigencode_string)
def verify(self, msg, key, sig):
try:
return key.verify(sig, msg, hashfunc=self.hash_alg,
sigdecode=ecdsa.util.sigdecode_string)
except AssertionError:
return False

View File

@ -0,0 +1,47 @@
import Crypto.Hash.SHA256
import Crypto.Hash.SHA384
import Crypto.Hash.SHA512
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from jwt.algorithms import Algorithm
from jwt.compat import string_types, text_type
class RSAAlgorithm(Algorithm):
"""
Performs signing and verification operations using
RSASSA-PKCS-v1_5 and the specified hash function.
This class requires PyCrypto package to be installed.
This is based off of the implementation in PyJWT 0.3.2
"""
SHA256 = Crypto.Hash.SHA256
SHA384 = Crypto.Hash.SHA384
SHA512 = Crypto.Hash.SHA512
def __init__(self, hash_alg):
self.hash_alg = hash_alg
def prepare_key(self, key):
if isinstance(key, RSA._RSAobj):
return key
if isinstance(key, string_types):
if isinstance(key, text_type):
key = key.encode('utf-8')
key = RSA.importKey(key)
else:
raise TypeError('Expecting a PEM- or RSA-formatted key.')
return key
def sign(self, msg, key):
return PKCS1_v1_5.new(key).sign(self.hash_alg.new(msg))
def verify(self, msg, key, sig):
return PKCS1_v1_5.new(key).verify(self.hash_alg.new(msg), sig)

View File

@ -0,0 +1,48 @@
class InvalidTokenError(Exception):
pass
class DecodeError(InvalidTokenError):
pass
class ExpiredSignatureError(InvalidTokenError):
pass
class InvalidAudienceError(InvalidTokenError):
pass
class InvalidIssuerError(InvalidTokenError):
pass
class InvalidIssuedAtError(InvalidTokenError):
pass
class ImmatureSignatureError(InvalidTokenError):
pass
class InvalidKeyError(Exception):
pass
class InvalidAlgorithmError(InvalidTokenError):
pass
class MissingRequiredClaimError(InvalidTokenError):
def __init__(self, claim):
self.claim = claim
def __str__(self):
return 'Token is missing the "%s" claim' % self.claim
# Compatibility aliases (deprecated)
ExpiredSignature = ExpiredSignatureError
InvalidAudience = InvalidAudienceError
InvalidIssuer = InvalidIssuerError

View File

@ -0,0 +1,67 @@
import base64
import binascii
try:
from cryptography.hazmat.primitives.asymmetric.utils import (
decode_rfc6979_signature, encode_rfc6979_signature
)
except ImportError:
pass
def base64url_decode(input):
rem = len(input) % 4
if rem > 0:
input += b'=' * (4 - rem)
return base64.urlsafe_b64decode(input)
def base64url_encode(input):
return base64.urlsafe_b64encode(input).replace(b'=', b'')
def merge_dict(original, updates):
if not updates:
return original
try:
merged_options = original.copy()
merged_options.update(updates)
except (AttributeError, ValueError) as e:
raise TypeError('original and updates must be a dictionary: %s' % e)
return merged_options
def number_to_bytes(num, num_bytes):
padded_hex = '%0*x' % (2 * num_bytes, num)
big_endian = binascii.a2b_hex(padded_hex.encode('ascii'))
return big_endian
def bytes_to_number(string):
return int(binascii.b2a_hex(string), 16)
def der_to_raw_signature(der_sig, curve):
num_bits = curve.key_size
num_bytes = (num_bits + 7) // 8
r, s = decode_rfc6979_signature(der_sig)
return number_to_bytes(r, num_bytes) + number_to_bytes(s, num_bytes)
def raw_to_der_signature(raw_sig, curve):
num_bits = curve.key_size
num_bytes = (num_bits + 7) // 8
if len(raw_sig) != 2 * num_bytes:
raise ValueError('Invalid signature')
r = bytes_to_number(raw_sig[:num_bytes])
s = bytes_to_number(raw_sig[num_bytes:])
return encode_rfc6979_signature(r, s)