Source code for mindsphere_core.token_service

import base64
from configparser import ConfigParser, ExtendedInterpolation
import os
import os.path

from .commonutil import is_token_valid, extract_env_var, get_tenant_from_token
from mindsphere_core import mindsphere_core
from mindsphere_core import identification

from . import mindsphere_credentials
from .mindsphere_credentials import AppCredentials
from .mindsphere_credentials import TenantCredentials

from . import constants
from . import exceptions
from . import error_messages


# Absolute location of this module; the URL template file is looked up in
# the same directory.
path = os.path.abspath(__file__)
config_file = "".join((os.path.dirname(path), r"/url_setup.ini"))


class AppCredsRequestBody:
    """Plain value holder for an application-credentials token request.

    The attribute names are camelCase on purpose; they are kept exactly as
    the original class declared them.
    """

    def __init__(self, app_name, app_version, host_tenant, user_tenant):
        """Store the given values; ``grant_type`` is always 'client_credentials'."""
        self.grant_type = 'client_credentials'
        self.appName, self.appVersion = app_name, app_version
        self.hostTenant, self.userTenant = host_tenant, user_tenant
def init_credentials(credentials):
    """Return the credentials object to use, falling back to the environment.

    If the caller supplied (truthy) credentials they are returned unchanged.
    Otherwise the environment is inspected: when the application-credential
    variables are all set an empty ``AppCredentials`` is returned, else when
    the tenant-credential variables are all set an empty
    ``TenantCredentials`` is returned.

    :param credentials: credentials object supplied by the user, or a falsy
        value when none was given.
    :return: the supplied credentials, a new ``AppCredentials``, a new
        ``TenantCredentials``, or ``None`` when nothing was supplied and
        neither set of environment variables is complete.
    """
    if credentials:
        return credentials
    # App credentials are chosen when their env vars ARE present; this is the
    # same precedence fetch_token() applies (app first, then tenant).
    if _is_app_creds_present():
        return AppCredentials()
    if _is_tenant_creds_present():
        return TenantCredentials()
    return None
def fetch_token(rest_client_config, credentials):
    """Return an access token for the given credentials or the environment.

    Resolution order:

    1. Truthy ``credentials`` with a ``cached_token`` that ``is_token_valid``
       accepts: the cached token is returned as is.
    2. Any other truthy ``credentials``: dispatched by credential type.
    3. No credentials: application credentials from the environment, then
       tenant credentials from the environment.

    :raises exceptions.MindsphereForbiddenAccessError: when no credentials
        were given and neither environment credential set is complete.
    """
    if credentials:
        cached = getattr(credentials, 'cached_token', None)
        if cached and is_token_valid(cached, True):
            return cached
        return _get_requested_token(rest_client_config, credentials)

    # Nothing usable was passed in; fall back to the environment.
    if _is_app_creds_present():
        return _get_token_with_app_creds(rest_client_config, credentials)
    if _is_tenant_creds_present():
        return _get_token_with_tenant_creds(rest_client_config, credentials)

    raise exceptions.MindsphereForbiddenAccessError(
        message=error_messages.AUTHORIZATION_FAILED, http_status=401
    )
def _get_requested_token(rest_client_config, credentials):
    """Obtain a token using the strategy matching the exact credentials type.

    :raises exceptions.MindsphereForbiddenAccessError: when ``credentials``
        is not exactly an ``AppCredentials``, ``TenantCredentials`` or
        ``UserToken`` instance.
    """
    # Exact type comparison (not isinstance) is intentional here: it mirrors
    # the original dispatch, so subclasses are rejected.
    creds_type = type(credentials)

    if creds_type is mindsphere_credentials.AppCredentials:
        return _get_token_with_app_creds(rest_client_config, credentials)
    if creds_type is mindsphere_credentials.TenantCredentials:
        return _get_token_with_tenant_creds(rest_client_config, credentials)
    if creds_type is mindsphere_credentials.UserToken:
        return _get_token_with_user_token(credentials)

    raise exceptions.MindsphereForbiddenAccessError(
        message=error_messages.INCORRECT_CREDENTIAL, http_status=401
    )
