import base64
from configparser import ConfigParser, ExtendedInterpolation
import os
import os.path
from .commonutil import is_token_valid, extract_env_var, get_tenant_from_token
from mindsphere_core import mindsphere_core
from mindsphere_core import identification
from . import mindsphere_credentials
from .mindsphere_credentials import AppCredentials
from .mindsphere_credentials import TenantCredentials
from . import constants
from . import exceptions
from . import error_messages
# Absolute path of this module. ``url_setup.ini`` is expected to sit in the
# same directory; it is read by ``_invoke_token_endpoint``.
path = os.path.abspath(__file__)
config_file = "".join((os.path.dirname(path), r"/url_setup.ini"))
class AppCredsRequestBody:
    """Plain attribute container for an application-credentials request.

    ``grant_type`` is always ``'client_credentials'``; the other attributes
    hold the constructor arguments under camelCase names.
    """

    def __init__(self, app_name, app_version, host_tenant, user_tenant):
        """Store the application name/version and the host/user tenants."""
        self.grant_type = 'client_credentials'
        # Assigned left to right, so attribute insertion order is unchanged.
        self.appName, self.appVersion = app_name, app_version
        self.hostTenant, self.userTenant = host_tenant, user_tenant
def init_credentials(credentials):
    """Return the credentials to use, falling back to the environment.

    If the caller supplied (truthy) ``credentials`` they are returned
    unchanged. Otherwise the environment is probed: when the application
    credential variables are all set an empty ``AppCredentials`` is returned,
    else when the tenant credential variables are all set an empty
    ``TenantCredentials`` is returned. The empty objects are filled in from
    the environment later, when a token is requested.

    :param credentials: credentials object set by the user, or a falsy value.
    :return: the given credentials, a new ``AppCredentials`` /
        ``TenantCredentials``, or ``None`` when nothing is configured.
    """
    if credentials:
        return credentials
    # App credentials take precedence over tenant credentials, matching the
    # order used by ``fetch_token``. The check must NOT be negated: an
    # AppCredentials object is only useful when the app variables exist.
    if _is_app_creds_present():
        return AppCredentials()
    if _is_tenant_creds_present():
        return TenantCredentials()
    # Nothing supplied and nothing configured; callers see ``None``.
    return None
def fetch_token(rest_client_config, credentials):
    """Return an access token for the given credentials.

    Resolution order:

    1. a truthy ``cached_token`` on ``credentials`` that ``is_token_valid``
       accepts is returned as is;
    2. otherwise truthy ``credentials`` are dispatched by their type;
    3. with no credentials, application credentials from the environment are
       tried first, then tenant credentials from the environment.

    :raises exceptions.MindsphereForbiddenAccessError: when no credentials
        were given and none are configured in the environment.
    """
    if not credentials:
        # No usable credentials object: rely on the environment only.
        if _is_app_creds_present():
            return _get_token_with_app_creds(rest_client_config, credentials)
        if _is_tenant_creds_present():
            return _get_token_with_tenant_creds(rest_client_config, credentials)
        raise exceptions.MindsphereForbiddenAccessError(
            message=error_messages.AUTHORIZATION_FAILED,
            http_status=401
        )

    # Not every credentials type necessarily carries ``cached_token``.
    cached = getattr(credentials, 'cached_token', None)
    if cached and is_token_valid(cached, True):
        return cached
    return _get_requested_token(rest_client_config, credentials)
def _get_requested_token(rest_client_config, credentials):
    """Route ``credentials`` to the token routine matching its exact class.

    The match is on the concrete type (subclasses are not accepted).

    :raises exceptions.MindsphereForbiddenAccessError: when the object is not
        an ``AppCredentials``, ``TenantCredentials`` or ``UserToken``.
    """
    credential_type = type(credentials)
    if credential_type is mindsphere_credentials.AppCredentials:
        return _get_token_with_app_creds(rest_client_config, credentials)
    if credential_type is mindsphere_credentials.TenantCredentials:
        return _get_token_with_tenant_creds(rest_client_config, credentials)
    if credential_type is mindsphere_credentials.UserToken:
        return _get_token_with_user_token(credentials)
    raise exceptions.MindsphereForbiddenAccessError(
        message=error_messages.INCORRECT_CREDENTIAL,
        http_status=401
    )
