# coding: utf-8
"""
SDI - Semantic Data Interconnect APIs
The Semantic Data Interconnect (SDI) is a collection of APIs that allows the user to unlock the potential of disparate big data by connecting external data. The SDI can infer the schemas of data based on schema-on-read, allow creating a semantic model and perform big data semantic queries. It seamlessly connects to MindSphere's Integrated Data Lake (IDL), but it can work independently as well. There are two mechanisms that can be used to upload files so that SDI can generate schemas and make data ready for query. The SDI operations are divided into the following groups: **Data Registration for SDI** This set of APIs is used to organize the incoming data. When configuring a Data Registry, you have the option to update your data based on a replace or append strategy. If you consider a use case where the schema may change and incoming data files are completely different every time, then replace is a good strategy. The replace strategy will replace the existing schema and data during each data ingest operation, whereas the append strategy will update the existing schema and data during each data ingest operation. **Custom Data Type for SDI** The SDI by default identifies basic data types for each property, such as String, Integer, Float, Date, etc. The user can use this set of APIs to create their own custom data type. The SDI also provides an API under this category to suggest a data type based on user-provided sample test values. **Data Lake for SDI** The SDI can process uploaded files. It provides endpoints to manage a customer's data lake registration based on tenant id, cloud provider and data lake type. This set of REST endpoints allows users to create, update and retrieve the base path for their data lake. The IDL customer needs to create an SDI folder under the root folder. Any file uploaded in this folder is automatically picked up by SDI to process via IDL notification. 
**Data Ingest for SDI** This set of APIs allows user to upload files, start an ingest job for uploaded files, find job status for ingested jobs or retrieve all job statuses. **Schema Registry for SDI** The SDI provides a way to find the generated schema in this category. Users can find an SDI generated schema for uploaded files based on source name, data tag or schema name. **Data Query for SDI** allows querying based on the extracted schemas. Important supported APIs are: * Query interface for querying semantically correlated and transformed data. * Stores and executes data queries. * Uses a semantic model to translate model-based query to physical queries. **Semantic Model for SDI** allows user to create semantic model ontologies based on the extracted one or more schemas. The important functionalities achieved with APIs are: * Contextual correlation of data from different systems. * Infers & Recommends mappings between different schemas. * Import and store Semantic model. # noqa: E501
"""
from __future__ import absolute_import
from mindsphere_core.mindsphere_core import logger
from mindsphere_core import mindsphere_core, exceptions, token_service
from mindsphere_core.token_service import init_credentials
class DataQueryOperationsClient:
    """Client for the SDI Data Query endpoints under ``/api/sdi/v4``.

    Covers stored SQL queries (``/queries``) and their execution jobs
    (``/executionJobs``). Every public method follows the same pipeline:
    fetch a bearer token through ``token_service.fetch_token``, build the
    absolute URL with ``mindsphere_core.build_url`` and hand the request to
    ``mindsphere_core.invoke_service`` together with the name of the expected
    response model.
    """

    __base_path__ = '/api/sdi/v4'
    __model_package__ = __name__.split('.')[0]

    def __init__(self, rest_client_config=None, mindsphere_credentials=None):
        """Create the client.

        :param rest_client_config: REST client configuration forwarded to
            ``mindsphere_core`` when building URLs and invoking services.
        :param mindsphere_credentials: Credentials, normalized through
            ``init_credentials``; used to fetch a token for every request.
        """
        self.rest_client_config = rest_client_config
        self.mindsphere_credentials = init_credentials(mindsphere_credentials)

    def _invoke(self, operation, http_method, end_point_url, query_params,
                body_params, response_type, accept='application/json',
                range_header=None):
        """Authenticate, build the URL and dispatch a single request.

        :param str operation: Public method name, used only in log output.
        :param str http_method: HTTP verb passed to ``invoke_service``.
        :param str end_point_url: Path relative to ``__base_path__``.
        :param query_params: Query-parameter mapping (or None) passed through
            unchanged to ``invoke_service``.
        :param body_params: Request body, or None for body-less requests. A
            JSON ``Content-Type`` header is only sent when a body is present.
        :param str response_type: Name of the model the response maps to.
        :param str accept: Value of the ``Accept`` header.
        :param str range_header: Optional ``Range`` header value
            (e.g. ``bytes=200-600``); the header is omitted when None.
        :return: Whatever ``mindsphere_core.invoke_service`` returns.
        """
        token = token_service.fetch_token(self.rest_client_config, self.mindsphere_credentials)
        api_url = mindsphere_core.build_url(self.__base_path__, end_point_url, self.rest_client_config)
        headers = {'Accept': accept}
        if body_params is not None:
            headers['Content-Type'] = 'application/json'
        if range_header is not None:
            # Only request a byte slice when the caller asked for one; never
            # emit a None-valued header.
            headers['Range'] = range_header
        headers['Authorization'] = 'Bearer ' + str(token)
        form_params, local_var_files = {}, {}
        logger.info('DataQueryOperationsClient.' + operation + '() --> Proceeding for API Invoker.')
        return mindsphere_core.invoke_service(
            self.rest_client_config, api_url, headers, http_method, query_params,
            form_params, body_params, local_var_files, response_type,
            self.__model_package__)

    def get_execution_jobs(self, request_object):
        """Retrieve all jobs.

        Returns a list of Data Query Execution Model.

        :param ExecutionJobsGetRequest request_object: Optional filters --> |br|
            ( pageToken - Selects next page. Value must be taken from response
            body property 'page.nextToken'. If omitted, first page is
            returned. ), |br| ( queryId - Filter based on query id ), |br|
            ( status - Filter based on job status ).
            When None, no query parameters are sent.
        :return: ResponseAllDataQueryExecutionResponse
        """
        logger.info('DataQueryOperationsClient.get_execution_jobs() invoked.')
        if request_object is not None:
            query_params = {'pageToken': request_object.page_token,
                            'queryId': request_object.query_id,
                            'status': request_object.status}
        else:
            query_params = None
        return self._invoke('get_execution_jobs', 'GET', '/executionJobs',
                            query_params, None,
                            'ResponseAllDataQueryExecutionResponse')

    def delete_execution_jobs_id(self, id):
        """Delete job by job id.

        Returns NO CONTENT HTTP Status.

        :param str id: Execution job id (required).
        :return: str
        :raises MindsphereClientError: If ``id`` is None.
        """
        logger.info('DataQueryOperationsClient.delete_execution_jobs_id() invoked.')
        if id is None:
            raise exceptions.MindsphereClientError('`id` is not passed when calling `delete_execution_jobs_id`')
        return self._invoke('delete_execution_jobs_id', 'DELETE',
                            '/executionJobs/{id}'.format(id=id), {}, None, 'str')

    def get_execution_jobs_id(self, id):
        """Retrieve job details by job id.

        Returns Data Query Execution Model.

        :param str id: Execution job id (required).
        :return: DataQueryExecutionResponse
        :raises MindsphereClientError: If ``id`` is None.
        """
        logger.info('DataQueryOperationsClient.get_execution_jobs_id() invoked.')
        if id is None:
            raise exceptions.MindsphereClientError('`id` is not passed when calling `get_execution_jobs_id`')
        return self._invoke('get_execution_jobs_id', 'GET',
                            '/executionJobs/{id}'.format(id=id), {}, None,
                            'DataQueryExecutionResponse')

    def get_execution_jobs_id_results(self, id, range=None):
        """Retrieve query results by job id.

        Returns query results.

        :param str id: Execution job id (required).
        :param str range: Optional part of the result file to return, in
            bytes, e.g. ``bytes=200-600``. Sent as the ``Range`` header when
            given; the whole result is requested when None.
        :return: str (raw query result)
        :raises MindsphereClientError: If ``id`` is None.
        """
        logger.info('DataQueryOperationsClient.get_execution_jobs_id_results() invoked.')
        if id is None:
            raise exceptions.MindsphereClientError('`id` is not passed when calling `get_execution_jobs_id_results`')
        # str() so that non-string ids (e.g. ints) do not break the concatenation.
        logger.info('Getting results for executionJobs with id : ' + str(id))
        # TODO: results are returned as a raw 'str'; a dedicated
        # deserialization model still needs to be added.
        return self._invoke('get_execution_jobs_id_results', 'GET',
                            '/executionJobs/{id}/results'.format(id=id), {}, None,
                            'str', accept='application/octet-stream, application/json',
                            range_header=range)

    def get_queries(self, request_object):
        """Retrieve all queries.

        Returns a list of Data Query SQL Model.

        :param QueriesGetRequest request_object: Optional filters --> |br|
            ( pageToken - Selects next page. Value must be taken from response
            body property 'page.nextToken'. If omitted, first page is
            returned. ), |br| ( executable - Filter based on executable flag ),
            |br| ( isDynamic - Filter based on isDynamic flag ), |br|
            ( ontologyId - Filter based on ontology id ).
            When None, no query parameters are sent.
        :return: ResponseAllDataSQLQuery
        """
        logger.info('DataQueryOperationsClient.get_queries() invoked.')
        if request_object is not None:
            query_params = {'pageToken': request_object.page_token,
                            'executable': request_object.executable,
                            'isDynamic': request_object.is_dynamic,
                            'ontologyId': request_object.ontology_id}
        else:
            # Mirror get_execution_jobs: no filters instead of an AttributeError.
            query_params = None
        return self._invoke('get_queries', 'GET', '/queries', query_params, None,
                            'ResponseAllDataSQLQuery')

    def delete_queries_id(self, id):
        """Delete query by query id.

        Returns NO CONTENT HTTP Status.

        :param str id: Query id (required).
        :return: str
        :raises MindsphereClientError: If ``id`` is None.
        """
        logger.info('DataQueryOperationsClient.delete_queries_id() invoked.')
        if id is None:
            raise exceptions.MindsphereClientError('`id` is not passed when calling `delete_queries_id`')
        return self._invoke('delete_queries_id', 'DELETE',
                            '/queries/{id}'.format(id=id), {}, None, 'str')

    def get_queries_id_execution_jobs_latest_results(self, id, range=None):
        """Retrieve latest query results by query id.

        Returns query results.

        :param str id: Query id (required).
        :param str range: Optional part of the result file to return, in
            bytes, e.g. ``bytes=200-600``. Sent as the ``Range`` header when
            given; the header is omitted when None.
        :return: str (raw query result)
        :raises MindsphereClientError: If ``id`` is None.
        """
        logger.info('DataQueryOperationsClient.get_queries_id_execution_jobs_latest_results() invoked.')
        if id is None:
            raise exceptions.MindsphereClientError('`id` is not passed when calling `get_queries_id_execution_jobs_latest_results`')
        return self._invoke('get_queries_id_execution_jobs_latest_results', 'GET',
                            '/queries/{id}/executionJobs/latestResults'.format(id=id),
                            {}, None, 'str',
                            accept='application/octet-stream, application/json',
                            range_header=range)

    def create_queries_id_execution_jobs(self, id, request_object):
        """Create execution job for dynamic query.

        There is a soft limit of up to 10 parameters and aliases.
        Returns id of created job.

        :param str id: Query id (required).
        :param DataQueryExecuteRequest request_object: Execution request sent
            as the JSON body (required).
        :return: DataQueryExecuteQueryResponse
        :raises MindsphereClientError: If ``request_object`` or ``id`` is None.
        """
        logger.info('DataQueryOperationsClient.create_queries_id_execution_jobs() invoked.')
        if request_object is None:
            raise exceptions.MindsphereClientError('`request_object` is not passed when calling `create_queries_id_execution_jobs`')
        if id is None:
            raise exceptions.MindsphereClientError('The required parameter `id` is missing, when calling `create_queries_id_execution_jobs`')
        return self._invoke('create_queries_id_execution_jobs', 'POST',
                            '/queries/{id}/executionJobs'.format(id=id), {},
                            request_object, 'DataQueryExecuteQueryResponse')

    def get_queries_id(self, request_object):
        """Retrieve query by query id.

        Returns a Data Query SQL Model.

        :param request_object: Either the query id itself, or a
            ``QueriesIdGetRequest``-like object exposing an ``id`` attribute.
        :return: NativeQueryGetResponse
        :raises MindsphereClientError: If no id is supplied.
        """
        logger.info('DataQueryOperationsClient.get_queries_id() invoked.')
        if request_object is None:
            raise exceptions.MindsphereClientError('`request_object` is not passed when calling `get_queries_id`')
        # Historically callers pass the raw id; the documented request object
        # carries it as `.id`. Support both instead of formatting the object
        # itself into the URL.
        query_id = getattr(request_object, 'id', request_object)
        if query_id is None:
            raise exceptions.MindsphereClientError('The required parameter `id` is missing from `request_object`, when calling `get_queries_id`')
        return self._invoke('get_queries_id', 'GET',
                            '/queries/{id}'.format(id=query_id), {}, None,
                            'NativeQueryGetResponse')

    def update_queries_id(self, id, request_object):
        """Update query by query id.

        Returns Updated Data Query SQL Model.

        :param str id: Query id (required).
        :param DataQuerySQLUpdateRequest request_object: Update request sent
            as the JSON body (required).
        :return: DataQuerySQLResponse
        :raises MindsphereClientError: If ``request_object`` or ``id`` is None.
        """
        logger.info('DataQueryOperationsClient.update_queries_id() invoked.')
        if request_object is None:
            raise exceptions.MindsphereClientError('`request_object` is not passed when calling `update_queries_id`')
        if id is None:
            raise exceptions.MindsphereClientError('The required parameter `id` is missing, when calling `update_queries_id`')
        return self._invoke('update_queries_id', 'PATCH',
                            '/queries/{id}'.format(id=id), {}, request_object,
                            'DataQuerySQLResponse')

    def create_queries(self, request_object):
        """Create new queries.

        Returns id of created query.

        :param DataQuerySQLRequest request_object: Query definition sent as
            the JSON body (required).
        :return: DataQuerySQLResponse
        :raises MindsphereClientError: If ``request_object`` is None.
        """
        logger.info('DataQueryOperationsClient.create_queries() invoked.')
        if request_object is None:
            raise exceptions.MindsphereClientError('`request_object` is not passed when calling `create_queries`')
        return self._invoke('create_queries', 'POST', '/queries', {},
                            request_object, 'DataQuerySQLResponse')