def _get_token_with_app_creds(rest_client_config, credentials):
    """Request a token with application credentials and cache it.

    A fresh ``AppCredentials`` is created when ``credentials`` is ``None``.
    The token returned by the service is stored on
    ``credentials.cached_token`` before being returned.
    """
    if credentials is None:
        credentials = AppCredentials()

    request_body, client_id, client_secret = _populate_app_creds(credentials)

    # Basic-auth style value: base64("<client id>:<client secret>")
    encoded_pair = base64.b64encode(
        ":".join((client_id, client_secret)).encode()
    ).decode()

    request_headers = {
        'Accept': 'application/hal+json',
        'Content-Type': 'application/json',
        'X-SPACE-AUTH-KEY': 'Basic ' + encoded_pair,
    }
    token_url = mindsphere_core.build_url(
        '/api/technicaltokenmanager/v3/oauth/token', '', rest_client_config
    )

    response = mindsphere_core.invoke_service(
        rest_client_config=rest_client_config,
        api_url=token_url,
        headers=request_headers,
        http_method='POST',
        body_params=request_body,
    )
    access_token = response[constants.ACCESS_TOKEN]

    credentials.cached_token = access_token
    return access_token
def _get_token_with_tenant_creds(rest_client_config, credentials):
    """Request a token with tenant credentials and cache it.

    A fresh ``TenantCredentials`` is created when ``credentials`` is
    ``None``. The token is stored on ``credentials.cached_token`` before
    being returned.
    """
    if credentials is None:
        credentials = TenantCredentials()

    resolved = _populate_tenant_creds(credentials)
    client_id, client_secret, tenant, sub_tenant = resolved

    token = _invoke_token_endpoint(
        rest_client_config, client_id, client_secret, tenant, sub_tenant
    )
    credentials.cached_token = token
    return token
def _get_token_with_user_token(credentials):
    """Return ``credentials.authorization`` if ``is_token_valid`` accepts it.

    :raises exceptions.MindsphereForbiddenAccessError: (http_status 403) when
        the token is reported as not valid.
    """
    if is_token_valid(credentials.authorization, False):
        return credentials.authorization

    raise exceptions.MindsphereForbiddenAccessError(
        message=error_messages.AUTHORIZATION_INACTIVE, http_status=403
    )
def _is_app_creds_present():
    """Return True only if every application-credential env var is set."""
    required_vars = (
        constants.MDSP_OS_VM_APP_NAME,
        constants.MDSP_OS_VM_APP_VERSION,
        constants.MDSP_KEY_STORE_CLIENT_ID,
        constants.MDSP_KEY_STORE_CLIENT_SECRET,
        constants.MDSP_HOST_TENANT,
        constants.MDSP_USER_TENANT,
    )
    # all() stops at the first missing variable, like the original and-chain.
    return all(extract_env_var(var_name) for var_name in required_vars)
def _is_tenant_creds_present():
    """Return True only if every tenant-credential env var is set."""
    required_vars = (
        constants.MINDSPHERE_CLIENT_ID,
        constants.MINDSPHERE_CLIENT_SECRET,
        constants.MINDSPHERE_TENANT,
    )
    # all() stops at the first missing variable, like the original and-chain.
    return all(extract_env_var(var_name) for var_name in required_vars)
def _populate_app_creds(credentials):
    """Resolve every value needed for an application-credentials request.

    Each value is taken from ``credentials`` when set there, otherwise from
    its environment variable. For the host and user tenant, a value that
    ``get_tenant_from_token`` extracts from ``credentials.authorization``
    takes precedence over both.

    :return: a 3-tuple ``(identification.Identification, key_store_client_id,
        key_store_client_secret)``.
    :raises exceptions.MindsphereClientConfigurationError: for the first
        value that cannot be resolved from any source.
    """

    def _resolve(explicit_value, env_name):
        # Prefer the explicit value, then the environment, else fail and
        # name the missing environment variable in the message.
        if explicit_value:
            return explicit_value
        env_value = extract_env_var(env_name)
        if env_value:
            return env_value
        raise exceptions.MindsphereClientConfigurationError(
            message=error_messages.UNAVAILABLE_APP_CREDS.format(env_name)
        )

    app_name = _resolve(credentials.app_name, constants.MDSP_OS_VM_APP_NAME)
    app_version = _resolve(
        credentials.app_version, constants.MDSP_OS_VM_APP_VERSION
    )
    key_store_client_id = _resolve(
        credentials.key_store_client_id, constants.MDSP_KEY_STORE_CLIENT_ID
    )
    key_store_client_secret = _resolve(
        credentials.key_store_client_secret,
        constants.MDSP_KEY_STORE_CLIENT_SECRET,
    )

    token_host_tenant = token_user_tenant = None
    if credentials.authorization:
        token_host_tenant, token_user_tenant = get_tenant_from_token(
            credentials.authorization
        )

    # `or` keeps the credentials attribute from being read when the token
    # already supplied the tenant.
    host_tenant = _resolve(
        token_host_tenant or credentials.host_tenant,
        constants.MDSP_HOST_TENANT,
    )
    user_tenant = _resolve(
        token_user_tenant or credentials.user_tenant,
        constants.MDSP_USER_TENANT,
    )

    ident = identification.Identification(
        app_name, app_version, host_tenant, user_tenant
    )
    return ident, key_store_client_id, key_store_client_secret
