Jupyter Notebooks Samples¶
Using Jupyter Notebooks¶
You can use different Insights Hub APIs such as Data Exchange, Model Management, and IoT Time Series from a Jupyter Notebook, a model development workspace in PrL. You will need valid access to the Insights Hub APIs you want to use in PrL.
Jupyter Notebook can be accessed from the Manage Environments page, and the information provided here will aid you in launching a Jupyter Notebook.
Note
- PrL environments do not retain the Jupyter Notebooks you have created. Be sure to save them on your local machine before you stop the environment. Clusters, on the other hand, do save new work and modifications applied to a Notebook.
- The dataset feature will soon be deprecated on all platforms. Currently, it is not available in the Azure Private Cloud environment. For more information, refer to datasets.
Model Validation & Job execution environments file system¶
- Users cannot create new files or folders in their Jupyter notebook or Docker container during model validation and job execution, except in one specific directory.
- The file system is locked to read-only during model validation and job execution, except for the /tmp directory.
- Any new files or folders created during model validation and job execution must therefore be placed in the /tmp directory only, as shown in the sketch below.
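For illustration, here is a minimal sketch of staging intermediate files under /tmp during model validation or job execution; the directory and file names are placeholders:
import os
# /tmp is the only writable location during model validation and job execution
work_dir = '/tmp/my_job_workdir'  # placeholder name
os.makedirs(work_dir, exist_ok=True)
# write an intermediate artifact into the writable directory
with open(os.path.join(work_dir, 'intermediate.csv'), 'w') as f:
    f.write('col_a,col_b\n1,2\n')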
Checking your Configuration¶
When you begin working with your scripts, your environment will require a certain set of libraries.
Some libraries required to run the minimal services within the cluster come preinstalled. We have included links to the libraries in the Open Source Software topic in this Help.
Run this command to examine the installed packages from Jupyter:
%pip freeze
We recommend installing the required packages at the beginning of the notebook and running those installation cells each time the cluster is started, because custom changes are not stored between stops and starts.
Installing Your Own Python Libraries¶
Please use the following whenever you need to install your libraries:
#upgrade pip and install required libraries
%pip install --upgrade pip
%pip install requests
%pip install pandas
%pip install pyarrow
The following example calls an Insights Hub API (the Integrated Data Lake token endpoint) from Jupyter and exports the returned temporary storage credentials:
import os
import requests
import json

# Integrated Data Lake endpoint that returns temporary storage credentials
dlpath = '/datalake/v3/generateAccessToken'
# the Predictive Learning gateway handles authentication for the API call
gw = os.environ['GATEWAY_ENDPOINT'] + 'gateway/'
headers = {
    'Content-Type': 'application/json'
}
payload = "{ \"subtenantId\":\"\" }"
dl_url = gw + dlpath
response = requests.post(dl_url, data=payload, headers=headers)
#print(response.status_code)
dl = json.loads(response.text)
# export the temporary credentials so that boto3 / the AWS CLI can pick them up
os.environ["AWS_ACCESS_KEY_ID"] = dl['credentials']['accessKeyId']
os.environ["AWS_SECRET_ACCESS_KEY"] = dl['credentials']['secretAccessKey']
os.environ["AWS_SESSION_TOKEN"] = dl['credentials']['sessionToken']
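With the temporary credentials exported as environment variables, storage clients such as boto3 or the AWS CLI pick them up automatically. Below is a minimal sketch of listing objects with boto3; the bucket name and prefix are placeholders that depend on your Integrated Data Lake configuration, and boto3 may need to be installed first:
%pip install boto3
import boto3

# boto3 reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY / AWS_SESSION_TOKEN from the environment
s3 = boto3.client('s3')
# <data-lake-bucket> and <tenant-prefix> are placeholders for your data lake bucket and path
objects = s3.list_objects_v2(Bucket='<data-lake-bucket>', Prefix='<tenant-prefix>/data/')
for obj in objects.get('Contents', []):
    print(obj['Key'])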
Not all external library repositories are allowed. If you require additional external sources for your project, please contact your organization's Predictive Learning administrator.
When an environment stops, all libraries and modifications applied to it are lost, so you need to run the installation cells each time the notebook is loaded into Jupyter. Running the installation cells ensures that your environment is up to date.
Uploading data in Integrated Data Lake¶
# upgrade pip and install required libraries
%pip install --upgrade pip
%pip install requests --force-reinstall --upgrade
%pip install pandas --force-reinstall --upgrade
%pip install pyarrow --force-reinstall --upgrade
import datetime
import requests
import os
import json
import re
HEADERS = {
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Content-Type': 'application/json'
}
GATEWAY = os.environ['GATEWAY_ENDPOINT'] + '/gateway/'
OUTPUT_FOLDER = 'OUTPUT_FOLDER'  # replace with your Integrated Data Lake folder name
# Get a signed URL for download/upload of data. The function attempts
# up to 5 times to obtain the URL and then raises an exception.
def getSignedURL(fileName, folder, attempt=0, upload=True):
    if upload:
        IDLpath = 'datalake/v3/generateUploadObjectUrls'
    else:
        IDLpath = 'datalake/v3/generateDownloadObjectUrls'
    IDLFilePath = '/%s/%s' % (folder, fileName)
    url = GATEWAY + IDLpath
    body = '{"paths": [{"path": "%s"}]}' % IDLFilePath
    response = requests.post(url, headers=HEADERS, data=body)
    try:
        return json.loads(response.text)['objectUrls'][0]['signedUrl']
    except KeyError:
        if attempt < 5:
            attempt += 1
            # retry with the same file and folder, passing the incremented attempt counter
            return getSignedURL(fileName, folder, attempt, upload)
        else:
            raise Exception('Failed to get a signed URL')
!echo "This is a test!" >> test.txt
fileName = 'test.txt'
signedURL = getSignedURL(fileName, OUTPUT_FOLDER)
# upload the file contents (not just the file name string) to the signed URL
with open(fileName, 'rb') as f:
    requests.put(signedURL, headers=HEADERS, data=f)
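The same helper can also generate a download URL (upload=False); a minimal sketch that fetches the file uploaded above:
# request a signed download URL for the file we just uploaded and fetch it
downloadURL = getSignedURL(fileName, OUTPUT_FOLDER, upload=False)
response = requests.get(downloadURL)
print(response.status_code)
print(response.content)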
import hmac
import hashlib
import random
import string
import jwt  # PyJWT; install with %pip install PyJWT if not already available
from datetime import datetime
from urllib.parse import urlparse

key_id = os.environ["CLIENT_ID"]
key_secret = os.environ["CLIENT_SECRET"]
tenant = os.environ["tenant"]
subTenant = os.environ.get("subtenant")
authServer = os.environ["TOKEN_ENDPOINT"]
coreGateway = os.environ["CORE_GATEWAY"]
class JsonWebToken():
    # Handles getting a JWT from the environment controller using HMAC.
    # Refreshes the token whenever it expires.
    def __init__(self):
        self.token = self.__refreshToken()
    def __refreshToken(self):
        # Gets a new JWT using HMAC
        sign, timestamp = self.__getHmacSignature()
        headers = {
            'Authorization': 'HMAC-SHA256 Credential=' + key_id + '&SignedHeaders=host;x-msg-timestamp&Signature=' + sign,
            'x-msg-timestamp': timestamp
        }
        response = requests.get(authServer, headers=headers, verify=False)
        print(response.status_code)
        return json.loads(response.text)["access_token"]
    def __getHmacSignature(self):
        # Creates the HMAC signature for getting the JWT
        payload, timestamp = self.__getPayload()
        sign = hmac.new(key_secret.encode(), payload.encode(), hashlib.sha256).hexdigest()
        return sign, timestamp
    def __isExpired(self):
        # Checks whether the existing JWT has expired.
        decoded = jwt.decode(self.token, options={'verify_signature': False})
        current = int(datetime.now().timestamp()) + 10
        if decoded['exp'] <= current:
            return True
        return False
    def getToken(self):
        # Refreshes the token if it has expired. This method should be used by users of this class.
        if self.__isExpired():
            print("token is expired. getting new one...")
            self.token = self.__refreshToken()
        return self.token
    def __getPayload(self):
        timestamp = datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')
        payload = "GET\n" + urlparse(authServer).netloc + "\n" + timestamp + "\n" + tenant
        print(payload)
        if subTenant != None:
            payload = payload + "\n" + subTenant
        return payload, timestamp
jwtobj = JsonWebToken()
# Generate Upload URL
# create random string
random_string = ''.join(random.choices(string.ascii_lowercase, k = 10))
print(random_string)
filename = 'readme.json'
path = random_string + "/" + filename
body = json.dumps({
"paths": [
{
"path": path
}
],
"subtenantId": "",
"isMicrosoftRoutingEnabled": False
})
# write some random content to a local file before uploading it
with open(filename, 'w') as f:
    random_content = "{ 'random_string': '" + ''.join(random.choices(string.ascii_lowercase, k = 50)) + "' }"
    f.write(random_content)
upload_url = coreGateway + "/api/datalake/v3/generateUploadObjectUrls"
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': 'Bearer ' + jwtobj.getToken()
}
response = requests.post(upload_url, data=body, headers=headers)
print("response code: " + str(response.status_code))
print(response.content)
signedUploadUrl = json.loads(response.content)['objectUrls'][0]['signedUrl']
print("Signed Upload URL: " + signedUploadUrl)
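The signed upload URL can then be used to transfer the local file with an HTTP PUT; a minimal sketch (depending on the storage backend, additional headers may be required):
# upload the locally created readme.json to the signed URL
with open(filename, 'rb') as f:
    upload_response = requests.put(signedUploadUrl, data=f)
print("upload response code: " + str(upload_response.status_code))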
Reading data from IoT¶
Run the following commands to read data from IoT sources:
%pip install --upgrade pip
%pip install requests --force-reinstall --upgrade
%pip install awscli --force-reinstall --upgrade
%pip install pandas scikit-learn seaborn matplotlib joblib
import json
import io
import os
import datetime
import time
from dateutil import parser
import random
from threading import Thread
import requests
import pandas as pd
import tempfile
def read_iot(entity_id = "<<iot_entity_id_GUID>>",
             aspect_name = "<<aspect_name>>",
             tenant = "tenantname",
             max_results = 2000, #max is 2000
             from_dt = "2020-06-01T13:09:37.029Z",
             to_dt = "2020-07-01T08:02:27.962Z",
             variable = "pressure",
             sort = "asc"):
    if variable is not None:
        url = "?from=" + from_dt + "&to=" + to_dt + "&sort=" + sort + "&limit=" + str(max_results) + "&select=" + variable
    else:
        url = "?from=" + from_dt + "&to=" + to_dt + "&sort=" + sort + "&limit=" + str(max_results)
    # this is the IoT Time Series API base path
    TSpath = 'iottimeseries/v3/timeseries'
    # this is the Predictive Learning gateway URL that handles authentication for your API calls
    gw = os.environ['GATEWAY_ENDPOINT'] + '/gateway/'
    headers = {
        'Content-Type': 'application/json'
    }
    iot_url = gw + TSpath + "/" + entity_id + "/" + aspect_name + url
    response = requests.get(iot_url, headers=headers)
    return response
import pandas as pd
import tempfile
start = datetime.datetime.utcnow() - datetime.timedelta(days=70)
end = start + datetime.timedelta(days=30)
response = read_iot(entity_id = "<<iot_entity_id_GUID>>",
                    aspect_name = "<<aspect_name>>",
                    tenant = "tenantname",
                    max_results = 2000, #max is 2000
                    from_dt = start.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z',
                    to_dt = end.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z',
                    sort = "asc",
                    variable = None)
if response.status_code == 200:
    # buffer the response body and read the IoT data into a pandas DataFrame
    f = tempfile.TemporaryFile()
    f.write(response.content)
    f.seek(0)
    data = pd.read_json(f)
    f.close()
    print(data.shape)
else:
    print(response.status_code)
    print(response.content)
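Once the data is loaded, you can inspect it like any other DataFrame. The short sketch below assumes the returned time series records contain a _time column; adjust the column names to your aspect's variables:
# parse the timestamp column, use it as a sorted index, then summarize the variables
if '_time' in data.columns:
    data['_time'] = pd.to_datetime(data['_time'])
    data = data.set_index('_time').sort_index()
print(data.head())
print(data.describe())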
Using Inputs from Job Execution¶
All job executions require parameters, and you can use any of the following:
- IoT (Internet of Things)
- IDL (Integrated Data Lake)
Using the input types listed above ensures that Job Manager copies the input to a temporary location available to your code. In Jupyter notebooks, three variables are available:
inputFolder
outputFolder
datasetName
You can employ the Jupyter magic command %store, as follows:
%store -r inputFolder   # -r specifies a read
%store -r outputFolder
%store -r datasetName
The datasetName variable only contains a value when the IoT input type is used. The inputFolder variable is prefilled by the job execution engine with a value pointing to the temporary location that holds the input files or data. That will be an S3 path on AWS or a blob storage path on Azure; it does not contain the associated prefix, such as s3://. You can then use the outputFolder variable in a Jupyter notebook as in:
!aws s3 cp /tmp/mylocalfile.txt s3://$outputFolder/myfile.txt
!pip install azure_cli
%store -r inputFolder
%store -r outputFolder
input = inputFolder
output = outputFolder
!az storage blob download --account-name prlstorageanls -c jobmanager -f data.csv -n $input'/data.csv'
!az storage blob upload --account-name prlstorageanls -c jobmanager -f data.csv -n $output'/output.csv'
import os
import stat

inputPath = 's3://' + os.environ.get('inputFolder') + '/'
outputPath = 's3://' + os.environ.get('outputFolder') + '/'
endpoint = os.environ.get('AWS_ENDPOINT_URL') + '/'
# create a writable working directory under /tmp
directory_path = '/tmp/upload_dir'
os.makedirs(directory_path, exist_ok=True)
os.chmod(directory_path, stat.S_IRWXU)
!aws s3 cp $inputPath /tmp/upload_dir --recursive --endpoint-url $endpoint
!aws s3 cp /tmp/upload_dir $outputPath --recursive --endpoint-url $endpoint
For Jupyter notebooks, we do not provide a built-in library for loading the dataset; however, there are various ways to achieve this using Python. If you encounter any issues loading your dataset, feel free to contact us for guidance.
Note
Both the inputFolder and outputFolder variables are remote storage paths, not local folders; therefore, most of the commonly used file functions do not work against them. CLI and shell commands, however, can use them as long as the correct prefix is added. For Python or Scala libraries that can work with remote storage services, check the documentation of each respective library; for example, the pandas Python library is able to save and read files from AWS S3 storage.
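For example, here is a minimal sketch of reading a CSV file directly from the inputFolder with pandas and s3fs on AWS; the file name data.csv is a placeholder, and the custom endpoint is only needed on clusters that expose AWS_ENDPOINT_URL:
import os
import pandas as pd

%store -r inputFolder
# pandas delegates s3:// paths to s3fs; pass the endpoint through storage_options if one is configured
df = pd.read_csv('s3://' + inputFolder + '/data.csv',
                 storage_options={'client_kwargs': {'endpoint_url': os.environ.get('AWS_ENDPOINT_URL')}})
print(df.shape)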
Using exported IoT Datasets¶
The environments you start come prepared with our prlutils library, which handles reading Parquet dataset files into a pandas DataFrame. Use the snippet below to show the available datasets and then load a single dataset.
from prlutils import datasetutils
import boto3
import os
import json
import s3fs
du = datasetutils.DatasetUtils()
datasetnames = du.get_dataset_names()
print('Dataset names: ' + str(datasetnames))
#ds.shape
You will get a list of dataset names similar to:
['test_asset_2',
'Last30DaysAsset2Filtered',
'Last30DaysAsset2']
You can load the dataset you want with:
ds = du.load_dataset_by_name(datasetnames[0])
ds
and check its data immediately, for instance by displaying the first rows:
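# preview the first rows of the loaded dataset
ds.head()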
Then, you can use your dataset just like any other pandas DataFrame:
filteredDataset = ds[ds['temp'] > 60]
print("Number of entries AFTER filtering: " + str(filteredDataset.shape))
try:
    # write the filtered pandas DataFrame as a CSV file to the job's output folder
    path = "s3://" + outputFolder
    filteredDataset.to_csv(path + '/filtered.csv')
    # you can also write to an explicit data lake path, for example:
    # filteredDataset.to_csv('s3://prl-storage-216273414971/prlteam/data/filtered.csv')
except:
    print('Output folder is None.')
else:
    print('Filtered dataset written to outputFolder ' + outputFolder)
If you encounter issues using our library, make sure that the libraries used by our dataset utility do not conflict with your previously installed Python libraries. Our utility library uses the following:
%pip install pyarrow fastparquet fsspec s3fs boto3 awscli
More About Jupyter Notebook¶
Jupyter is a powerful tool that allows multiple customizations and languages. These resources can help you explore further:
https://jupyter.org/documentation
https://jupyter-notebook.readthedocs.io/en/stable/
https://ipython.readthedocs.io/en/stable/interactive/magics.html
Except where otherwise noted, content on this site is licensed under the Development License Agreement.