Using Jupyter Notebook¶
You can use different Public APIs such as Data Exchange, Model Management, and IoT Time Series from Jupyter Notebook, the model development workspace for Predictive Learning. Your tenant must have valid access to these APIs in order to utilize them in the workspace environment. The Jupyter Notebook can be accessed from the:
- Manage Environments page
- Manage Analytics Workspaces page
The steps to accomplish this are outlined in the following procedures.
Go here for in-depth information on Public APIs.
Checking your Configuration¶
When working with scripts, your environment requires a certain set of libraries. Some libraries, required to run the minimal services within the cluster, come preinstalled and are available here.
Run this command to list the installed packages from Jupyter:
!pip freeze --user
We recommend that you install the required packages at the beginning of the notebook and execute those cells each time the cluster is started. Currently, Predictive Learning does not store the custom actions you perform on the cluster between restarts.
Using Inputs from Job Executions¶
In the current implementation, all job executions require parameters; a parameter can be one of Data Exchange, IoT, Data Lake, or Predictive Learning Storage (PrL Storage). For the first three, Job Manager copies the input into a temporary location that is available to your code. In Jupyter notebooks, three variables are available: inputFolder, outputFolder, and datasetName. These can be read using the Jupyter magic command %store, as in:
%store -r inputFolder #-r specifies a read (restore) of the stored variable
%store -r outputFolder
%store -r datasetName
The datasetName variable only contains a value when you use the IoT input type. The inputFolder variable is prefilled by the job execution engine with a value pointing to the temporary location that holds the input files or data; that is an S3 path on AWS or a Blob Storage path on Azure. It does not contain the scheme prefix such as s3://, so add the prefix yourself when calling CLI tools. You can then use the outputFolder variable in a Jupyter notebook as in:
!aws s3 cp ./mylocalfile.txt s3://$outputFolder/myfile.txt
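Similarly, on AWS you can copy the job inputs from the remote input location into the notebook's local working directory. The following is a sketch; it assumes the inputFolder variable has been restored with %store -r as shown above:
#copy everything under the remote input location into a local folder named 'inputs'
!aws s3 cp s3://$inputFolder ./inputs --recursive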
For Jupyter notebooks, Predictive Learning does not provide a built-in library for loading the dataset; however, there are various ways to obtain it using Python. If you encounter issues loading your dataset, feel free to contact us for guidance.
When using Zeppelin notebooks, these variables can be read using the Zeppelin session instance variable named 'z', available under Spark and PySpark Interpreters:
//make sure you have the proper interpreter set at the beginning of your paragraph, like %spark
var inpf = z.get("inputFolder")
var outf = z.get("outputFolder")
var dsname = z.get("datasetName")
When you want to load the dataset you need in your Zeppelin notebook, you can use the built-in library functions, like this:
%spark
//make sure you also pass in the Spark context
var ds = com.siemens.mindsphere.prl.tools.dsaccess.DatasetUtil.loadDatasetByName(dsname, spark)
Always take into account that both the inputFolder and outputFolder variables are remote storage paths, not local folders, so most regular file functions will not work against them. CLI and shell commands will accept them, however, as long as you add the correct prefix. For Python and Scala libraries that can work with remote storage services, Predictive Learning recommends checking the respective library's documentation; for example, the pandas Python library can read from and write to AWS S3 storage.
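As an illustration, here is a minimal sketch of reading a CSV file from the input location with pandas. It assumes an AWS-based environment, that the s3fs package is installed, that inputFolder has been restored with %store -r as shown above, and that the input contains a file named data.csv (a placeholder name):
import pandas as pd
#pandas can read directly from S3 when the s3fs package is installed;
#'data.csv' is a placeholder file name for this sketch
df = pd.read_csv("s3://" + inputFolder + "/data.csv")
print(df.head())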
Installing your own Python Libraries¶
Run these commands to install libraries:
#upgrade pip and install required libraries
!python3 -m pip install --upgrade pip --user
!python3 -m pip install requests --force-reinstall --upgrade --user
!python3 -m pip install pandas --force-reinstall --upgrade --user
!python3 -m pip install pyarrow --force-reinstall --upgrade --user
Not all external repositories are allowed. If you require additional external sources for your project, please contact your organization's PrL administrator.
Once the instance is stopped, all libraries and modifications performed on the instance are lost. Because of that, you need to run the installation cells every time the notebook is imported into Jupyter, to make sure everything is up to date on the machine when you start working.
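As a quick sanity check after installation, you can verify that the packages import correctly and print their versions. This is a sketch; adjust the package list to whatever your project needs:
#verify that the freshly installed packages are importable and check their versions
import requests, pandas, pyarrow
print(requests.__version__, pandas.__version__, pyarrow.__version__)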
More About Jupyter Notebook¶
Jupyter is a powerful tool that allows multiple customizations and languages. These resources can help you explore further:
https://jupyter.org/documentation
https://jupyter-notebook.readthedocs.io/en/stable/
https://ipython.readthedocs.io/en/stable/interactive/magics.html
Exporting Notebooks to Analytical Models¶
When your model is ready to be deployed as a job, you can easily move it into the Model Management tool, where it becomes available for job execution, without manually downloading and re-importing the notebook file. Model versioning is not supported, which means that if you want to update a model that already exists in Model Management, you must export it from Jupyter Notebook again.
Jupyter Notebook Export Illustration¶
To export a Jupyter Notebook, click the Export to AMM button, as shown in this image:
Export Windows Example¶
Here is an example of the pop-up windows that display during the export:
How to Export a Jupyter Notebook to an Analytical Model¶
Follow these steps to export a Jupyter Notebook:
- Navigate to the Jupyter Notebook.
- Click the Export to AMM button. The Create New Model pop-up window displays.
- Enter the name of the new model in the Name field.
- Click Import. A Success message indicates the import has been submitted.
- Click Ok. The Success message closes.
- Click Ok to close the 'Import finished successfully' message.
Calling a Data Exchange API¶
The code below shows an example of an API call from a Zeppelin notebook using the Python interpreter and Predictive Learning's internal gateway, which handles the authentication procedures seamlessly:
import os
import requests
import json
#get the proxy (we call it the Gateway) URL
gw = os.environ['GATEWAY_ENDPOINT'] + '/gateway/'
#some paths to remember
DEpath = 'dataexchange/v3/'
dirs = 'directories/'
pub = '_PUBLIC_ROOT_ID'
response = requests.get(gw + DEpath + dirs + pub) #this lists the public directory from Data Exchange
#let's parse the response
allpub = json.loads(response.content)
#we only read the 'files'; the response also contains a 'directories' child, which is also iterable
for file in allpub['files']:
    print("File id: " + str(file['id']))
#uploading files works in a similar fashion, as does working with other MDSP services
This example calls the Data Exchange API through the Predictive Learning gateway and lists the contents of the public root folder.
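The same response also carries a 'directories' child (see the comment in the code above), so you can walk subdirectories in the same way; a minimal sketch:
#iterate the subdirectories of the public root; each entry carries at least an 'id'
for directory in allpub['directories']:
    print("Directory id: " + str(directory['id']))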
Copying Data From Integrated Data Lake (IDL)¶
Run the code below to obtain a temporary token (via the PrL gateway) and enable PrL to directly perform a read operation on the IDL API data bucket.
Once the AWS temporary keys have been set up, you can use AWS CLI commands to perform read operations against the Integrated Data Lake bucket.
The bucket name can be read from the Data Lake's JSON response. The path segment /data/ten=mytenant/ is fixed and cannot be changed.
Run the commands below to copy data from IDL:
import os
import requests
import json
dlpath = 'datalake/v3/generateAccessToken'
gw = os.environ['GATEWAY_ENDPOINT'] + '/gateway/'
headers = {
'Content-Type': 'application/json'
}
payload="{ \"subtenantId\":\"\" } "
dl_url = gw + dlpath
response = requests.post(dl_url, data=payload, headers=headers)
#print(response.status_code)
dl = json.loads(response.text)
os.environ["AWS_ACCESS_KEY_ID"] = dl['credentials']['accessKeyId']
os.environ["AWS_SECRET_ACCESS_KEY"] = dl['credentials']['secretAccessKey']
os.environ["AWS_SESSION_TOKEN"] = dl['credentials']['sessionToken']
Uploading Data to IDL¶
Run the following commands to upload data to Integrated Data Lake:
# upgrade pip and install required libraries
!python3 -m pip install --upgrade pip --user
!python3 -m pip install requests --force-reinstall --upgrade --user
!python3 -m pip install pandas --force-reinstall --upgrade --user
!python3 -m pip install pyarrow --force-reinstall --upgrade --user
import datetime
import requests
import os
import json
import re
HEADERS = {
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Content-Type': 'application/json'
}
GATEWAY = os.environ['GATEWAY_ENDPOINT'] + '/gateway/'
OUTPUT_FOLDER = 'OUTPUT_FOLDER' #target folder (path prefix) inside the Integrated Data Lake; replace with your own folder name
#Get a signed URL for download/upload of data. The function retries
#up to 5 times to obtain the URL and then raises an exception.
def getSignedURL(fileName, folder, attempt=0, upload=True):
    if upload:
        IDLpath = 'datalake/v3/generateUploadObjectUrls'
    else:
        IDLpath = 'datalake/v3/generateDownloadObjectUrls'
    IDLFilePath = '/%s/%s' % (folder, fileName)
    url = GATEWAY + IDLpath
    body = '{"paths": [{"path": "%s"}]}' % IDLFilePath
    response = requests.post(url, headers=HEADERS, data=body)
    try:
        return json.loads(response.text)['objectUrls'][0]['signedUrl']
    except KeyError:
        if attempt < 5:
            attempt += 1
            #pass the folder along on retry so the request path stays the same
            return getSignedURL(fileName, folder, attempt, upload)
        else:
            raise Exception('Failed to get a signed URL')
!echo "This is a test!" >> test.txt
fileName='test.txt'
signedURL = getSignedURL(fileName, OUTPUT_FOLDER)
with open(fileName, 'rb') as f:
    requests.put(signedURL, headers=HEADERS, data=f) #upload the file contents, not just the file name
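For the reverse direction, the same helper can generate a download URL by passing upload=False; a short sketch that fetches back the file uploaded above:
#request a download-signed URL for the same file and retrieve its contents
downloadURL = getSignedURL(fileName, OUTPUT_FOLDER, upload=False)
downloaded = requests.get(downloadURL)
print(downloaded.status_code)
print(downloaded.content)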
Reading data from IoT¶
Run the following commands to read data from IoT sources:
!python3 -m pip install --upgrade pip --user
!python3 -m pip install requests --force-reinstall --upgrade --user
!python3 -m pip install awscli --force-reinstall --upgrade --user
!python3 -m pip install pandas scikit-learn seaborn matplotlib joblib --user
import json
import io
import os
import datetime
import time
from dateutil import parser
import random
from threading import Thread
import requests
import pandas as pd
import tempfile
def read_iot(entity_id = "<entity_id>",
             aspect_name = "<aspect_name>",
             tenant = "tenantname",
             max_results = 2000, #max is 2000
             from_dt = "2020-06-01T13:09:37.029Z",
             to_dt = "2020-07-01T08:02:27.962Z",
             variable = "pressure",
             sort = "asc"):
    if variable is not None:
        url = "?from=" + from_dt + "&to=" + to_dt + "&sort=" + sort + "&limit=" + str(max_results) + "&select=" + variable
    else:
        url = "?from=" + from_dt + "&to=" + to_dt + "&sort=" + sort + "&limit=" + str(max_results)
    #this is the IoT Time Series API base URL
    TSpath = 'iottimeseries/v3/timeseries'
    #this is the Predictive Learning gateway URL that handles authentication for your API calls
    gw = os.environ['GATEWAY_ENDPOINT'] + '/gateway/'
    headers = {
        'Content-Type': 'application/json'
    }
    iot_url = gw + TSpath + "/" + entity_id + "/" + aspect_name + url
    response = requests.get(iot_url, headers=headers)
    return response
import pandas as pd
import tempfile
start = datetime.datetime.utcnow() - datetime.timedelta(days=70)
end = start + datetime.timedelta(days=30)
response = read_iot(entity_id = "<entity_id>",
                    aspect_name = "<aspect_name>",
                    tenant = "tenantname",
                    max_results = 2000, #max is 2000
                    from_dt = start.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z',
                    to_dt = end.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z',
                    sort = "asc",
                    variable = None)
if response.status_code == 200:
    f = tempfile.TemporaryFile()
    f.write(response.content)
    f.seek(0)
    #we read the IoT data into a pandas DataFrame
    data = pd.read_json(f)
    print(data.shape)
else:
    print(response.status_code)
    print(response.content)
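If the call succeeded, each row of the DataFrame is one time series record, with the record timestamp carried in the _time column. The following sketch turns it into a time-indexed frame; it assumes the request above returned data:
#index the DataFrame by the record timestamp and sort it chronologically
data['_time'] = pd.to_datetime(data['_time'])
data = data.set_index('_time').sort_index()
print(data.head())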