def _get_token_with_app_creds(rest_client_config, credentials):
    """Request a token with application credentials and cache it.

    ``credentials`` may be ``None``, in which case an empty
    ``AppCredentials`` is created and filled from the environment by
    ``_populate_app_creds``. The token is stored on
    ``credentials.cached_token`` and returned.
    """
    if credentials is None:
        credentials = AppCredentials()

    identity, client_id, client_secret = _populate_app_creds(credentials)

    # Base 64 encoding of service credentials ("<id>:<secret>").
    encoded_pair = base64.b64encode((client_id + ":" + client_secret).encode()).decode()
    request_headers = {
        'Accept': 'application/hal+json',
        'Content-Type': 'application/json',
        'X-SPACE-AUTH-KEY': 'Basic ' + encoded_pair,
    }

    token_url = mindsphere_core.build_url(
        '/api/technicaltokenmanager/v3/oauth/token', '', rest_client_config
    )
    response = mindsphere_core.invoke_service(
        rest_client_config=rest_client_config,
        api_url=token_url,
        headers=request_headers,
        http_method='POST',
        body_params=identity,
    )
    access_token = response[constants.ACCESS_TOKEN]

    credentials.cached_token = access_token
    return access_token
def _get_token_with_tenant_creds(rest_client_config, credentials):
    """Request a token with tenant credentials and cache it.

    ``credentials`` may be ``None``, in which case an empty
    ``TenantCredentials`` is created and filled from the environment by
    ``_populate_tenant_creds``. The token is stored on
    ``credentials.cached_token`` and returned.
    """
    if credentials is None:
        credentials = TenantCredentials()

    # (client_id, client_secret, tenant, sub_tenant), in call order.
    resolved = _populate_tenant_creds(credentials)
    access_token = _invoke_token_endpoint(rest_client_config, *resolved)

    credentials.cached_token = access_token
    return access_token
def _get_token_with_user_token(credentials):
    """Return the user-supplied token after validating it.

    :raises exceptions.MindsphereForbiddenAccessError: (status 403) when
        ``is_token_valid`` rejects ``credentials.authorization``.
    """
    if is_token_valid(credentials.authorization, False):
        return credentials.authorization
    raise exceptions.MindsphereForbiddenAccessError(
        message=error_messages.AUTHORIZATION_INACTIVE,
        http_status=403
    )
def _is_app_creds_present():
    """Return True only if every application-credential env var is set.

    Checking stops at the first variable that is missing or empty.
    """
    required = (
        constants.MDSP_OS_VM_APP_NAME,
        constants.MDSP_OS_VM_APP_VERSION,
        constants.MDSP_KEY_STORE_CLIENT_ID,
        constants.MDSP_KEY_STORE_CLIENT_SECRET,
        constants.MDSP_HOST_TENANT,
        constants.MDSP_USER_TENANT,
    )
    return all(extract_env_var(name) for name in required)
def _is_tenant_creds_present():
    """Return True only if every tenant-credential env var is set.

    Checking stops at the first variable that is missing or empty.
    """
    required = (
        constants.MINDSPHERE_CLIENT_ID,
        constants.MINDSPHERE_CLIENT_SECRET,
        constants.MINDSPHERE_TENANT,
    )
    return all(extract_env_var(name) for name in required)
def _resolve_app_setting(explicit_value, env_name):
    """Return ``explicit_value`` if truthy, else the env var ``env_name``.

    The environment is read exactly once, so the value that was checked is
    the value that is returned.

    :raises exceptions.MindsphereClientConfigurationError: when neither
        source provides a non-empty value; the message names ``env_name``.
    """
    value = explicit_value or extract_env_var(env_name)
    if not value:
        raise exceptions.MindsphereClientConfigurationError(
            message=error_messages.UNAVAILABLE_APP_CREDS.format(env_name)
        )
    return value


def _populate_app_creds(credentials):
    """Resolve every value needed for an application-credentials request.

    Each setting is taken from ``credentials`` when set there, otherwise from
    its environment variable. For the host and user tenant, a value
    extracted from ``credentials.authorization`` (when present) takes
    precedence over both.

    :param credentials: object exposing ``app_name``, ``app_version``,
        ``key_store_client_id``, ``key_store_client_secret``,
        ``authorization``, ``host_tenant`` and ``user_tenant``.
    :return: ``(identification, key_store_client_id,
        key_store_client_secret)`` where ``identification`` is an
        ``identification.Identification`` built from app name, app version,
        host tenant and user tenant.
    :raises exceptions.MindsphereClientConfigurationError: for the first
        setting that cannot be resolved.
    """
    app_name = _resolve_app_setting(
        credentials.app_name, constants.MDSP_OS_VM_APP_NAME
    )
    app_version = _resolve_app_setting(
        credentials.app_version, constants.MDSP_OS_VM_APP_VERSION
    )
    key_store_client_id = _resolve_app_setting(
        credentials.key_store_client_id, constants.MDSP_KEY_STORE_CLIENT_ID
    )
    key_store_client_secret = _resolve_app_setting(
        credentials.key_store_client_secret, constants.MDSP_KEY_STORE_CLIENT_SECRET
    )

    # Tenants carried by the token win over configured values.
    host_tenant_from_token = user_tenant_from_token = None
    if credentials.authorization:
        host_tenant_from_token, user_tenant_from_token = get_tenant_from_token(
            credentials.authorization
        )

    host_tenant = _resolve_app_setting(
        host_tenant_from_token or credentials.host_tenant,
        constants.MDSP_HOST_TENANT,
    )
    user_tenant = _resolve_app_setting(
        user_tenant_from_token or credentials.user_tenant,
        constants.MDSP_USER_TENANT,
    )

    app_identity = identification.Identification(
        app_name, app_version, host_tenant, user_tenant
    )
    return app_identity, key_store_client_id, key_store_client_secret
