Capturing Results from Edge Streaming Analytics in Industrial IoT¶
This section describes how to capture results from Edge Streaming Analytics (ESA) as a time series in Industrial IoT storage, with the help of the MindConnect-NodeJS library.
In this section, you will learn how to install and configure the MindConnect-NodeJS library and to create an Edge Streaming Analytics project that can read, process, and send time series data to Insights Hub.
The MindConnect-NodeJS library is a stand-alone library that can be installed on a device and acts as an interface to Insights Hub.
This section contains two sub-sections:
- Capturing Results from a Physical Device: The first sub-section describes how to capture results from a physical device, which is appropriate for a production device.
- Capturing Results from a Virtual Device: The second sub-section describes how to capture results from a virtual device, which is appropriate for use during development of a solution.
Capturing Results from a Physical Device¶
In this section, you will learn how to install and configure the MindConnect-NodeJS library on a physical device and create an Edge Streaming Analytics project that can send time series data to Insights Hub.
The MindConnect-NodeJS library is used to create a custom MindConnect agent that enables the Edge Streaming Server to access additional Industrial IoT services. The Edge Streaming Server communicates with the MindConnect agent through a local MQTT message broker.
Note
This section assumes that Edge Streaming Analytics is already installed and set up on a physical or virtual device.
Prerequisites¶
Hardware requirements:
- 6GB RAM available
- Dual or quad core CPU, at least 1GHz
- 8GB free disk space
Software requirements:
- Red Hat Enterprise Linux 7
- Root privileges (sudo)
- Java 8
Installing an MQTT Broker¶
Communications between the Edge Streaming Server and the MindConnect agent are made using a subscriber connector in the Edge Streaming Server and a compatible subscriber in the agent.
In this example, you will use the MQTT subscriber connector to output events from the model, and you will implement an MQTT subscriber in the agent. This example uses Mosquitto, a lightweight implementation of an MQTT broker.
Note
Depending on your device's configuration, the commands might need to be prefixed with sudo.
-
Enable the repository that provides the Mosquitto package.
yum-config-manager --add-repo=http://download-ib01.fedoraproject.org/pub/epel/7/x86_64/
-
Import the repository's key in Red Hat Enterprise Linux.
wget https://download-ib01.fedoraproject.org/pub/epel/RPM-GPG-KEY-EPEL-7
rpm --import RPM-GPG-KEY-EPEL-7
-
Install Mosquitto.
yum install mosquitto
-
Start Mosquitto.
service mosquitto start
-
To test whether Mosquitto has started, execute the following command:
netstat -tulpn
The output should show the Mosquitto broker listening on port 1883:
Installing the MindConnect-NodeJS Library¶
-
The MindConnect-NodeJS library requires NodeJS to run. Use the following command to install NodeJS:
sudo yum install nodejs
-
The MindConnect-NodeJS library can then be installed using the Node Package Manager (npm):
sudo npm install -g @mindconnect/mindconnect-nodejs
-
Create a starter project by navigating to the Edge Streaming Analytics directory (/opt/mdsp/esa) and creating a new MindConnect-NodeJS project with the following command:
mc starter-ts --dir /opt/mdsp/esa/mc-agent
This creates an example agent that can be used as a starting point for developing a new custom agent.
-
Install NodeJS support for MQTT:
sudo npm install -g mqtt
-
Navigate to the agent directory, /opt/mdsp/esa/mc-agent, and install the MindConnect-NodeJS library dependencies with the following command:
sudo npm install
-
Replace the auto-generated index.ts file with the code below:
import { DataSource, DataPointValue, MindConnectAgent, retry } from "@mindconnect/mindconnect-nodejs";

(async function () {
    const configFile = "/opt/mdsp/esa/mc-agent/mc-config.json";

    // Number of times to retry the operation before giving up and throwing an exception
    const RETRYTIMES = 5;

    // Helper methods
    const sleep = (ms: any) => new Promise(resolve => setTimeout(resolve, ms));
    const prependDate = (text: any) => { return `[${new Date().toISOString()}] ${text.toString()}`};
    const log = (text: any) => { console.log(prependDate(text)); };
    const error = (text: any, err?: any) => { console.error(prependDate(text)); err && console.error(err); };

    // Connect to MQTT
    const mqtt = require('mqtt');
    const mqttClient = mqtt.connect('mqtt://localhost:1883');

    // Method to send data to a specific data source
    // The data object should contain properties that match the datapoint names
    let sendData = (dataSource: any, data: any) => {
        let dataPointValues : DataPointValue[] = [];
        dataSource.dataPoints.forEach((dataPoint: any) => {
            if (data.hasOwnProperty(dataPoint.name)) {
                dataPointValues.push({
                    dataPointId: dataPoint.id,
                    qualityCode: "0",
                    value: "" + data[dataPoint.name]
                });
            }
        });
        if (data.Timestamp) {
            const timestamp = new Date(data.Timestamp/1000);
            agent.PostData(dataPointValues, timestamp)
                .then(()=>{ log("Data posted with timestamp: (" + timestamp + ")"); })
                .catch(err => { error("Error posting data.", err); });
        } else {
            agent.PostData(dataPointValues)
                .then(() => { log("Data posted successfully."); })
                .catch((err: any) => { error("Error posting data.", err); });
        }
    };

    // Set agent configuration
    const configuration = require(configFile);
    const agent = new MindConnectAgent(configuration);

    // Onboard the agent
    if (agent.IsOnBoarded()) {
        log("The agent has previously been onboarded");
    } else {
        try {
            // Attempt to onboard the agent
            await retry(RETRYTIMES, () => agent.OnBoard());
            log("The agent has been onboarded");
        } catch (err) {
            error("The agent could not be onboarded.", err);
        }
    }

    // Renew the agent token periodically
    setInterval(async () => {
        try {
            await retry(5, () => agent.RenewToken());
        } catch(err) {
            error("Error renewing agent token.", err);
        }
    }, 3600000);

    // Get the data sources that have been configured for the agent
    const dataSourceConfig = await agent.GetDataSourceConfiguration();
    let dataSources: any = {};

    mqttClient.on('message', function (topic: string, message: any) {
        try {
            // message is Buffer
            console.log("Incoming mqtt message: " + message.toString());
            if (dataSources[topic]) {
                const outerArray = JSON.parse(message);
                // unwrap outer array
                outerArray.forEach((innerArray : any) => {
                    // unwrap inner array
                    innerArray.forEach((evt: any) => {
                        sendData(dataSources[topic], evt);
                    });
                });
            } else {
                error("Message received for unrecognized data source: " + topic);
            }
        } catch(err) {
            error("Error handling incoming MQTT message.", err);
        }
    });

    dataSourceConfig.dataSources.forEach((dataSource: DataSource) => {
        const topic = dataSource.name;
        dataSources[dataSource.name] = dataSource;
        mqttClient.subscribe(topic, function (err: any) {
            if (err) {
                error("Error subscribing to topic: " + topic, err);
            } else {
                log("Subscribed to MQTT topic: " + topic);
            }
        });
    });
})();
This code provides a simple custom agent that listens for events from an MQTT source. For each data source associated with the agent, a subscription is added to a topic with the same name as the data source. In this way, data can be routed through the agent to multiple data sources.
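The message handling in the agent can be easier to follow in isolation. The sketch below is not part of the agent; it restates the same steps as pure functions, assuming the payload shape that the agent unwraps (an outer array of inner arrays of events) and a Timestamp field expressed in microseconds. The names EsaEvent, flattenEvents, eventTime, and pickValues are hypothetical and chosen only for this illustration.

```typescript
// Hypothetical shape of one event, based on the fields the agent reads.
interface EsaEvent {
  opcode?: string;
  Timestamp?: string | number; // assumed to be microseconds since the Unix epoch
  [field: string]: unknown;
}

// Unwrap the outer and inner arrays of a payload into a flat list of events.
function flattenEvents(payload: string): EsaEvent[] {
  const outer: EsaEvent[][] = JSON.parse(payload);
  const events: EsaEvent[] = [];
  for (const inner of outer) {
    for (const evt of inner) {
      events.push(evt);
    }
  }
  return events;
}

// Convert the microsecond timestamp into a Date, which works in milliseconds.
function eventTime(evt: EsaEvent): Date | undefined {
  if (evt.Timestamp === undefined) {
    return undefined;
  }
  return new Date(Number(evt.Timestamp) / 1000);
}

// Keep only the fields whose names match a data point of the data source.
function pickValues(dataPointNames: string[], evt: EsaEvent): Record<string, string> {
  const values: Record<string, string> = {};
  for (const name of dataPointNames) {
    if (Object.prototype.hasOwnProperty.call(evt, name)) {
      values[name] = String(evt[name]);
    }
  }
  return values;
}
```

Fields that do not match a data point name, such as opcode, are dropped, which mirrors how the agent only posts values for data points configured on the data source.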
The first time the agent runs, it onboards itself and stores the credentials required to access Insights Hub. It then refreshes these credentials regularly.
Note
For a production agent, it is recommended that additional steps are taken to ensure that the credentials are stored securely, which might require further development of the agent depending on the intended application and edge device configuration. This example provides a reference for consuming data from an Edge Streaming Server through MQTT, which can be extended and modified for more specific use cases. For more information on developing custom agents with the MindConnect-NodeJS library, refer to the MindConnect-NodeJS documentation.
The agent is connected to an Insights Hub asset, so before the code can be executed you need to create a suitable agent asset using the Asset Manager.
Creating a MindConnect Agent¶
For the data to be visualized in Insights Hub, you need to create a data aspect to define the structure of the data points and create an asset that can be used to bring the data from the edge device into Insights Hub.
Note
Creating an agent requires that the user has permission to access the Asset Manager in the environment account.
1. On the Launchpad, click the Asset Manager icon.
2. Select Aspect Types from the Library context menu and create a new aspect called ExampleAspect.
3. Select the Dynamic category option, which enables the aspect to be used with IoT Time Series.
4. Name the aspect ExampleAspect and add two variables, Temperature and Humidity.
5. Save the aspect and navigate to Asset Types.
6. Expand the Basic Agent section, select MindConnectLib, and then click Create Type to create a new MindConnectLib asset type. Verify that the parent type is core.mclib.
7. Name the asset type ExampleMCLAsset.
8. Click Add aspect to add an additional aspect to the asset type. The aspect created in step 4 has the ID <tenant>.ExampleAspect, where tenant is the name of your environment. Select this aspect and save. For example, with the environment sas, the aspect is sas.ExampleAspect.
9. Return to the Assets page and select Create asset to create an asset for your edge device.
10. On the Select Type page, select your newly created asset type ExampleMCLAsset and click Create.
11. Name the asset ExampleAsset1 and save. A new asset is created with MindConnectLib connectivity and the ExampleAspect data aspect. Note that the agent shows as offline because you have not yet onboarded it, and the aspect shows that no data has yet been received.
12. Open the MindConnectLib plugin. Configure the asset to use SHARED_SECRET and save.
13. Click Generate onboarding key to create a token for onboarding the edge device.
14. Copy the configuration and save it on the edge device in a file called mc-config.json. If deploying the agent on a physical device, store the file in the directory on the device where you created the MindConnect agent: /opt/mdsp/esa/mc-agent. If deploying the agent on a virtual device, store the file in the local directory for the virtual device, for example c:\ved1\ or ~/ved1.
15. Return to the MindConnectLib plugin page and click Configure your data sources.
16. Click Add data source.
17. Enter the data source name, Example_Data_Source, and click Accept.
18. Add two data points, Temperature and Humidity, entering the required details for the data points, and click Accept. The MindConnect Lib configuration should now appear. Click Save.
19. Navigate to the Data mappings tab.
20. Click Link variable and link the Temperature data point to the aspect type's Temperature variable.
21. Repeat for the Humidity data point, so that both data points are linked.
The asset is now ready to be onboarded to receive data and to map incoming data to the specified aspect type.
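The relationship between the data source, its data points, and the aspect variables can be pictured as plain data. The sketch below is only an illustration of that relationship: the interface names, the field names, and the data point identifiers are assumptions made for this example and are not the schema used by the Insights Hub APIs. It shows a small check that reports data points which have not yet been linked to an aspect variable.

```typescript
// Illustrative shapes only; all field names here are assumptions for this sketch.
interface DataPointSketch {
  id: string;
  name: string;
}

interface DataSourceSketch {
  name: string;
  dataPoints: DataPointSketch[];
}

interface MappingSketch {
  dataPointId: string;
  aspect: string;
  variable: string;
}

// Return the names of data points that have no mapping to an aspect variable.
function unmappedDataPoints(source: DataSourceSketch, mappings: MappingSketch[]): string[] {
  const mapped = new Set(mappings.map((m) => m.dataPointId));
  return source.dataPoints.filter((dp) => !mapped.has(dp.id)).map((dp) => dp.name);
}

const exampleSource: DataSourceSketch = {
  name: "Example_Data_Source",
  dataPoints: [
    { id: "dp-temperature", name: "Temperature" }, // hypothetical identifier
    { id: "dp-humidity", name: "Humidity" }, // hypothetical identifier
  ],
};

const exampleMappings: MappingSketch[] = [
  { dataPointId: "dp-temperature", aspect: "ExampleAspect", variable: "Temperature" },
  { dataPointId: "dp-humidity", aspect: "ExampleAspect", variable: "Humidity" },
];
```

With both links in place, unmappedDataPoints(exampleSource, exampleMappings) returns an empty list. If only the Temperature link existed, the function would report Humidity, which corresponds to a data point whose values would not reach the aspect.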
Sending Data through the Edge Streaming Creator Project¶
-
Start the MindConnect agent by navigating to the /opt/mdsp/esa/mc-agent directory and executing the following command:
npm start
The agent starts up. Once it is running, it generates the following output, which indicates that the agent has been onboarded successfully, the data source has been detected, and an MQTT subscription has been made accordingly:
[2022-01-26T08:59:46.603Z] The agent has been onboarded
[2022-01-26T08:59:47.178Z] Subscribed to MQTT topic: Example_Data_Source
-
Save the data below into a suitable location, such as /opt/mdsp/esa/data/input/sample.csv. The data provides sample temperature and humidity readings that are compatible with your data source.
Timestamp,Temperature,Humidity
2021-10-24T16:15:00.000Z,40,80
2021-10-24T16:16:00.000Z,25,79
2021-10-24T16:17:00.000Z,27,75
2021-10-24T16:18:00.000Z,30,72
2021-10-24T16:19:00.000Z,45,70
2021-10-24T16:20:00.000Z,44,72
2021-10-24T16:21:00.000Z,43,74
2021-10-24T16:22:00.000Z,40,79
2021-10-24T16:23:00.000Z,36,80
2021-10-24T16:24:00.000Z,30,81
When developing an Edge Streaming Creator project, it is usual practice to begin development with sample data stored in a file before connecting to a live data source, such as sensor data. This provides repeatability as the project is developed and fine-tuned.
In this example, the source data matches the aspect type in Insights Hub but this is not a requirement as it is the output of the project and not the input that is sent to Insights Hub. It is likely that the project performs some manipulation of the data before it is sent to the cloud, such as performing edge analytics on the streaming data.
-
Save the following XML code as MindConnectExample.xml:
<project name="MindConnectExample" threads="1" pubsub="auto" heartbeat-interval="1">
  <contqueries>
    <contquery name="cq1">
      <windows>
        <window-source pubsub="true" index="pi_EMPTY" name="ReadSensorData">
          <schema>
            <fields>
              <field name="Timestamp" type="stamp" key="true"/>
              <field name="Temperature" type="double"/>
              <field name="Humidity" type="double"/>
            </fields>
          </schema>
          <connectors>
            <connector class="fs" name="SampleDataConnector">
              <properties>
                <property name="type"><![CDATA[pub]]></property>
                <property name="dateformat"><![CDATA[%Y-%m-%dT%H:%M:%S]]></property>
                <property name="rate"><![CDATA[100]]></property>
                <property name="header"><![CDATA[1]]></property>
                <property name="addcsvopcode"><![CDATA[true]]></property>
                <property name="addcsvflags"><![CDATA[normal]]></property>
                <property name="fsname"><![CDATA[/opt/mdsp/esa/data/input/sample.csv]]></property>
                <property name="fstype"><![CDATA[csv]]></property>
              </properties>
            </connector>
          </connectors>
        </window-source>
        <window-compute pubsub="true" name="ProcessData">
          <schema>
            <fields>
              <field name="Timestamp" type="stamp" key="true"/>
              <field name="Temperature" type="double"/>
              <field name="Humidity" type="double"/>
            </fields>
          </schema>
          <output>
            <field-expr><![CDATA[Temperature]]></field-expr>
            <field-expr><![CDATA[Humidity]]></field-expr>
          </output>
          <connectors>
            <connector class="mqtt" name="MqttConnector">
              <properties>
                <property name="type"><![CDATA[sub]]></property>
                <property name="snapshot"><![CDATA[false]]></property>
                <property name="mqtthost"><![CDATA[localhost]]></property>
                <property name="mqtttopic"><![CDATA[Example_Data_Source]]></property>
                <property name="mqttqos"><![CDATA[0]]></property>
                <property name="mqttmsgtype"><![CDATA[json]]></property>
              </properties>
            </connector>
          </connectors>
        </window-compute>
      </windows>
      <edges>
        <edge source="ReadSensorData" target="ProcessData"/>
      </edges>
    </contquery>
  </contqueries>
</project>
This project uses a Source window to read the sample data from the CSV file and feeds the data into a Compute window that provides the output data using an MQTT connector. In this case, the Compute window simply copies the data from the Source window, but could be used to perform further processing of the data if required.
-
Upload the XML file into the Edge Streaming Creator application, and open the project in the editor:
-
Confirm that everything is configured correctly by entering test mode and running the project. The following output should be observed:
-
The output can be tested by running the MindConnect agent, which subscribes to the data source topic on the MQTT broker, and running the project in test mode. For example, the following output from the agent would be observed:
The messages can also be viewed by using the MQTT Command Line Interface to subscribe to the relevant topics, which can be useful when debugging.
-
Open the Operation Insights application and select your asset, ExampleAsset1. Select the check boxes next to the Humidity and Temperature data points, and then set the date to the date used in the sample data, 24 October 2021. The data that was just posted to IoT Time Series should now be displayed.
You now have a data source that is being processed by the Edge Streaming Server, output to a MindConnect agent via MQTT, and posted to Insights Hub to be stored as IoT Time Series data.
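To make the data flow above more concrete, the following sketch converts one row of the sample CSV into the kind of JSON payload that the agent's message handler unwraps: an outer array containing an inner array of events, with the timestamp expressed in microseconds. This is an illustration only, not the connector's implementation. The function name csvRowToPayload is hypothetical, and the opcode value "i" and the string-typed microsecond timestamp are assumptions based on the messages that the agent logs.

```typescript
// Convert one CSV row into a nested-array JSON payload (illustrative sketch).
function csvRowToPayload(header: string, row: string): string {
  const names = header.split(",");
  const cells = row.split(",");
  // Assumption: every event carries an insert opcode.
  const evt: Record<string, string | number> = { opcode: "i" };
  names.forEach((name, i) => {
    const cell = cells[i] ?? "";
    if (name === "Timestamp") {
      // ISO 8601 text -> milliseconds -> microseconds, kept as a string.
      evt[name] = String(Date.parse(cell) * 1000);
    } else {
      evt[name] = Number(cell);
    }
  });
  // One event wrapped in an inner array, wrapped in an outer array.
  return JSON.stringify([[evt]]);
}
```

Because the timestamp arrives in microseconds, the agent divides it by 1000 before constructing a Date, which expects milliseconds.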
Capturing Results from a Virtual Device¶
When developing an edge application, it might be neither desirable nor practical to develop the analytical model directly on an edge device. Through the use of containerization, one or more virtual edge devices can be created and deployed for use during development. Using a local image allows the local file system to be mounted, which simplifies the process of testing with sample data files. A virtual device deployed on a server can be maintained and shared for demonstration purposes.
In this section, you will learn how to deploy a virtual edge device using Docker, install and configure the MindConnect-NodeJS library and create an Edge Streaming Analytics project that can send Time Series data.
The MindConnect-NodeJS library is used to create a custom MindConnect agent that enables the Edge Streaming Server to access additional Industrial IoT services. The Edge Streaming Server communicates with the MindConnect agent through a local MQTT message broker.
Prerequisites¶
Software Requirements:
- Docker
Creating a Virtual Edge Device Image¶
-
Create a working directory for creating the Docker image. Example: C:\virtual-edge-device or ~/virtual-edge-device.
-
Copy the following file into the working directory, naming the file Dockerfile:
FROM quay.io/centos/centos:stream8
ENV container docker
ARG EDGE_VERSION=1.6.4
RUN yum update -y

# Mosquitto and Node.js require extra packages that are available from the epel-release repository
RUN yum install -y epel-release

# Useful utilities
RUN yum install -y unzip
RUN yum install -y nano

# Dependencies required by the Edge Streaming Server
RUN yum install -y numactl
RUN yum install -y compat-openssl10
RUN yum install -y libnsl

# Dependencies needed to run the Edge Streaming Analytics agent
RUN yum install -y java-11
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.12.0.7-0.el7_9.x86_64/

# Mosquitto is used to enable communications between the Edge Streaming server and the MindConnect agent
RUN yum install -y mosquitto

# This section creates directories needed for the Edge Streaming agents
RUN mkdir /opt/mdsp
RUN mkdir /opt/mdsp/esa

# This section copies and unzips the Edge Streaming Server and agent
COPY ./SiemensEdgeStreamingServer-v${EDGE_VERSION}.zip /tmp
RUN unzip /tmp/SiemensEdgeStreamingServer-v${EDGE_VERSION}.zip -d /opt/mdsp/esa/

# This step fixes the encryption key used by the Edge Streaming agent to enable the docker image to be stopped and started. The secret should be changed to a unique value.
ENV esp_edge_secret=xyz987
ENV esp_edge_secret_custom=true

# This section sets environment variables required by the Edge Streaming server
ENV ESATK=/opt/mdsp/esa/home/ESA/esaexe
ENV TKPATH=${ESATK}:${ESATK}/../utilities/bin
ENV DFESP_HOME=/opt/mdsp/esa/home/EdgeStreamingServer/v${EDGE_VERSION}
ENV DFESP_CONFIG=/opt/mdsp/esa/config/etc/EdgeStreamingServer/default

# This section installs the MindConnect-NodeJS library and creates a starter project
RUN yum install -y nodejs
RUN npm install -g @mindconnect/mindconnect-nodejs
RUN npx @mindconnect/mindconnect-nodejs starter-ts --dir /opt/mdsp/esa/mc-agent &&\
    cd /opt/mdsp/esa/mc-agent &&\
    npm install --save mqtt &&\
    npm install

# This section copies the local MindConnectNodeJS index.ts file into the image
COPY ./index.ts /opt/mdsp/esa/mc-agent/index.ts

# This section copies the local run script and removes any \r escape characters that might be added if the file is edited in a Windows Operating System
COPY ./run.sh /opt/mdsp/esa/run.sh
RUN sed -i "s|\r||g" /opt/mdsp/esa/run.sh

# This section sets the run script as the entrypoint for the container
ENTRYPOINT ["/opt/mdsp/esa/run.sh"]
The Docker file starts with a CentOS base image.
Note
CentOS is a clone of Red Hat Enterprise Linux (RHEL) that can be accessed for free. You might wish to replace the base image, for example to use a hardened RHEL image that is approved for use within your company or organization.
The Docker file also installs additional packages that are required to run the Edge Streaming Server, Edge Streaming Analytics agent and the MindConnect-NodeJS library.
Furthermore, the Docker file sets required environment variables and copies required assets from the local file system. These are described in more detail in the following steps.
When running the Edge Streaming Analytics agent, the credentials are stored in an encrypted file on the device. When using a Docker image, the encryption key changes each time the Docker image is started, so the following environment variables override the default behavior to use the same encryption key each time a container is created:
ENV esp_edge_secret=xyz987
ENV esp_edge_secret_custom=true
Whilst the value of the secret can be changed, it is recommended that this setting is used only in development environments.
-
Copy the latest Edge Streaming Analytics distribution from Siemens Industry Online Support, for example SiemensEdgeStreamingServer-v1.6.4.zip, and save it in the same directory as the Docker file. Edit the Docker file argument EDGE_VERSION to match the version number, for example:
ARG EDGE_VERSION=1.6.4
-
Save the following code with the filename index.ts into the same directory as the Docker file:
import { DataSource, DataPointValue, MindConnectAgent, retry } from "@mindconnect/mindconnect-nodejs";

(async function () {
    const configFile = "/host/mc-config.json";

    // Number of times to retry the operation before giving up and throwing an exception
    const RETRYTIMES = 5;

    // Helper methods
    const sleep = (ms: any) => new Promise(resolve => setTimeout(resolve, ms));
    const prependDate = (text: any) => { return `[${new Date().toISOString()}] ${text.toString()}`};
    const log = (text: any) => { console.log(prependDate(text)); };
    const error = (text: any, err?: any) => { console.error(prependDate(text)); err && console.error(err); };

    // Connect to MQTT
    const mqtt = require('mqtt');
    const mqttClient = mqtt.connect('mqtt://localhost:1883');

    // Method to send data to a specific data source
    // The data object should contain properties that match the datapoint names
    let sendData = (dataSource: any, data: any) => {
        let dataPointValues : DataPointValue[] = [];
        dataSource.dataPoints.forEach((dataPoint: any) => {
            if (data.hasOwnProperty(dataPoint.name)) {
                dataPointValues.push({
                    dataPointId: dataPoint.id,
                    qualityCode: "0",
                    value: "" + data[dataPoint.name]
                });
            }
        });
        if (data.Timestamp) {
            const timestamp = new Date(data.Timestamp/1000);
            agent.PostData(dataPointValues, timestamp)
                .then(()=>{ log("Data posted with timestamp: (" + timestamp + ")"); })
                .catch(err => { error("Error posting data.", err); });
        } else {
            agent.PostData(dataPointValues)
                .then(() => { log("Data posted successfully."); })
                .catch((err: any) => { error("Error posting data.", err); });
        }
    };

    // Set agent configuration
    const configuration = require(configFile);
    const agent = new MindConnectAgent(configuration, 600, "/host");

    // Onboard the agent
    if (agent.IsOnBoarded()) {
        log("The agent has previously been onboarded");
    } else {
        try {
            // Attempt to onboard the agent
            await retry(RETRYTIMES, () => agent.OnBoard());
            log("The agent has been onboarded");
        } catch (err) {
            error("The agent could not be onboarded.", err);
        }
    }

    // Renew the agent token periodically
    setInterval(async () => {
        try {
            await retry(5, () => agent.RenewToken());
        } catch(err) {
            error("Error renewing agent token.", err);
        }
    }, 3600000);

    // Get the data sources that have been configured for the agent
    const dataSourceConfig = await agent.GetDataSourceConfiguration();
    let dataSources: any = {};

    mqttClient.on('message', function (topic: string, message: any) {
        try {
            // message is Buffer
            console.log("Incoming mqtt message: " + message.toString());
            if (dataSources[topic]) {
                const outerArray = JSON.parse(message);
                // unwrap outer array
                outerArray.forEach((innerArray : any) => {
                    // unwrap inner array
                    innerArray.forEach((evt: any) => {
                        sendData(dataSources[topic], evt);
                    });
                });
            } else {
                error("Message received for unrecognized data source: " + topic);
            }
        } catch(err) {
            error("Error handling incoming MQTT message.", err);
        }
    });

    dataSourceConfig.dataSources.forEach((dataSource: DataSource) => {
        const topic = dataSource.name;
        dataSources[dataSource.name] = dataSource;
        mqttClient.subscribe(topic, function (err: any) {
            if (err) {
                error("Error subscribing to topic: " + topic, err);
            } else {
                log("Subscribed to MQTT topic: " + topic);
            }
        });
    });
})();
This is the MindConnect agent that was used previously, with two modifications: the configuration file is read from the directory /host, and the agent is instructed to store onboarded credentials in the same directory.
const configFile = "/host/mc-config.json";
...
const agent = new MindConnectAgent(configuration, 600, "/host");
The use of a mounted local directory allows the container to be stopped and restarted without having to re-onboard the agents. It also provides a convenient mechanism for sharing files with the running container via the local file system.
Note
The MindConnect agent stores the credentials in an unencrypted file, so these changes should be used only in a development environment.
-
Save the following code in a file called run.sh in the same directory as the Docker file:
#!/bin/bash

# This section waits for the local directory to be mounted
WAITING=1
while [ $WAITING -eq 1 ]
do
    echo "Waiting for /host directory to be mounted..."
    if [ -d "/host" ]; then
        echo "Directory /host mounted."
        ((WAITING = 0))
    fi
    sleep 1
done

# This section starts Mosquitto
echo "Starting Mosquitto"
startMosquitto="mosquitto -d";
eval "${startMosquitto}" & disown;

# This section starts the Edge Streaming agent
echo "Starting Edge Streaming agent"
startEdge="bash /opt/mdsp/esa/startup.sh --esa-onboarding-config-file /host/edge-config.json > /host/esa.log 2>/host/esa.errors"
eval "${startEdge}" & disown

# This section starts the MindConnect agent and logs output to files named mc.log and mc.errors
echo "Starting MindConnect Agent"
cd /opt/mdsp/esa/mc-agent
npm start > /host/mc.log 2>/host/mc.errors
This script waits for a local directory to be mounted internally as /host. When the mounted directory appears, the script starts the Mosquitto broker, the Edge Streaming Analytics agent, and the MindConnect agent. The Edge Streaming Analytics agent looks for an onboarding configuration file, edge-config.json, in the mounted directory and saves the log output into the mounted directory.
-
Build the Docker image by running the following command from within the working directory where you have saved the Docker file and assets:
docker build . -t virtual-edge-device
In this example, the built image is tagged with the friendly name virtual-edge-device, which can be used in place of the generated image identifier.
Note
You might need to use a tag that is based on a specific repository, such as docker.yourcompany.com/virtual-edge-device, depending on how your Docker installation is configured.
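The two versions of index.ts in this section differ only in where the configuration file is read from and where the onboarded credentials are stored. As a possible refinement, and not something the original agent does, a single index.ts could derive both locations from an environment variable. The sketch below assumes a hypothetical variable named MC_HOST_DIR and a hypothetical helper named resolveAgentPaths.

```typescript
// Locations used by the agent (illustrative sketch).
interface AgentPaths {
  configFile: string;
  credentialsDir: string | undefined; // undefined means "do not override the default"
}

// Derive the paths from an environment-like record.
// MC_HOST_DIR is a hypothetical variable introduced only for this example.
function resolveAgentPaths(env: Record<string, string | undefined>): AgentPaths {
  const hostDir = env["MC_HOST_DIR"];
  if (hostDir) {
    // Virtual device: read the configuration from, and store credentials in, the mounted directory.
    return { configFile: `${hostDir}/mc-config.json`, credentialsDir: hostDir };
  }
  // Physical device: use the agent directory from the first sub-section.
  return { configFile: "/opt/mdsp/esa/mc-agent/mc-config.json", credentialsDir: undefined };
}
```

In the agent, the result could be used to choose between new MindConnectAgent(configuration) and new MindConnectAgent(configuration, 600, paths.credentialsDir), with the container started with MC_HOST_DIR set to /host.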
Deploying a Virtual Edge Device Container¶
-
Create a new directory to store assets for a specific virtual edge device instance. Example: C:\ved1 or ~/ved1.
-
Open Asset Manager in a web browser and create a MindConnectLib asset, such as VirtualEdgeDevice1.
-
Generate a SHARED_SECRET onboarding configuration.
-
Save the onboarding key as edge-config.json in the directory you previously created, such as C:\ved1\edge-config.json.
-
Follow the steps in Creating a MindConnect Agent to create an asset that can be used by the MindConnect-NodeJS library to communicate with Insights Hub and which has the correct aspect bindings to correctly map incoming time series data.
-
Run the following command to deploy a container using the image that you created previously:
docker run -d --rm --name ved1 --mount type=bind,source=/c/ved1,target=/host virtual-edge-device
This command runs a container named ved1 in detached mode from the image virtual-edge-device, and mounts the local directory (for example /c/ved1) to an internal directory called /host. As the run.sh bash script executes, it outputs the following, which can be observed by viewing the container logs:
$ docker logs ved1
Waiting for /host directory to be mounted...
Directory /host mounted.
Starting Mosquitto
Starting Edge Streaming Agent
Starting MindConnect Agent
If the container is being run for the first time, then the file mc.log contains the following log output:
> mc-agent-ts@1.0.0 start /opt/mdsp/esa/mc-agent
> tsc && node index.js
[2022-04-29T12:53:56.552Z] The agent has been onboarded
[2022-04-29T12:53:57.096Z] Subscribed to MQTT topic: Example_Data_Source
If the container is being re-run, then the file contains the following log output:
> mc-agent-ts@1.0.0 start /opt/mdsp/esa/mc-agent
> tsc && node index.js
[2022-04-29T12:57:10.068Z] The agent has previously been onboarded
[2022-04-29T12:57:10.830Z] Subscribed to MQTT topic: Example_Data_Source
The local directory now contains six additional files:
- <asset_id>.json, where asset_id is the unique identifier for the MindConnect agent, which stores the onboarded credentials for the agent.
- registration.txt, which is an encrypted file containing the onboarded credentials for the Edge Streaming Analytics agent.
- esa.log and esa.errors, which contain the log and error output for the Edge Streaming Analytics agent.
- mc.log and mc.errors, which contain the log and error output for the MindConnect agent.
When the Edge Streaming Analytics agent has connected successfully to the cloud, an entry similar to the following is observed in the esa.log file:
2022-02-22 09:56:59.334 INFO 20 --- [pool-1-thread-1] c.s.e.s.e.w.c.EdgeToCloudWebsocketClient : Connected to Cloud proxy: <asset_id>
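When several virtual devices are in use, it can be convenient to check for this log entry programmatically. The following is a minimal sketch, assuming the log entry keeps the form shown above. The function name connectedAssetId is hypothetical, and reading the esa.log file from the mounted directory is left to the caller.

```typescript
// Return the asset identifier from the first "Connected to Cloud proxy" entry,
// or undefined if the log text contains no such entry.
function connectedAssetId(logText: string): string | undefined {
  const marker = "Connected to Cloud proxy: ";
  for (const line of logText.split("\n")) {
    const at = line.indexOf(marker);
    if (at >= 0) {
      return line.slice(at + marker.length).trim();
    }
  }
  return undefined;
}
```

A return value of undefined suggests that the agent has not yet connected, in which case esa.errors is the next place to look.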
Running a Streaming Data Project Using Edge Streaming Creator¶
-
Save the data below into a suitable directory within the edge device directory, such as C:\ved1\data\input\sample.csv. The data provides sample temperature and humidity readings that are compatible with your data source.
Timestamp,Temperature,Humidity
2021-10-24T16:15:00.000Z,40,80
2021-10-24T16:16:00.000Z,25,79
2021-10-24T16:17:00.000Z,27,75
2021-10-24T16:18:00.000Z,30,72
2021-10-24T16:19:00.000Z,45,70
2021-10-24T16:20:00.000Z,44,72
2021-10-24T16:21:00.000Z,43,74
2021-10-24T16:22:00.000Z,40,79
2021-10-24T16:23:00.000Z,36,80
2021-10-24T16:24:00.000Z,30,81
When developing an Edge Streaming Creator project, it is usual practice to begin development with sample data stored in a file, before connecting to a live data source, such as sensor data. This provides repeatability as the project is developed and fine-tuned.
In this example, the source data matches the aspect type in Insights Hub but this is not a requirement as it is the output of the project and not the input that is sent to Insights Hub. It is likely that the project performs some manipulation of the data before it is sent to the cloud, such as performing edge analytics on the streaming data.
-
Save the following code in a file called MindConnectExample.xml:
<project name="MindConnectExample" threads="1" pubsub="auto" heartbeat-interval="1">
  <contqueries>
    <contquery name="cq1">
      <windows>
        <window-source pubsub="true" index="pi_EMPTY" name="ReadSensorData">
          <schema>
            <fields>
              <field name="Timestamp" type="stamp" key="true"/>
              <field name="Temperature" type="double"/>
              <field name="Humidity" type="double"/>
            </fields>
          </schema>
          <connectors>
            <connector class="fs" name="SampleDataConnector">
              <properties>
                <property name="type"><![CDATA[pub]]></property>
                <property name="dateformat"><![CDATA[%Y-%m-%dT%H:%M:%S]]></property>
                <property name="rate"><![CDATA[100]]></property>
                <property name="header"><![CDATA[1]]></property>
                <property name="addcsvopcode"><![CDATA[true]]></property>
                <property name="addcsvflags"><![CDATA[normal]]></property>
                <property name="fsname"><![CDATA[/host/data/input/sample.csv]]></property>
                <property name="fstype"><![CDATA[csv]]></property>
              </properties>
            </connector>
          </connectors>
        </window-source>
        <window-compute pubsub="true" name="ProcessData">
          <schema>
            <fields>
              <field name="Timestamp" type="stamp" key="true"/>
              <field name="Temperature" type="double"/>
              <field name="Humidity" type="double"/>
            </fields>
          </schema>
          <output>
            <field-expr><![CDATA[Temperature]]></field-expr>
            <field-expr><![CDATA[Humidity]]></field-expr>
          </output>
          <connectors>
            <connector class="mqtt" name="MqttConnector">
              <properties>
                <property name="type"><![CDATA[sub]]></property>
                <property name="snapshot"><![CDATA[false]]></property>
                <property name="mqtthost"><![CDATA[localhost]]></property>
                <property name="mqtttopic"><![CDATA[Example_Data_Source]]></property>
                <property name="mqttqos"><![CDATA[0]]></property>
                <property name="mqttmsgtype"><![CDATA[json]]></property>
              </properties>
            </connector>
          </connectors>
        </window-compute>
      </windows>
      <edges>
        <edge source="ReadSensorData" target="ProcessData"/>
      </edges>
    </contquery>
  </contqueries>
</project>
This project uses a Source window to read the sample data from the CSV file and feeds the data into a Compute window, which outputs the data using an MQTT connector. Note that the connector for the Source window specifies the file /host/data/input/sample.csv, which is the mounted path to the file that you have just saved in the local directory. In this case, the Compute window simply copies the data from the Source window, but it could be used to perform further processing of the data if required.
-
Upload the XML file into the Edge Streaming Creator application and open the project in the editor:
-
Confirm that everything is configured correctly by entering test mode and selecting the virtual edge device VirtualEdgeDevice1. Click Run test; after a few moments, the following output should be observed:
-
After the test is executed, the MindConnect agent log file mc.log should contain the following additional log output:
Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092100000000", "Temperature": 40.0, "Humidity": 80.0}]]
Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092160000000", "Temperature": 25.0, "Humidity": 79.0}]]
Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092220000000", "Temperature": 27.0, "Humidity": 75.0}]]
Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092280000000", "Temperature": 30.0, "Humidity": 72.0}]]
Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092340000000", "Temperature": 45.0, "Humidity": 70.0}]]
Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092400000000", "Temperature": 44.0, "Humidity": 72.0}]]
Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092460000000", "Temperature": 43.0, "Humidity": 74.0}]]
Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092520000000", "Temperature": 40.0, "Humidity": 79.0}]]
Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092580000000", "Temperature": 36.0, "Humidity": 80.0}]]
[2022-04-29T13:19:16.517Z] Data posted with timestamp: (Sun Oct 24 2021 16:20:00 GMT+0000 (Coordinated Universal Time))
[2022-04-29T13:19:16.541Z] Data posted with timestamp: (Sun Oct 24 2021 16:18:00 GMT+0000 (Coordinated Universal Time))
[2022-04-29T13:19:16.556Z] Data posted with timestamp: (Sun Oct 24 2021 16:21:00 GMT+0000 (Coordinated Universal Time))
[2022-04-29T13:19:16.558Z] Data posted with timestamp: (Sun Oct 24 2021 16:19:00 GMT+0000 (Coordinated Universal Time))
[2022-04-29T13:19:16.597Z] Data posted with timestamp: (Sun Oct 24 2021 16:16:00 GMT+0000 (Coordinated Universal Time))
[2022-04-29T13:19:16.615Z] Data posted with timestamp: (Sun Oct 24 2021 16:17:00 GMT+0000 (Coordinated Universal Time))
[2022-04-29T13:19:16.666Z] Data posted with timestamp: (Sun Oct 24 2021 16:15:00 GMT+0000 (Coordinated Universal Time))
[2022-04-29T13:19:16.708Z] Data posted with timestamp: (Sun Oct 24 2021 16:23:00 GMT+0000 (Coordinated Universal Time))
[2022-04-29T13:19:16.930Z] Data posted with timestamp: (Sun Oct 24 2021 16:22:00 GMT+0000 (Coordinated Universal Time))
-
Open the Operation Insights application and select your asset, ExampleAsset1. Select the check boxes next to the Humidity and Temperature data points, then set the date to the one used in the sample data, 24 October 2021. The data that was just posted to IoT Time Series should now be displayed:
You now have a data source that is processed by the Edge Streaming Server, output to a MindConnect agent via MQTT, and posted to Insights Hub, where it is stored as IoT Time Series data.
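The mc.log output in the earlier step shows that each MQTT message arrives as a nested JSON array of events, and that the Timestamp field is a string holding microseconds since the Unix epoch (1635092100000000 corresponds to 16:15:00 UTC on 24 October 2021). The following is a minimal sketch of how an agent might decode such a payload before posting it. The helper names parseEsaMessage and toIsoTimestamp are hypothetical and are not part of the MindConnect-NodeJS library; the shape of the returned records is likewise an assumption made for illustration.

```javascript
// Sketch only: decode an Edge Streaming MQTT payload as it appears in mc.log.
// parseEsaMessage and toIsoTimestamp are hypothetical helper names,
// not functions provided by the MindConnect-NodeJS library.

// Convert a microsecond epoch string (e.g. "1635092100000000") to ISO 8601.
function toIsoTimestamp(microseconds) {
  // Integer-divide by 1000 to get milliseconds, which is what Date expects.
  const millis = Number(BigInt(microseconds) / 1000n);
  return new Date(millis).toISOString();
}

// Flatten the nested [[{...}]] event blocks into plain records.
function parseEsaMessage(payload) {
  const blocks = JSON.parse(payload);
  return blocks.flat().map(({ opcode, Timestamp, ...values }) => ({
    opcode,                               // "i" marks an insert event
    timestamp: toIsoTimestamp(Timestamp), // ISO 8601 string in UTC
    values,                               // remaining fields, e.g. Temperature
  }));
}

// First message from the log output shown earlier.
const sample =
  '[[{"opcode": "i", "Timestamp": "1635092100000000", "Temperature": 40.0, "Humidity": 80.0}]]';

console.log(parseEsaMessage(sample));
```

For the sample message, the resulting record carries the timestamp 2021-10-24T16:15:00.000Z, which matches the earliest "Data posted with timestamp" entry in the log. Keeping the decoding in a pure function like this makes it easy to test separately from the MQTT subscription and the upload to Insights Hub.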
Except where otherwise noted, content on this site is licensed under the Development License Agreement.