import jwt
import re
from datetime import datetime, timedelta
import os
from . import error_messages
from . import constants
class APIErrorResponse(Exception):
    """Exception carrying the fields of a structured API error payload.

    Every field is optional and defaults to ``None``; the values are stored
    unchanged as attributes of the same name.

    Args:
        status_code: Status reported for the failed call.
        message: Error message taken from the payload.
        log_ref: Log reference taken from the payload.
        time_stamp: Timestamp taken from the payload.
        error: Error name, or the raw payload when nothing finer is known.
        exception: Exception name taken from the payload.
        path: Request path taken from the payload.
        error_code: Error code taken from the payload.  Accepted as a
            trailing optional argument so that callers which pass eight
            positional values no longer fail with ``TypeError``.
    """

    def __init__(
        self,
        status_code=None,
        message=None,
        log_ref=None,
        time_stamp=None,
        error=None,
        exception=None,
        path=None,
        error_code=None,
    ):
        self.status_code = status_code
        self.message = message
        self.log_ref = log_ref
        self.time_stamp = time_stamp
        self.error = error
        self.exception = exception
        self.path = path
        self.error_code = error_code

    def __str__(self):
        """Render the error with the ``API_ERROR_RESPONSE`` template."""
        # The template receives the same seven positional values as before;
        # error_code is available as an attribute only.
        return error_messages.API_ERROR_RESPONSE.format(
            self.status_code,
            self.message,
            self.log_ref,
            self.time_stamp,
            self.error,
            self.exception,
            self.path,
        )
class APIError(Exception):
    """Exception described by a status code and a message.

    Args:
        status_code: Status code associated with the failure.
        message: Description of the failure.
    """

    def __init__(self, status_code, message):
        self.status_code, self.message = status_code, message

    def __str__(self):
        """Render the error with the ``API_ERROR`` template."""
        template = error_messages.API_ERROR
        return template.format(self.status_code, self.message)
class UnAuthorisedErrorResponse(Exception):
    """Exception holding a stack trace, a message and a log reference.

    Args:
        stack_trace: Stack trace taken from the error payload, if any.
        message: Error message taken from the payload, if any.
        log_ref: Log reference taken from the payload, if any.
    """

    def __init__(self, stack_trace=None, message=None, log_ref=None):
        self.stack_trace, self.message, self.log_ref = (
            stack_trace,
            message,
            log_ref,
        )

    def __str__(self):
        """Render the error with the ``UNAUTHORISED_ERROR_RESPONSE`` template."""
        template = error_messages.UNAUTHORISED_ERROR_RESPONSE
        return template.format(self.stack_trace, self.message, self.log_ref)
def is_token_valid(token, is_technical_token):
    """Report whether ``token`` is acceptable.

    The token is decoded first; a token that cannot be decoded is rejected
    immediately.  A decoded token is accepted when its claims, its issuer
    and its expiry all check out.  If any of those checks fails, the result
    of the mock-token comparison on the raw ``token`` is returned instead.

    Args:
        token: Raw token string, optionally carrying a bearer prefix.
        is_technical_token: Passed through to the expiry check.

    Returns:
        ``True`` when the token is accepted, otherwise ``False``.
    """
    claims = _decode_jwt(token)
    if claims is None:
        return False

    # Short-circuit order matters: the expiry is only inspected after the
    # claim check has confirmed that "exp" is present.
    accepted = (
        _validate_claim(claims)
        and _validate_issuer(claims)
        and _is_token_active(claims.get("exp"), is_technical_token)
    )
    return True if accepted else _check_for_mock_token(token)
def _validate_issuer(payload):
    """Return ``True`` when ``payload`` has an ``iss`` claim that is valid.

    A missing payload or a missing ``iss`` claim yields ``False`` without
    consulting the issuer check.
    """
    if payload is None or payload.get("iss") is None:
        return False
    return bool(_is_issuer_valid(payload))
def _is_token_valid_without_issuer_validation(token):
    """Report whether ``token`` decodes and carries the required claims.

    Unlike ``is_token_valid`` this performs no issuer, expiry or mock-token
    check.

    Args:
        token: Raw token string, optionally carrying a bearer prefix.

    Returns:
        ``True`` when the token decodes and its claims validate, otherwise
        ``False``.
    """
    payload = _decode_jwt(token)
    if payload is None:
        # Previously this returned the ``is_token_valid`` function object,
        # which is truthy, so undecodable tokens were reported as valid.
        return False
    return bool(_validate_claim(payload))
def _validate_claim(payload):
    """Return ``True`` when ``payload`` carries all required claims.

    The claims ``exp``, ``aud``, ``iss`` and ``iat`` must all be present
    (not ``None``), and the configured ``constants.TOKEN_TYPE`` must equal
    the header type exposed by the ``jwt`` library.
    """
    expected_type = constants.TOKEN_TYPE
    if payload is None:
        return False

    required_claims = ("exp", "aud", "iss", "iat")
    if any(payload.get(claim) is None for claim in required_claims):
        return False

    # Only looked up once every claim is present, as in the original chain.
    return expected_type == jwt.api_jwt.PyJWT.header_type