def _resolve_tenant_setting(explicit_value, env_name):
    """Return ``explicit_value`` if truthy, else the env var ``env_name``.

    The environment is read exactly once, so the value that was checked is
    the value that is returned.

    :raises exceptions.MindsphereClientConfigurationError: when neither
        source provides a non-empty value; the message names ``env_name``.
    """
    value = explicit_value or extract_env_var(env_name)
    if not value:
        raise exceptions.MindsphereClientConfigurationError(
            message=error_messages.UNAVAILABLE_TENANT_CREDS.format(env_name)
        )
    return value


def _populate_tenant_creds(credentials):
    """Resolve every value needed for a tenant-credentials request.

    Each setting is taken from ``credentials`` when set there, otherwise from
    its environment variable. The sub-tenant is resolved only when
    ``credentials.use_sub_tenant`` is truthy; otherwise it is ``None``.

    :param credentials: object exposing ``client_id``, ``client_secret``,
        ``tenant``, ``use_sub_tenant`` and ``sub_tenant``.
    :return: ``(client_id, client_secret, tenant, sub_tenant)``.
    :raises exceptions.MindsphereClientConfigurationError: for the first
        required setting that cannot be resolved.
    """
    client_id = _resolve_tenant_setting(
        credentials.client_id, constants.MINDSPHERE_CLIENT_ID
    )
    client_secret = _resolve_tenant_setting(
        credentials.client_secret, constants.MINDSPHERE_CLIENT_SECRET
    )
    tenant = _resolve_tenant_setting(
        credentials.tenant, constants.MINDSPHERE_TENANT
    )

    sub_tenant = None
    if credentials.use_sub_tenant:
        # Once sub-tenant use is requested, the sub-tenant becomes mandatory.
        sub_tenant = _resolve_tenant_setting(
            credentials.sub_tenant, constants.MINDSPHERE_SUB_TENANT
        )

    return client_id, client_secret, tenant, sub_tenant
def _invoke_token_endpoint(
    rest_client_config, client_id, client_secret, tenant, sub_tenant=None
):
    """
    Fetch an authorization token used to preauthorize the service calls.

    The host environment comes from ``rest_client_config.host_environment``
    when that is set, otherwise from the ``constants.HOST_ENVIRONMENT``
    environment variable (a ``KeyError`` is raised if it is missing). The
    URL pieces are read from ``url_setup.ini`` and the request is sent as a
    POST with an HTTP Basic ``client_id:client_secret`` header.

    :param sub_tenant: when not ``None``, the IAM action and sub-tenant are
        appended to the query string.
    :return: the value stored under ``constants.ACCESS_TOKEN`` in the
        service response.
    """
    host_environment = None
    if rest_client_config:
        host_environment = rest_client_config.host_environment
    if host_environment is None:
        host_environment = os.environ[constants.HOST_ENVIRONMENT]

    # Fetching the url components from url_setup.ini file
    url_settings = ConfigParser(interpolation=ExtendedInterpolation())
    url_settings.read(config_file)
    section = constants.AUTHORIZATION_URL
    url_prefix = url_settings.get(section, constants.AUTHORIZATION_URL_PRE)
    url_suffix = url_settings.get(section, constants.AUTHORIZATION_URL_POST)
    grant_type = url_settings.get(section, constants.AUTHORIZATION_URL_GRANT_TYPE)

    # Base 64 encoding of service credentials
    basic_credentials = base64.b64encode(
        ":".join((client_id, client_secret)).encode()
    ).decode()

    # The prefix carries a placeholder that is swapped for the tenant name.
    url_prefix = url_prefix.replace(constants.TENANT, tenant)
    token_url = (
        url_prefix + host_environment + url_suffix + "?grant_type=" + grant_type
    )
    if sub_tenant is not None:
        # Extra url component for sub-tenant
        token_url = (
            token_url
            + "&iam-action="
            + constants.IAM_ACTION
            + "&subtenant="
            + sub_tenant
        )

    request_headers = {
        constants.CONTENT_TYPE: constants.URL_ENCODED,
        constants.AUTHORIZATION: "Basic " + basic_credentials,
    }
    response = mindsphere_core.invoke_service(
        rest_client_config=rest_client_config,
        api_url=token_url,
        headers=request_headers,
        http_method="POST",
    )
    return response[constants.ACCESS_TOKEN]