def _populate_tenant_creds(credentials):
    """Resolve the values needed for a tenant-credentials token request.

    Each value is taken from ``credentials`` when set there, otherwise from
    its environment variable. The sub-tenant is resolved only when
    ``credentials.use_sub_tenant`` is truthy; otherwise it is ``None``.

    :return: a 4-tuple ``(client_id, client_secret, tenant, sub_tenant)``.
    :raises exceptions.MindsphereClientConfigurationError: for the first
        required value that cannot be resolved from either source.
    """

    def _resolve(explicit_value, env_name):
        # Prefer the explicit value, then the environment, else fail and
        # name the missing environment variable in the message.
        if explicit_value:
            return explicit_value
        env_value = extract_env_var(env_name)
        if env_value:
            return env_value
        raise exceptions.MindsphereClientConfigurationError(
            message=error_messages.UNAVAILABLE_TENANT_CREDS.format(env_name)
        )

    client_id = _resolve(credentials.client_id, constants.MINDSPHERE_CLIENT_ID)
    client_secret = _resolve(
        credentials.client_secret, constants.MINDSPHERE_CLIENT_SECRET
    )
    tenant = _resolve(credentials.tenant, constants.MINDSPHERE_TENANT)

    sub_tenant = None
    if credentials.use_sub_tenant:
        sub_tenant = _resolve(
            credentials.sub_tenant, constants.MINDSPHERE_SUB_TENANT
        )

    return client_id, client_secret, tenant, sub_tenant
def _invoke_token_endpoint(
    rest_client_config, client_id, client_secret, tenant, sub_tenant=None
):
    """Fetch an authorization token used to pre-authorize service calls.

    The token URL is assembled from the pieces stored in ``url_setup.ini``
    plus the host environment, which comes from
    ``rest_client_config.host_environment`` when set and from the
    environment variable named by ``constants.HOST_ENVIRONMENT`` otherwise.
    When ``sub_tenant`` is given, the IAM action and sub-tenant are appended
    as extra query parameters.

    :return: the value stored under ``constants.ACCESS_TOKEN`` in the
        service response.
    """
    host_environment = None
    if rest_client_config:
        host_environment = rest_client_config.host_environment
    if host_environment is None:
        host_environment = os.environ[constants.HOST_ENVIRONMENT]

    # URL components live in the url_setup.ini file next to this module.
    ini = ConfigParser(interpolation=ExtendedInterpolation())
    ini.read(config_file)
    section = constants.AUTHORIZATION_URL
    url_prefix = ini.get(section, constants.AUTHORIZATION_URL_PRE)
    url_suffix = ini.get(section, constants.AUTHORIZATION_URL_POST)
    grant_type = ini.get(section, constants.AUTHORIZATION_URL_GRANT_TYPE)

    # Base 64 encoding of service credentials
    encoded_pair = base64.b64encode(
        ":".join((client_id, client_secret)).encode()
    ).decode()

    url_prefix = url_prefix.replace(constants.TENANT, tenant)
    tenant_url = (
        url_prefix + host_environment + url_suffix
        + "?grant_type=" + grant_type
    )

    if sub_tenant is not None:
        # Extra url component for sub-tenant
        tenant_url = (
            tenant_url
            + "&iam-action=" + constants.IAM_ACTION
            + "&subtenant=" + sub_tenant
        )

    request_headers = {
        constants.CONTENT_TYPE: constants.URL_ENCODED,
        constants.AUTHORIZATION: "Basic " + encoded_pair,
    }

    response = mindsphere_core.invoke_service(
        rest_client_config=rest_client_config,
        api_url=tenant_url,
        headers=request_headers,
        http_method="POST",
    )
    return response[constants.ACCESS_TOKEN]