def _decode_jwt(token):
    """Decode ``token`` and return its payload, or ``None`` on failure.

    A bearer prefix is removed first via ``_check_for_bearer``.  Any
    ``jwt.PyJWTError`` raised while decoding is swallowed and reported as
    ``None``.
    """
    bare_token = _check_for_bearer(token)
    try:
        # NOTE(review): ``verify=False`` is passed, so the payload is
        # presumably returned without signature verification - confirm
        # against the installed PyJWT version.
        claims = jwt.decode(
            bare_token, "secret", verify=False, algorithms=["RS256"]
        )
    except jwt.PyJWTError:
        return None
    else:
        return claims
def _is_token_active(token_expiry_time, is_technical_token):
    """Report whether a token expiring at ``token_expiry_time`` is still usable.

    Args:
        token_expiry_time: Expiry as a POSIX timestamp (the ``exp`` claim),
            or ``None`` when the claim is missing.
        is_technical_token: When true, the token must remain valid for at
            least five more minutes (the fetching interval) to count as
            active; otherwise it only has to expire in the future.

    Returns:
        ``True`` when the expiry lies beyond now plus the applicable
        interval; ``False`` otherwise, including when the expiry is missing
        or cannot be interpreted as a timestamp.
    """
    # Check for a missing expiry *before* converting it: the original test
    # ran after ``fromtimestamp`` and could therefore never be reached.
    if token_expiry_time is None:
        return False

    try:
        expires_at = datetime.fromtimestamp(token_expiry_time)
    except (TypeError, ValueError, OverflowError, OSError):
        # The value comes from token content; a malformed or out-of-range
        # expiry means "not active" rather than a crash.
        return False

    fetching_interval_minutes = 5 if is_technical_token else 0
    # Both sides are naive local datetimes, so they compare consistently.
    threshold = datetime.now() + timedelta(minutes=fetching_interval_minutes)
    return expires_at > threshold
def _check_for_bearer(token):
    """Strip a leading ``bearer`` scheme from ``token`` when one is present.

    Tokens that are ``None``, seven characters or shorter, or that do not
    start with ``bearer`` (compared case-insensitively) are returned
    unchanged.  Otherwise the first seven characters are dropped and the
    result ends just after the last ``.`` in the token, so anything that
    follows the final dot is discarded as well.
    """
    if token is None or len(token) <= 7:
        return token
    if token[:6].lower() != "bearer":
        return token

    # rfind() yields -1 when there is no dot, which makes the slice empty.
    end = token.rfind(".") + 1
    return token[7:end]
def _check_for_mock_token(token):
    """Return ``True`` only when ``token`` is exactly the string ``MOCK``."""
    if token is None:
        return False
    return True if "MOCK" == token else False
# Accepted issuer: https://<host>.piam.<host>.mindsphere.io, optionally
# followed by a port and/or a path.  Dots are escaped and the host part is
# limited to hostname characters so that look-alike values (for example
# "...mindsphere.io.evil.com" or "https://evil.com/x.piam.y.mindsphere.io")
# no longer pass, which the original unescaped, unanchored pattern allowed.
_ISSUER_PATTERN = re.compile(
    r"https://[A-Za-z0-9.-]+\.piam\.[A-Za-z0-9.-]+\.mindsphere\.io"
    r"(?::\d+)?(?:/.*)?"
)


def _is_issuer_valid(payload):
    """Return ``True`` when the ``iss`` claim of ``payload`` is an accepted issuer.

    Args:
        payload: Decoded token payload (a mapping with a ``get`` method).

    Returns:
        ``True`` when ``iss`` is a string that matches the issuer pattern in
        full; ``False`` otherwise, including when ``iss`` is missing or is
        not a string.
    """
    issuer = payload.get("iss")
    if not isinstance(issuer, str):
        # The claim comes from token content; a non-string value is simply
        # invalid instead of raising TypeError inside the regex engine.
        return False
    return _ISSUER_PATTERN.fullmatch(issuer) is not None
def append_bearer(token):
    """Return ``token`` without its leading bearer scheme, if it has one.

    When ``token`` is longer than seven characters and its first six
    characters contain ``bearer`` (compared case-insensitively), the first
    seven characters are removed.  Any other value, including ``None``, is
    returned unchanged.
    """
    if token is None or len(token) <= 7:
        return token

    leading = token[0:6].lower()
    if "bearer" in leading:
        return token[7:]
    return token
