Source code for mindsphere_core.mindsphere_core

# -*- coding: utf-8 -*-
"""
    This module implements the common features of the SDK used by all clients.

    Copyright (C), Siemens AG 2018 Licensed under the
    MindSphere Developer License, see LICENSE.md.
"""

import os
import os.path
from mindsphere_core import log_config
import requests
import json
from configparser import ConfigParser, ExtendedInterpolation

from . import commonutil
from . import constants
from . import exceptions
from . import serialization_filter
from . import error_messages
from . import token_service


# Module-level logger, obtained from the package's logging configuration helper.
logger = log_config.default_logging()

# Absolute path of this module; the URL settings file is looked up next to it.
path = os.path.abspath(__file__)
# Location of url_setup.ini, which build_url() reads for the service URL parts.
config_file = os.path.dirname(path) + r"/url_setup.ini"


class RestClientConfig:
    """Container for the proxy, timeout and hosting-environment settings."""

    def __init__(
        self,
        proxy_host=None,
        proxy_port=None,
        proxy_username=None,
        proxy_password=None,
        connection_timeout_in_seconds=constants.TIME_OUT_DEFAULT,
        socket_timeout_in_seconds=constants.TIME_OUT_DEFAULT,
        proxy_schema=constants.PROXY_SCHEMA_DEFAULT,
        host_environment=None,
    ):
        """Store every argument unchanged as an attribute of the same name.

        proxy_host : str
            Proxy host
        proxy_port : str
            Proxy port
        proxy_username : str
            Proxy user name
        proxy_password : str
            Proxy password
        connection_timeout_in_seconds : str or number
            Connection timeout in seconds
        socket_timeout_in_seconds : str or number
            Socket timeout in seconds
        proxy_schema : str
            Proxy schema
        host_environment : str
            Target hosted environment
        """
        # Proxy endpoint and credentials.
        self.proxy_host, self.proxy_port = proxy_host, proxy_port
        self.proxy_username, self.proxy_password = proxy_username, proxy_password

        # Timeouts, both expressed in seconds.
        self.connection_timeout_in_seconds = connection_timeout_in_seconds
        self.socket_timeout_in_seconds = socket_timeout_in_seconds

        # Proxy schema and target hosting environment.
        self.proxy_schema, self.host_environment = proxy_schema, host_environment
def fetch_token(rest_client_config, credentials):
    """Return the token produced by ``token_service.fetch_token``.

    The token is either fetched using service credentials or, when the
    credentials already carry a token, that token is validated. All of the
    work is delegated to the token service; both arguments are passed through
    unchanged.

    rest_client_config : RestClientConfig
        config object set by user
    credentials
        credentials object forwarded to the token service
    """
    token = token_service.fetch_token(rest_client_config, credentials)
    return token
def build_url(base_path, end_point_url, config=None):
    """Assemble the full end point url for the selected host environment.

    The host environment is taken from ``config.host_environment`` when set,
    otherwise from the environment variable named by
    ``constants.HOST_ENVIRONMENT``, otherwise ``constants.HOST_ENV_DEFAULT``.
    The url prefix and suffix are read from the ``url_setup.ini`` file
    referenced by ``config_file``.

    base_path : str
        base path for the selected service
    end_point_url : str
        url for the requested service end point
    config : RestClientConfig
        config object set by user
    """
    configured_environment = config.host_environment if config else None

    if configured_environment:
        environment = configured_environment
    elif constants.HOST_ENVIRONMENT in os.environ:
        environment = os.environ[constants.HOST_ENVIRONMENT]
    else:
        environment = constants.HOST_ENV_DEFAULT

    url_settings = ConfigParser(interpolation=ExtendedInterpolation())
    url_settings.read(config_file)
    section = constants.SERVICE_URL
    prefix = url_settings.get(section, constants.SERVICE_URL_PRE)
    suffix = url_settings.get(section, constants.SERVICE_URL_POST)

    # <prefix><environment><suffix><base path><end point>
    return "".join((prefix, environment, suffix, base_path, end_point_url))
def _resolve_timeout(rest_client_config):
    """Translate the configured timeouts into the ``timeout`` argument of requests."""
    if not rest_client_config:
        return None

    connect = rest_client_config.connection_timeout_in_seconds
    # getattr keeps duck-typed config objects without this attribute working.
    read = getattr(rest_client_config, "socket_timeout_in_seconds", None)

    connect = float(connect) if connect else None
    read = float(read) if read else None

    if read is None:
        # A single value (or None) is applied by requests to both phases.
        return connect
    # (connect, read) tuple; equal values behave exactly like a single value.
    return (connect, read)


