Source code for mindsphere_core.commonutil

import jwt
import re
from datetime import datetime, timedelta
import os

from . import error_messages
from . import constants


class APIErrorResponse(Exception):
    """Exception carrying the fields of a structured API error payload.

    Every field is optional and defaults to ``None``; the values are stored
    unchanged as attributes of the same name.

    Args:
        status_code: Status value reported for the failed call.
        message: Error message text.
        log_ref: Log reference supplied with the error.
        time_stamp: Timestamp supplied with the error.
        error: Error name or raw error payload.
        exception: Exception name supplied with the error.
        path: Request path supplied with the error.
    """

    def __init__(
        self,
        status_code=None,
        message=None,
        log_ref=None,
        time_stamp=None,
        error=None,
        exception=None,
        path=None,
    ):
        self.status_code = status_code
        self.message = message
        self.log_ref = log_ref
        self.time_stamp = time_stamp
        self.error = error
        self.exception = exception
        self.path = path

    def __str__(self):
        """Render all fields through ``error_messages.API_ERROR_RESPONSE``."""
        fields = (
            self.status_code,
            self.message,
            self.log_ref,
            self.time_stamp,
            self.error,
            self.exception,
            self.path,
        )
        return error_messages.API_ERROR_RESPONSE.format(*fields)
class APIError(Exception):
    """Minimal API exception holding a status code and a message.

    Args:
        status_code: Status value to report.
        message: Error message or raw error payload.
    """

    def __init__(self, status_code, message):
        self.status_code, self.message = status_code, message

    def __str__(self):
        """Render both fields through ``error_messages.API_ERROR``."""
        template = error_messages.API_ERROR
        return template.format(self.status_code, self.message)
class UnAuthorisedErrorResponse(Exception):
    """Exception holding a stack trace, a message and a log reference.

    All three fields are optional and default to ``None``.

    Args:
        stack_trace: Stack trace text supplied with the error.
        message: Error message text.
        log_ref: Log reference supplied with the error.
    """

    def __init__(self, stack_trace=None, message=None, log_ref=None):
        self.stack_trace = stack_trace
        self.message = message
        self.log_ref = log_ref

    def __str__(self):
        """Render the fields through ``error_messages.UNAUTHORISED_ERROR_RESPONSE``."""
        template = error_messages.UNAUTHORISED_ERROR_RESPONSE
        return template.format(self.stack_trace, self.message, self.log_ref)
def is_token_valid(token, is_technical_token):
    """Check a token's claims, issuer and expiry.

    Args:
        token: Token string, optionally carrying a "bearer" prefix.
        is_technical_token: Forwarded to ``_is_token_active`` together with
            the token's ``exp`` claim.

    Returns:
        ``False`` when the token cannot be decoded; ``True`` when the claim,
        issuer and expiry checks all pass; otherwise the result of
        ``_check_for_mock_token(token)``.
    """
    payload = _decode_jwt(token)
    if payload is None:
        return False

    # Short-circuit order matters: the expiry check reads "exp", whose
    # presence is established by the claim check evaluated before it.
    all_checks_pass = (
        _validate_claim(payload)
        and _validate_issuer(payload)
        and _is_token_active(payload.get("exp"), is_technical_token)
    )
    if all_checks_pass:
        return True

    # NOTE(review): this fallback is only reached for tokens that decoded
    # successfully; a literal mock token presumably fails decoding and is
    # rejected above. Confirm whether mock tokens are meant to be accepted.
    return _check_for_mock_token(token)
def _validate_issuer(payload):
    """Return ``True`` when *payload* has an ``iss`` claim that passes ``_is_issuer_valid``.

    Args:
        payload: Decoded token payload, or ``None``.

    Returns:
        ``False`` for a ``None`` payload or a missing ``iss`` claim,
        otherwise the issuer check result.
    """
    if payload is None:
        return False
    if payload.get("iss") is None:
        return False
    return True if _is_issuer_valid(payload) else False
