
Using Jupyter Notebook

You can use different Public APIs such as Data Exchange, Model Management, and IoT Time Series from Jupyter Notebook, the model development workspace for Predictive Learning. Your tenant must have valid access to these APIs in order to utilize them in the workspace environment. The Jupyter Notebook can be accessed from the:

  • Manage Environments page
  • Manage Analytics Workspaces page

The steps to accomplish this are outlined in the following procedures.

Go here for in-depth information on Public APIs.

Checking your Configuration

When working with scripts, your environment will require a certain set of libraries. Some libraries required to run the minimal services within the cluster come preinstalled and are available here.

Run this command to examine the installed packages from Jupyter:

%pip freeze --user

We recommend installing the required packages at the beginning of the notebook and executing that cell each time the cluster is started. Currently, Predictive Learning does not store the custom actions you perform on the cluster between restarts.
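For example, a minimal setup cell that you could keep at the top of a notebook and re-run after each cluster start might look like the sketch below; the package list is only illustrative:

#re-run this cell after every cluster start; the package list is illustrative
%pip install --user requests pandas pyarrow
#verify that the packages can be imported before continuing
import importlib.util
for pkg in ("requests", "pandas", "pyarrow"):
    print(pkg, "available:", importlib.util.find_spec(pkg) is not None)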

Using Inputs from Job Executions

In the current implementation, all job executions require parameters; a parameter can be one of Data Exchange, IoT, Data Lake, or Predictive Learning Storage (PrL Storage). For the first three, Job Manager copies the input into a temporary location that is available to your code. In Jupyter notebooks, three variables are available: inputFolder, outputFolder, and datasetName. These can be read using the Jupyter magic command %store, as in:

%store -r inputFolder #-r specifies a read
%store -r outputFolder
%store -r datasetName

The datasetName variable only contains a value when you use the IoT input type. The inputFolder variable is prefilled by the job execution engine with a value pointing to the temporary location that holds the input files or data. This is an S3 path on AWS or a blob storage path on Azure, and it does not include the associated prefix such as s3://. You can then use the outputFolder variable in a Jupyter notebook as in:

!aws s3 cp ./mylocalfile.txt s3://$outputFolder/myfile.txt

Always keep in mind that both the inputFolder and outputFolder variables are remote storage paths, not local folders, so most regular file functions will not work against them. However, CLI and shell commands can use them as long as you add the correct prefix. For Python or Scala libraries that can work with remote storage services, Predictive Learning recommends checking the respective library's documentation; for example, the pandas Python library can save to and read from AWS S3 storage.
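For illustration, here is a minimal sketch that combines the %store variables with pandas; it assumes the input location contains a CSV file named data.csv (a hypothetical name) and that pandas and s3fs are installed:

#read the variables exposed by the job execution engine
%store -r inputFolder
%store -r outputFolder
import pandas as pd
#pandas reads from and writes to S3 paths when s3fs is installed;
#'data.csv' and 'result.csv' are placeholder file names for this sketch
df = pd.read_csv('s3://' + inputFolder + '/data.csv')
df.to_csv('s3://' + outputFolder + '/result.csv', index=False)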

Installing your own Python Libraries

Run these commands to install libraries:

#upgrade pip and install required libraries
%pip install --upgrade pip --user
%pip install requests --force-reinstall --upgrade --user
%pip install pandas --force-reinstall --upgrade --user
%pip install pyarrow --force-reinstall --upgrade --user

Not all external repositories are allowed. If you require additional external sources for your project, please contact your organization's PrL administrator.

Once the instance is stopped, all libraries and modifications performed on the instance are lost. Because of that, you need to run the installation cells every time the notebook is imported into Jupyter, to make sure everything is up to date on the machine when you start working.

More About Jupyter Notebook

Jupyter is a powerful tool that supports many customizations and languages. These resources can help you explore further:

https://jupyter.org/documentation

https://jupyter-notebook.readthedocs.io/en/stable/

https://ipython.readthedocs.io/en/stable/interactive/magics.html

Exporting Notebooks to Analytical Models

When your model is ready to be deployed as a job, you can easily move it into the Model Management tool, where it can be exposed to job execution, with no notebook export needed. Model versioning is not supported, which means that if you want to update a model that already exists in Model Management, you must export it from Jupyter Notebook again.

Jupyter Notebook Export Illustration

To export a Jupyter Notebook, click the Export to AMM button, as shown in this image:

Jupyter Notebook Export Illustration

Export Windows Example

Here is an example of the pop-up windows that display during the export:

Export Windows

How to Export a Jupyter Notebook to an Analytical Model

Follow these steps to export a Jupyter Notebook:

  1. Navigate to the Jupyter Notebook.
  2. Click the Export to AMM button. The Create New Model pop-up window displays.
  3. Enter the name of the new model in the Name field.
  4. Click Import. A Success message indicates the import has been submitted.
  5. Click Ok. The Success message closes.
  6. Click Ok to close the 'Import finished successfully' message.

Calling a Data Exchange API

The code below shows an example of an API call from a Jupyter notebook using Python and Predictive Learning's internal gateway, which handles the authentication procedures seamlessly:

import os
import requests
import json
#get the proxy - we call it the Gateway - URL
gw = os.environ['GATEWAY_ENDPOINT'] + '/gateway/'
#some paths to remember
DEpath = 'dataexchange/v3/'
dirs = 'directories/'
pub = '_PUBLIC_ROOT_ID'
response = requests.get(gw + DEpath + dirs + pub) #this lists the Public Directory from Data Exchange
#let's parse the response
allpub = json.loads(response.content)
#we only read 'files'; the response also contains a 'directories' child, which is also iterable
for file in allpub['files']:
  print("File id: " + str(file['id']))
#uploading files works in a similar fashion, as does working with other MDSP services

This example calls the Data Exchange API through the Predictive Learning gateway and lists the contents of the public root directory.
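As the last comment in the snippet suggests, uploading a file works in a similar fashion. The sketch below is only an outline: it assumes a dataexchange/v3/files endpoint that accepts a multipart upload with file, name, and parentId fields, so verify the exact contract in the Data Exchange API reference before using it:

#hedged sketch: upload a local file into the public root directory via the gateway
#(endpoint path and form field names should be checked against the Data Exchange API reference)
with open('mylocalfile.txt', 'rb') as f:
    upload = requests.post(gw + DEpath + 'files',
                           files={'file': f},
                           data={'name': 'mylocalfile.txt', 'parentId': pub})
print(upload.status_code)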

Using exported IoT Datasets

The environments you start are prepared with our prlutils library, which handles reading Parquet dataset files into a pandas DataFrame. Use the snippet below to show the available datasets and then load a single dataset.

from prlutils import datasetutils
import boto3
import os
import json
import s3fs

du = datasetutils.DatasetUtils()
datasetnames = du.get_dataset_names()
print('Dataset names: ' + str(datasetnames))

You will get a list of dataset names like:

['test_asset_2',
 'Last30DaysAsset2Filtered',
 'Last30DaysAsset2']

You can load the dataset you want with:

ds = du.load_dataset_by_name(datasetnames[0])
ds

and check its data immediately. Then, you can use your dataset just like any other pandas DataFrame:

filteredDataset = ds[ds['temp'] > 60]
print("Number of entries AFTER filtering: " + str(filteredDataset.shape))
try:
    #pandas can write directly to S3 when s3fs is installed; the file names are examples
    path = "s3://" + outputFolder
    filteredDataset.to_csv(path + '/filtered.csv')
    filteredDataset.to_csv('s3://prl-storage-216273414971/prlteam/data/filtered.csv')
except TypeError:
    print('Output folder is None.')
else:
    print('Filtered dataset written to outputFolder ' + outputFolder)

If you encounter issues with using our library, make sure that the libraries used by our dataset utility do not conflict with your previously installed Python libraries. Our utility library uses the following:

%pip install pyarrow fastparquet fsspec s3fs boto3 awscli
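To spot version conflicts before reinstalling anything, you can first check which of these packages, if any, are already installed and at which versions:

#show the currently installed versions of the packages used by the dataset utility
%pip show pyarrow fastparquet fsspec s3fs boto3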

Copying Data From Integrated Data Lake (IDL)

Run the code below to obtain a temporary token (via the PrL gateway) and enable PrL to directly perform a read operation on the IDL API data bucket.

Once the AWS temporary keys have been set up, you can use AWS CLI commands to perform read operations against the Integrated Data Lake bucket.

The bucket name can be observed in the data lake's response, which is in JSON format. The path part /data/ten=mytenant/ is fixed and cannot be changed.

Run the commands below to copy data from IDL:

import os
import requests
import json
#the Data Lake endpoint that issues temporary credentials
dlpath = '/datalake/v3/generateAccessToken'
gw = os.environ['GATEWAY_ENDPOINT'] + '/gateway/'
headers = {
  'Content-Type': 'application/json'
}
payload = "{ \"subtenantId\":\"\" }"
dl_url = gw + dlpath
response = requests.post(dl_url, data=payload, headers=headers)
#print(response.status_code)
dl = json.loads(response.text)
#export the temporary credentials so that the AWS CLI and boto3 can pick them up
os.environ["AWS_ACCESS_KEY_ID"] = dl['credentials']['accessKeyId']
os.environ["AWS_SECRET_ACCESS_KEY"] = dl['credentials']['secretAccessKey']
os.environ["AWS_SESSION_TOKEN"] = dl['credentials']['sessionToken']

Uploading Data to IDL

Run the following commands to upload data to the Integrated Data Lake:

# upgrade pip and install required libraries
%pip install --upgrade pip --user
%pip install requests --force-reinstall --upgrade --user
%pip install pandas --force-reinstall --upgrade --user
%pip install pyarrow --force-reinstall --upgrade --user
import datetime
import requests
import os
import json
import re
HEADERS = {
   'Accept': '*/*',
   'Accept-Encoding': 'gzip, deflate, br',
   'Connection': 'keep-alive',
   'Content-Type': 'application/json'
}
GATEWAY = os.environ['GATEWAY_ENDPOINT'] + '/gateway/'
OUTPUT_FOLDER = 'OUTPUT_FOLDER' #placeholder: replace with your IDL folder name
#Get a signed URL for download/upload of data. The function
#attempts up to 5 times to obtain the URL and then raises an exception.
def getSignedURL(fileName, folder, attempt=0, upload=True):
    if upload:
        IDLpath = 'datalake/v3/generateUploadObjectUrls'
    else:
        IDLpath = 'datalake/v3/generateDownloadObjectUrls'
    IDLFilePath = '/%s/%s' % (folder, fileName)
    url = GATEWAY + IDLpath
    body = '{"paths": [{"path": "%s"}]}' % IDLFilePath
    response = requests.post(url, headers=HEADERS, data=body)
    try:
        return json.loads(response.text)['objectUrls'][0]['signedUrl']
    except KeyError:
        if attempt < 5:
            attempt += 1
            return getSignedURL(fileName, folder, attempt, upload)
        else:
            raise Exception('Failed to get a signed URL')
!echo "This is a test!" >> test.txt
fileName = 'test.txt'
signedURL = getSignedURL(fileName, OUTPUT_FOLDER)
#upload the file contents (not just the file name) to the signed URL
with open(fileName, 'rb') as f:
    requests.put(signedURL, headers=HEADERS, data=f)
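The same helper can also fetch files back: passing upload=False requests a download URL via generateDownloadObjectUrls. A brief sketch, reusing the file uploaded above:

#request a signed download URL for the file that was just uploaded and fetch it
downloadURL = getSignedURL(fileName, OUTPUT_FOLDER, upload=False)
downloaded = requests.get(downloadURL)
print(downloaded.status_code)
print(downloaded.content)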

Reading data from IoT

Run the following commands to read data from IoT sources:

%pip install --upgrade pip --user
%pip install requests --force-reinstall --upgrade --user
%pip install awscli --force-reinstall --upgrade --user
%pip install pandas scikit-learn seaborn matplotlib joblib --user
import json
import io
import os
import datetime
import time
from dateutil import parser
import random
from threading import Thread
import requests
import pandas as pd
import tempfile
def read_iot(entity_id = "<<iot_entity_id_GUID>>",
            aspect_name = "<<aspect_name>>",
            tenant = "tenantname",
            max_results = 2000, #max is 2000
            from_dt = "2020-06-01T13:09:37.029Z",
            to_dt = "2020-07-01T08:02:27.962Z",
            variable = "pressure",
            sort = "asc"):
    if variable is not None:
        url = "?from=" + from_dt + "&to=" + to_dt + "&sort=" + sort + "&limit=" + str(max_results) + "&select=" + variable
    else:
        url = "?from=" + from_dt + "&to=" + to_dt + "&sort=" + sort + "&limit=" + str(max_results)
    #this is the IoT Time Series API base URL
    TSpath = 'iottimeseries/v3/timeseries'
    #this is the Predictive Learning gateway URL that handles authentication for your API calls
    gw = os.environ['GATEWAY_ENDPOINT'] + '/gateway/'
    headers = {
        'Content-Type': 'application/json'
    }
    iot_url = gw + TSpath + "/" + entity_id + "/" + aspect_name + url
    response = requests.get(iot_url, headers=headers)
    return response
start = datetime.datetime.utcnow() - datetime.timedelta(days=70)
end = start + datetime.timedelta(days=30)
response = read_iot(entity_id = "<<iot_entity_id_GUID>>",
            aspect_name = "<<aspect_name>>",
            tenant = "tenantname",
            max_results = 2000, #max is 2000
            from_dt = start.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z',
            to_dt = end.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z',
            sort = "asc",
            variable = None)
if response.status_code == 200:
    #we read the IoT data into a pandas DataFrame via a temporary file
    f = tempfile.TemporaryFile()
    f.write(response.content)
    f.seek(0)
    data = pd.read_json(f)
    print(data.shape)
else:
    print(response.status_code)
    print(response.content)
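Once the response has been loaded into a DataFrame, you can continue with regular pandas operations. For example, assuming the previous call succeeded and that the records carry the _time timestamp column returned by the IoT Time Series API:

#index the DataFrame by its timestamp column and inspect the numeric variables
data['_time'] = pd.to_datetime(data['_time'])
data = data.set_index('_time').sort_index()
print(data.describe())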

Last update: January 22, 2024