def _build_proxies(rest_client_config):
    """Return the proxies mapping for requests, or None when no proxy is configured."""
    if not (
        rest_client_config
        and rest_client_config.proxy_host
        and rest_client_config.proxy_port
    ):
        return None

    # str.format accepts an integer port as well as a string one.
    # NOTE(review): proxy_schema, proxy_username and proxy_password of the
    # config are not applied here, and only the "https" key is set - confirm
    # whether that is intended.
    proxy_url = "http://{}:{}".format(
        rest_client_config.proxy_host, rest_client_config.proxy_port
    )
    return {"https": proxy_url}


def _build_payload(body_params, form_params):
    """Return the request payload: serialized body, else form params, else None."""
    if body_params:
        # Serialize the input object
        payload = serialization_filter.sanitize_for_serialization(body_params)
        if not isinstance(payload, bytes):
            # Anything that is not already bytes is sent as a JSON string
            payload = json.dumps(payload)
        return payload
    if form_params:
        return form_params
    return None


def _parse_response(response, response_type, model_package):
    """Convert a successful response into the value returned to the caller."""
    if not response.text:
        return None

    json_data = response.text
    if not response_type or response_type != constants.STR_TYPE:
        # Converts response to object
        # Except when response type is string
        json_data = json.loads(response.text)
    if response_type:
        # Finds the relative path to model package
        serialization_filter.service_model_package = (
            model_package + constants.MODELS_PACKAGE
        )
        # De-serialize the response to desired response type
        json_data = serialization_filter.deserialize(json_data, response_type)
    return json_data


def invoke_service(
    rest_client_config,
    api_url,
    headers,
    http_method,
    query_params=None,
    form_params=None,
    body_params=None,
    local_var_files=None,
    response_type=None,
    model_package=None,
):
    """Method to invoke rest end points.

    rest_client_config : RestClientConfig
        config object set by user; supplies the timeouts and proxy host/port
    api_url : str
        url of the end point to call
    headers : dict
        request headers; passed through ``sanitize_headers`` when not empty
    http_method : str
        HTTP method handed to ``requests.request``
    query_params
        query parameters sent as ``params``
    form_params
        form data, used as payload only when ``body_params`` is empty
    body_params
        request body; serialized and, unless it is bytes, dumped to JSON
    local_var_files
        files sent as ``files``
    response_type
        type the response is de-serialized to; when it equals
        ``constants.STR_TYPE`` the response text is not JSON-decoded first
    model_package : str
        package prefix used to locate the models for de-serialization

    Returns None when the response body is empty, otherwise the decoded
    (and, when ``response_type`` is given, de-serialized) response.

    Raises ``exceptions.MindsphereServerError`` for an HTTP error status, a
    timeout, a connection failure or any other requests exception.
    """
    timeout = _resolve_timeout(rest_client_config)
    proxy_dict = _build_proxies(rest_client_config)

    if headers:
        headers = serialization_filter.sanitize_headers(headers)

    payload = _build_payload(body_params, form_params)

    try:
        response = requests.request(
            http_method,
            api_url,
            params=query_params,
            data=payload,
            files=local_var_files,
            headers=headers,
            proxies=proxy_dict,
            timeout=timeout,
        )
        response.raise_for_status()
        return _parse_response(response, response_type, model_package)
    except requests.exceptions.HTTPError as error_http:
        error = commonutil.parse_exception(error_http.response)
        raise exceptions.MindsphereServerError(error) from error_http
    except requests.exceptions.Timeout as error_time_out:
        # Checked before ConnectionError: ConnectTimeout derives from both,
        # and a timed-out connect should be reported as a timeout.
        raise exceptions.MindsphereServerError(
            error_messages.TIMEOUT_ERROR, error_time_out
        ) from error_time_out
    except requests.exceptions.ConnectionError as error_connection:
        raise exceptions.MindsphereServerError(
            error_messages.CONNECTION_ERROR, error_connection
        ) from error_connection
    except requests.exceptions.RequestException as error:
        raise exceptions.MindsphereServerError(
            error_messages.UN_IDENTIFIED_ERROR, error
        ) from error