def get_tenant_from_token(authorization):
    """Extract the ``constants.ZID`` and ``constants.TEN`` claims from a token.

    Args:
        authorization: Raw token string, optionally carrying a bearer prefix.

    Returns:
        A ``(zid, ten)`` tuple holding the two claim values, or ``None``
        when the token cannot be decoded or either claim is missing.
    """
    claims = _decode_jwt(authorization)
    if claims is None:
        return None

    zid_value = claims.get(constants.ZID)
    if zid_value is None:
        return None

    ten_value = claims.get(constants.TEN)
    if ten_value is None:
        return None

    return zid_value, ten_value
def parse_exception(response):
    """Build the exception object that describes a failed ``response``.

    The function never raises for a malformed body; it always returns an
    exception instance for the caller to raise or inspect.

    Args:
        response: Response object exposing ``text``, ``json()`` and
            ``status_code`` (presumably a ``requests`` response - confirm
            against callers).

    Returns:
        * ``APIErrorResponse`` with only the status code when the body is
          empty or is empty JSON (the latter used to return ``None``).
        * ``APIError`` with the response status and raw text when the body
          is not JSON (this used to propagate the decoder's exception).
        * ``APIErrorResponse`` filled from the first entry of an ``errors``
          list, from ``error``/``error_description``, from an error code
          and message, or wrapping the whole payload when no known key is
          present.
        * For a list payload: ``UnAuthorisedErrorResponse`` when the first
          item carries a stack trace, otherwise ``APIErrorResponse`` built
          from the first item.
        * ``APIError`` with status 503 or 500 when the payload cannot be
          interpreted.
    """
    if not response.text:
        return APIErrorResponse(status_code=response.status_code)

    try:
        error = response.json()
    except ValueError:
        # JSON decoders signal a malformed body with a ValueError subclass.
        return APIError(
            status_code=response.status_code, message=response.text
        )

    if not error:
        # Valid JSON with nothing in it ({} / [] / null): no details exist.
        return APIErrorResponse(status_code=response.status_code)

    try:
        if isinstance(error, dict):
            return _error_from_dict(error)
        if isinstance(error, list):
            return _error_from_list(error)
        # Bare JSON scalar (string, number, ...): keep it as the error value.
        return APIErrorResponse(error=error)
    except (AttributeError, LookupError, TypeError):
        # A key that must be present is missing, or an entry does not have
        # the expected shape.
        return _fallback_api_error(error)


def _error_from_dict(error):
    """Translate a non-empty JSON object into an ``APIErrorResponse``."""
    entries = error.get(constants.ERRORS)
    # Indexing is deliberate: an empty ``errors`` list raises IndexError
    # and is handled by the caller's fallback, as it was before.
    if isinstance(entries, list) and entries[0]:
        first = entries[0]
        parsed = APIErrorResponse(
            first.get(constants.STATUS),
            first.get(constants.MESSAGE),
            first.get(constants.LOG_REF),
            first.get(constants.TIMESTAMP),
            first.get(constants.ERROR),
            first.get(constants.EXCEPTION),
            first.get(constants.PATH),
        )
        # The error code used to be passed as an eighth positional argument,
        # which always raised TypeError.  It is attached as an attribute so
        # it is preserved whatever the constructor signature is.
        parsed.error_code = first.get(constants.ERROR_CODE)
        return parsed

    if error.get(constants.ERROR):
        # Plain indexing on purpose: a missing description must raise so
        # that the 503/500 fallback decides the result, as before.
        return APIErrorResponse(
            error=error[constants.ERROR],
            message=error[constants.ERROR_DESCRIPTION],
        )

    if error.get(constants.ERROR_CODE):
        # Same reasoning as above for a missing message.
        return APIErrorResponse(
            error=error[constants.ERROR_CODE],
            message=error[constants.MESSAGE],
        )

    return APIErrorResponse(error=error)


def _error_from_list(error):
    """Translate a non-empty JSON array into an exception object."""
    first = error[0]
    if first is None:
        return APIErrorResponse(error=error)

    # The original code called ``error.get(...)`` on the list itself, which
    # always raised and therefore always ended in the unauthorised
    # fallback.  Items with a stack trace keep producing that type; all
    # other items now produce the APIErrorResponse the branch was written
    # to build.
    if constants.STACKTRACE in first:
        return UnAuthorisedErrorResponse(
            first.get(constants.STACKTRACE),
            first.get(constants.MESSAGE),
            first.get(constants.LOG_REF),
        )

    return APIErrorResponse(
        first.get(constants.STATUS),
        first.get(constants.MESSAGE),
        first.get(constants.LOG_REF),
        first.get(constants.TIMESTAMP),
        first.get(constants.ERROR),
        first.get(constants.EXCEPTION),
        first.get(constants.PATH),
    )


def _fallback_api_error(error):
    """Return the generic ``APIError`` for a payload that could not be parsed."""
    unavailable = error_messages.TEMPORARY_SERVICE_UNAVIALABLE
    # Only mappings have values(); checking the type first keeps the
    # fallback itself from raising on lists and scalars.
    if isinstance(error, dict) and unavailable in error.values():
        return APIError(status_code=503, message=unavailable)
    return APIError(status_code=500, message=error)