def _is_token_valid_without_issuer_validation(token):
    """Validate a token's required claims without checking its issuer.

    Args:
        token: Token string, optionally carrying a "bearer" prefix.

    Returns:
        ``True`` when the token decodes and ``_validate_claim`` accepts its
        payload, ``False`` otherwise.
    """
    payload = _decode_jwt(token)
    if payload is None:
        # An undecodable token is invalid. Returning the ``is_token_valid``
        # function object here (as the code once did) is truthy and would
        # make callers treat a broken token as valid.
        return False
    return bool(_validate_claim(payload))
def _validate_claim(payload):
    """Check that *payload* carries the required claims and the token type matches.

    Args:
        payload: Decoded token payload, or ``None``.

    Returns:
        ``True`` only when ``exp``, ``aud``, ``iss`` and ``iat`` are all
        present (not ``None``) and ``constants.TOKEN_TYPE`` equals the
        ``header_type`` of the PyJWT class; ``False`` otherwise.
    """
    expected_type = constants.TOKEN_TYPE
    if payload is None:
        return False

    required_claims = ("exp", "aud", "iss", "iat")
    if not all(payload.get(claim) is not None for claim in required_claims):
        return False

    if expected_type == jwt.api_jwt.PyJWT.header_type:
        return True
    return False
def _decode_jwt(token):
    """Decode a JWT into its payload without verifying it.

    Args:
        token: Token string, optionally carrying a "bearer" prefix, which is
            removed by ``_check_for_bearer`` before decoding.

    Returns:
        The decoded payload, or ``None`` when PyJWT raises a ``PyJWTError``.
    """
    raw_token = _check_for_bearer(token)
    try:
        # NOTE: verify=False means the signature is NOT checked; the payload
        # is only parsed and must not be treated as authenticated.
        return jwt.decode(raw_token, "secret", verify=False, algorithms=["RS256"])
    except jwt.PyJWTError:
        return None
[docs]def _is_token_active(token_expiry_time, is_technical_token): token_expiry_time = datetime.fromtimestamp(token_expiry_time) token_fetching_interval = 0 if is_technical_token: token_fetching_interval = 5 datetime.utcnow() after_fetching_interval = datetime.now() + timedelta( minutes=token_fetching_interval ) if token_expiry_time is not None: return token_expiry_time > after_fetching_interval else: return False
[docs]def _check_for_bearer(token): token_prefix = "bearer" if ( token is not None and len(token) > 7 and token[0:6].lower() == token_prefix.lower() ): token_last_index = token.rfind(".") + 1 token = token[7:token_last_index] return token
[docs]def _check_for_mock_token(token): mock_value_string = "MOCK" if token is not None and mock_value_string == token: return True else: return False
[docs]def _is_issuer_valid(payload): if re.match("https://.+.piam..+.mindsphere.io.*", payload.get("iss")): return True else: return False
def extract_env_var(var_name):
    """Look up an environment variable.

    Args:
        var_name: Name of the environment variable.

    Returns:
        The variable's value, or ``None`` when it is not set.
    """
    return os.environ.get(var_name)
def append_bearer(token):
    """Remove a leading "bearer" prefix from *token*.

    Despite its name, this function strips the prefix rather than adding
    one: when *token* is longer than 7 characters and its first 6 characters
    spell "bearer" (case-insensitively), the first 7 characters are dropped.

    Args:
        token: Token string, or ``None``.

    Returns:
        The token without the prefix, or *token* unchanged when the prefix
        is absent.
    """
    if token is None or len(token) <= 7:
        return token

    leading = token[0:6].lower()
    if "bearer" in leading:
        return token[7:]
    return token
def get_tenant_from_token(authorization):
    """Read the ``constants.ZID`` and ``constants.TEN`` claims from a token.

    Args:
        authorization: Token string, optionally carrying a "bearer" prefix.

    Returns:
        A ``(zid, ten)`` tuple when the token decodes and both claims are
        present, otherwise ``None``.
    """
    payload = _decode_jwt(authorization)
    if payload is None:
        return None
    if payload.get(constants.ZID) is None:
        return None
    if payload.get(constants.TEN) is None:
        return None
    return payload.get(constants.ZID), payload.get(constants.TEN)
def parse_exception(response):
    """Translate an error HTTP response into an exception object.

    The exception is returned, not raised. The body is interpreted by shape:

    * empty body, or a JSON body that is empty/falsy:
      ``APIErrorResponse`` carrying only the response status code;
    * body that is not valid JSON: ``APIErrorResponse`` carrying the status
      code and the raw text as message;
    * ``{ERRORS: [first, ...]}`` with a non-empty first entry:
      ``APIErrorResponse`` built from that entry;
    * ``{ERROR: ..., ERROR_DESCRIPTION: ...}``: ``APIErrorResponse`` with
      error and message;
    * list whose first entry is a mapping: ``UnAuthorisedErrorResponse``
      built from that entry;
    * ``{ERROR_CODE: ..., MESSAGE: ...}``: ``APIErrorResponse`` with error
      and message;
    * any other well-formed body: ``APIErrorResponse(error=body)``;
    * a body that matches a shape but is malformed (missing key, wrong
      type): ``APIError`` with status 503 when one of its values is the
      "temporary service unavailable" message, else ``APIError`` with
      status 500 and the body as message.

    Args:
        response: HTTP response exposing ``text``, ``status_code`` and
            ``json()``.

    Returns:
        An ``APIErrorResponse``, ``UnAuthorisedErrorResponse`` or
        ``APIError`` instance; never ``None``.
    """
    if not response.text:
        return APIErrorResponse(status_code=response.status_code)

    try:
        error = response.json()
    except ValueError:
        # Body is not JSON (for example an HTML error page): report it
        # instead of letting the decode error escape to the caller.
        return APIErrorResponse(
            status_code=response.status_code, message=response.text
        )

    if not error:
        # JSON null / {} / [] / "" / 0 carries no details. Always hand back
        # an exception so that callers can raise the result.
        return APIErrorResponse(status_code=response.status_code)

    try:
        parsed = _error_from_body(error)
    except (AttributeError, IndexError, KeyError, TypeError):
        # The body resembled a known shape but was malformed.
        parsed = None
    if parsed is not None:
        return parsed
    return _fallback_error(error)


def _error_from_body(error):
    """Build the exception matching a decoded, non-empty error body, or return None if malformed."""
    if isinstance(error, list):
        first = error[0]
        if first is None:
            return APIErrorResponse(error=error)
        # NOTE(review): an earlier revision tried to build an
        # APIErrorResponse for list bodies but called ``.get`` on the list
        # itself, so such bodies always ended up as
        # UnAuthorisedErrorResponse. That observable result is kept here;
        # confirm which type is actually wanted.
        return UnAuthorisedErrorResponse(
            first.get(constants.STACKTRACE),
            first.get(constants.MESSAGE),
            first.get(constants.LOG_REF),
        )

    if not isinstance(error, dict):
        # Scalar JSON body: nothing to pick apart.
        return APIErrorResponse(error=error)

    errors = error.get(constants.ERRORS)
    if isinstance(errors, list):
        if not errors:
            return None
        first = errors[0]
        if first:
            # APIErrorResponse accepts exactly these seven fields; passing
            # an eighth (the error code) raises TypeError, which used to
            # downgrade every such body to a generic 500 APIError.
            return APIErrorResponse(
                first.get(constants.STATUS),
                first.get(constants.MESSAGE),
                first.get(constants.LOG_REF),
                first.get(constants.TIMESTAMP),
                first.get(constants.ERROR),
                first.get(constants.EXCEPTION),
                first.get(constants.PATH),
            )

    if error.get(constants.ERROR):
        return APIErrorResponse(
            error=error[constants.ERROR],
            message=error[constants.ERROR_DESCRIPTION],
        )

    if error.get(constants.ERROR_CODE):
        return APIErrorResponse(
            error=error[constants.ERROR_CODE],
            message=error[constants.MESSAGE],
        )

    return APIErrorResponse(error=error)


def _fallback_error(error):
    """Build the generic APIError used when the error body could not be interpreted."""
    unavailable = error_messages.TEMPORARY_SERVICE_UNAVIALABLE
    # Only mappings have values(); other body types go straight to 500.
    if isinstance(error, dict) and unavailable in error.values():
        return APIError(status_code=503, message=unavailable)
    return APIError(status_code=500, message=error)