Capturing Results from Edge Streaming Analytics in Industrial IoT

This section describes how to capture results from Edge Streaming Analytics (ESA) as a time series in Industrial IoT storage, with the help of the MindConnect-NodeJS library.

In this section, you will learn how to install and configure the MindConnect-NodeJS library and to create an Edge Streaming Analytics project that can read, process, and send time series data to Insights Hub.

The MindConnect-NodeJS library is a stand-alone library that can be installed on a device and acts as an interface to Insights Hub.

This section contains two sub-sections:

Capturing Results from a Physical Device

In this section, you will learn how to install and configure the MindConnect-NodeJS library on a physical device and create an Edge Streaming Analytics project that can send time series data to Insights Hub.

The MindConnect-NodeJS library is used to create a custom MindConnect agent that enables the Edge Streaming Server to access additional Industrial IoT services. The Edge Streaming Server communicates with the MindConnect agent through a local MQTT message broker.

Note

This section assumes that Edge Streaming Analytics is already installed and set up on a physical or virtual device.

Prerequisites

Hardware requirements:

  • 6GB RAM available
  • Dual or quad core CPU, at least 1GHz
  • 8GB free disk space

Software requirements:

  • Red Hat Enterprise Linux 7
  • Root privileges (sudo)
  • Java 8

Installing an MQTT Broker

Communications between the Edge Streaming Server and the MindConnect agent are made using a subscriber connector in the Edge Streaming Server and a compatible subscriber in the agent.

In this example, you will use the MQTT subscriber connector to output events from the model, and you will implement an MQTT subscriber in the agent. This example uses Mosquitto, a lightweight implementation of an MQTT broker.

Note

Depending on your device's configuration, the commands might need to be prefixed with sudo.

  1. Enable the repository that provides the Mosquitto package.

      yum-config-manager --add-repo=http://download-ib01.fedoraproject.org/pub/epel/7/x86_64/
    
  2. Import the repository's key in Red Hat Enterprise Linux.

      wget https://download-ib01.fedoraproject.org/pub/epel/RPM-GPG-KEY-EPEL-7
      rpm --import RPM-GPG-KEY-EPEL-7
    
  3. Install Mosquitto.

      yum install mosquitto
    
  4. Start Mosquitto.

      service mosquitto start
    
  5. In order to test if Mosquitto has started, execute the following command:

      netstat -tulpn
    

The output should show the Mosquitto broker listening on port 1883:

Mosquitto status
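
If you want to script this check rather than read the output by eye, the idea can be sketched in TypeScript as below. This is a minimal sketch, not part of the product: the function name `isListeningOn` and the sample text (including the process ID) are hypothetical, and the column layout is assumed to be the usual `netstat -tulpn` layout.

```typescript
// Sketch: decide from `netstat -tulpn` output whether a TCP listener exists on a port.
// Assumed column order: Proto, Recv-Q, Send-Q, Local Address, Foreign Address, State, PID/Program name.
function isListeningOn(netstatOutput: string, port: number): boolean {
    const suffix = ":" + port;
    return netstatOutput.split("\n").some((line) => {
        const columns = line.trim().split(/\s+/);
        // Skip headers, blank lines, and non-TCP sockets
        if (columns.length < 6 || columns[0].indexOf("tcp") !== 0) {
            return false;
        }
        const localAddress = columns[3];
        const endsWithPort = localAddress.slice(-suffix.length) === suffix;
        return columns[5] === "LISTEN" && endsWithPort;
    });
}

// Hypothetical sample text, for illustration only
const sampleNetstatOutput = [
    "Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name",
    "tcp        0      0 0.0.0.0:1883            0.0.0.0:*               LISTEN      1234/mosquitto",
    "tcp6       0      0 :::1883                 :::*                    LISTEN      1234/mosquitto",
].join("\n");

console.log(isListeningOn(sampleNetstatOutput, 1883));
```

Matching on the full `:port` suffix avoids false positives such as port 883 matching a line for port 1883.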

Installing the MindConnect-NodeJS Library

  1. The MindConnect-NodeJS library requires NodeJS to run. Use the following command to install NodeJS:

      sudo yum install nodejs
    
  2. The MindConnect-NodeJS library can then be installed using the Node Package Manager (npm):

      sudo npm install -g @mindconnect/mindconnect-nodejs
    
  3. Navigate to the Edge Streaming Analytics directory (/opt/mdsp/esa) and create a new MindConnect-NodeJS starter project with the following command:

      mc starter-ts --dir /opt/mdsp/esa/mc-agent
    

    This creates an example agent that can be used as a starting point for developing a new custom agent.

  4. Install NodeJS support for MQTT:

      sudo npm install -g mqtt
    
  5. Navigate to the agent directory, /opt/mdsp/esa/mc-agent, and install the MindConnect-NodeJS library dependencies with the following command:

      sudo npm install
    
  6. Replace the auto-generated index.ts file with the code below:

        import {
            DataSource,
            DataPointValue,
            MindConnectAgent,
            retry
        } from "@mindconnect/mindconnect-nodejs";
    
        (async function () {
            const configFile = "/opt/mdsp/esa/mc-agent/mc-config.json";
    
            // Number of times to retry the operation before giving up and throwing an exception
            const RETRYTIMES = 5;
    
            // Helper methods
            const sleep = (ms: any) => new Promise(resolve => setTimeout(resolve, ms));
            const prependDate = (text: any) => { return `[${new Date().toISOString()}] ${text.toString()}`};
            const log = (text: any) => { console.log(prependDate(text)); };
            const error = (text: any, err?: any) => { 
                console.error(prependDate(text));
                err && console.error(err);
            };
    
            // Connect to MQTT
            const mqtt = require('mqtt');
            const mqttClient = mqtt.connect('mqtt://localhost:1883');
    
            // Method to send data to a specific data source
            // The data object should contain properties that match the datapoint names
            let sendData = (dataSource: any, data: any) => {
                let dataPointValues : DataPointValue[] = [];
    
                dataSource.dataPoints.forEach((dataPoint: any) => {
                    if (data.hasOwnProperty(dataPoint.name)) {
                        dataPointValues.push({
                            dataPointId: dataPoint.id,
                            qualityCode: "0",
                            value: "" + data[dataPoint.name]
                        });
                    }
                });
    
                if (data.Timestamp) {
                    const timestamp = new Date(data.Timestamp/1000);
    
                    agent.PostData(dataPointValues, timestamp)
                        .then(()=>{
                            log("Data posted with timestamp: (" + timestamp + ")");
                        })
                        .catch(err => {
                            error("Error posting data.", err);
                        });
                }
                else {
                    agent.PostData(dataPointValues)
                        .then(() => {
                            log("Data posted successfully.");
                        })
                        .catch((err: any) => {
                            error("Error posting data.", err);
                        });
                }
            };
    
            // Set agent configuration
            const configuration = require(configFile);
            const agent = new MindConnectAgent(configuration);
    
            // Onboard the agent
            if (agent.IsOnBoarded()) {
                log("The agent has previously been onboarded");
            }
            else {
                try {
                    // Attempt to onboard the agent
                    await retry(RETRYTIMES, () => agent.OnBoard());
                    log("The agent has been onboarded");
                }
                catch (err) {
                    error("The agent could not be onboarded.", err);
                }
            }
    
            // Renew the agent token periodically
            setInterval(async () => {
                try {
                    await retry(RETRYTIMES, () => agent.RenewToken());
                }
                catch(err) {
                    error("Error renewing agent token.", err);
                }
            }, 
            3600000);
    
            // Get the data sources that have been configured for the agent    
            const dataSourceConfig = await agent.GetDataSourceConfiguration();
            let dataSources: any = {};
    
            mqttClient.on('message', function (topic: string, message: any) {
                try {
                    // message is Buffer
                    console.log("Incoming mqtt message: " + message.toString());
    
                    if (dataSources[topic]) {
                        const outerArray = JSON.parse(message.toString());
    
                        // unwrap outer array
                        outerArray.forEach((innerArray : any) => {
                            // unwrap inner array
                            innerArray.forEach((evt: any) => {
                                sendData(dataSources[topic], evt);
                            });
                        });   
                    }
                    else {
                        error("Message received for unrecognized data source: " + topic);
                    }            
                }
                catch(err) {
                    error("Error handling incoming MQTT message.", err);
                }
            });
    
            dataSourceConfig.dataSources.forEach((dataSource: DataSource) => {
                const topic = dataSource.name;
                dataSources[dataSource.name] = dataSource;
    
                mqttClient.subscribe(topic, function (err: any) {
                    if (err) {
                        error("Error subscribing to topic: " + topic, err);
                    }
                    else {
                        log("Subscribed to MQTT topic: " + topic);
                    }
                });
            });
        })();
    

This code provides a simple custom agent that listens for events from an MQTT source. For each data source associated with the agent, a subscription is added to a topic with the same name as the data source. In this way, data can be routed through the agent to multiple data sources.
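
The message handler above unwraps two levels of arrays before calling sendData. The sketch below isolates that step so it can be tried without a broker. It is illustrative only: the `EsaEvent` type, the `flattenEvents` name, and the sample payload are assumptions inferred from the handler, not a documented message format.

```typescript
// Sketch: flatten the nested-array JSON payload that the agent's message handler expects.
// `EsaEvent` is a hypothetical, loosely typed stand-in for one event object.
interface EsaEvent {
    [field: string]: unknown;
}

function flattenEvents(payload: string): EsaEvent[] {
    const parsed: unknown = JSON.parse(payload);
    if (!Array.isArray(parsed)) {
        throw new Error("Expected a JSON array of event blocks");
    }
    const events: EsaEvent[] = [];
    parsed.forEach((block: unknown) => {
        // Each block is itself an array of events; anything else is skipped
        if (Array.isArray(block)) {
            block.forEach((evt: EsaEvent) => {
                events.push(evt);
            });
        }
    });
    return events;
}

// Hypothetical payload carrying the first two rows of readings in one block
const samplePayload = '[[{"Temperature":40,"Humidity":80},{"Temperature":25,"Humidity":79}]]';

flattenEvents(samplePayload).forEach((evt) => {
    console.log(JSON.stringify(evt));
});
```

Rejecting a payload that is not an array up front gives a clearer error than letting forEach fail inside the handler.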

The first time the agent runs, it onboards itself and stores the credentials required to access Insights Hub. It then refreshes these credentials regularly.

Note

For a production agent, take additional steps to ensure that the credentials are stored securely. This might require further development of the agent, depending on the intended application and edge device configuration. This example provides a reference for consuming data from an Edge Streaming Server through MQTT, and it can be extended and modified for more specific use cases. For more information on developing custom agents with the MindConnect-NodeJS library, refer to the MindConnect-NodeJS documentation.

The agent is connected to an Insights Hub asset, so before the code can be executed, you need to create a suitable agent asset using the Asset Manager.

Creating a MindConnect Agent

For the data to be visualized in Insights Hub, you need to create a data aspect to define the structure of the data points and create an asset that can be used to bring the data from the edge device into Insights Hub.

Note

Creating an agent requires that the user has permission to access the Asset Manager in the environment account.

  1. On the Launchpad, click the Asset Manager icon as shown below.

    Asset manager

  2. Select Aspect Types from the Library context menu as shown below, and create a new aspect called ExampleAspect.

    Aspect types

  3. Select the Dynamic category option, which enables the aspect to be used with IoT Time-Series.

  4. Name the aspect ExampleAspect and add two variables, Temperature and Humidity, as shown below:

    Example aspect

  5. Save the aspect, and navigate to Asset Types, as shown below:

    Asset types

  6. Expand the Basic Agent section and select MindConnectLib and then click Create Type to create a new MindConnectLib asset. Verify that the parent type is core.mclib.

  7. Name the asset ExampleMCLAsset.

  8. Click Add aspect to add an additional aspect to the asset. The aspect created in step 4 has the id <tenant>.ExampleAspect where tenant is the name of your environment. Select this aspect and save. For example, with the environment sas, the aspect is sas.ExampleAspect:

    Asset aspects

  9. Return to the Assets page and select Create asset to create an asset for your edge device.

  10. On the Select Type page, select your newly created asset type ExampleMCLAsset and click the Create button.

  11. Name the asset ExampleAsset1 and save. A new asset is created with MindConnectLib connectivity, and the ExampleAspect data aspect:

    Example asset

    Note that the agent shows as offline as you have not yet on-boarded it, and the aspect shows that no data has yet been received.

  12. Click the arrow to open the MindConnectLib plugin. Configure the asset to use SHARED_SECRET and save.

  13. Click Generate onboarding key to create a token for on-boarding the edge device.

  14. Copy the configuration and save it on the edge device in a file called mc-config.json. If deploying the agent on a physical device, store the file in the directory on the device where you created the MindConnect agent: /opt/mdsp/esa/mc-agent. If deploying the agent on a virtual device, store the file in the local directory for the virtual device, for example c:\ved1\ or ~/ved1.

  15. Return to the MindConnectLib plugin page and click Configure your data sources:

    Agent data source configuration

  16. Click Add data source:

    Add data source

  17. Enter the data source name, Example_Data_Source, and click Accept:

    Add data source details

  18. Add two data points, Temperature and Humidity, entering the required details for the data points as shown below and click Accept:

    Add temperature data point

    Add humidity data point

    The MindConnect Lib configuration should now appear as shown below. Click Save:

    Configured agent

  19. Navigate to the Data mappings tab:

    Data mappings

  20. Click Link variable and link the Temperature data source field to the aspect type’s Temperature field:

    Link variable

  21. Repeat for the Humidity data point, so that both data points are linked:

    Link variables

    The asset is now ready to be onboarded to receive data and to map incoming data to the specified aspect type.
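
The data source and data points configured above are what the agent uses to turn an incoming event into data point values. The following is a minimal sketch of that matching, assuming simplified shapes: the `Sketch...` interfaces are hypothetical reductions of the library's types, and the data point IDs are invented for illustration. Real IDs come from the agent's data source configuration.

```typescript
// Simplified, assumed shapes; the library's own DataSource and DataPointValue types carry more fields.
interface SketchDataPoint { id: string; name: string; }
interface SketchDataSource { name: string; dataPoints: SketchDataPoint[]; }
interface SketchDataPointValue { dataPointId: string; qualityCode: string; value: string; }

// Build one value per configured data point whose name appears as a field in the event.
function toDataPointValues(
    source: SketchDataSource,
    evt: { [field: string]: unknown }
): SketchDataPointValue[] {
    const values: SketchDataPointValue[] = [];
    source.dataPoints.forEach((dataPoint) => {
        if (Object.prototype.hasOwnProperty.call(evt, dataPoint.name)) {
            values.push({
                dataPointId: dataPoint.id,
                qualityCode: "0",
                value: String(evt[dataPoint.name]),
            });
        }
    });
    return values;
}

// Hypothetical data point IDs, for illustration only
const exampleSource: SketchDataSource = {
    name: "Example_Data_Source",
    dataPoints: [
        { id: "DP-Temperature", name: "Temperature" },
        { id: "DP-Humidity", name: "Humidity" },
    ],
};

// Fields without a matching data point (here, Pressure) are ignored
console.log(toDataPointValues(exampleSource, { Temperature: 40, Humidity: 80, Pressure: 1 }));
```

Because matching is by name, the data point names in the data source must be identical to the field names in the events published by the project.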

Sending Data through the Edge Streaming Creator Project

  1. Start the MindConnect agent by navigating to the /opt/mdsp/esa/mc-agent directory and executing the following command:

      npm start
    

    The agent starts up. When the agent is running, it eventually generates the following output that indicates that the agent has been onboarded successfully, the data source has been detected and an MQTT subscription has been made accordingly:

      [2022-01-26T08:59:46.603Z] The agent has been onboarded
      [2022-01-26T08:59:47.178Z] Subscribed to MQTT topic: Example_Data_Source
    
  2. Save the data below into a suitable location, such as /opt/mdsp/esa/data/input/sample.csv. The data provides sample temperature and humidity readings that are compatible with your data source.

      Timestamp,Temperature,Humidity
      2021-10-24T16:15:00.000Z,40,80
      2021-10-24T16:16:00.000Z,25,79
      2021-10-24T16:17:00.000Z,27,75
      2021-10-24T16:18:00.000Z,30,72
      2021-10-24T16:19:00.000Z,45,70
      2021-10-24T16:20:00.000Z,44,72
      2021-10-24T16:21:00.000Z,43,74
      2021-10-24T16:22:00.000Z,40,79
      2021-10-24T16:23:00.000Z,36,80
      2021-10-24T16:24:00.000Z,30,81
    

    When developing an Edge Streaming Creator project, it is usual practice to begin development with sample data stored in a file before connecting to a live data source, such as sensor data. This provides repeatability as the project is developed and fine-tuned.

    In this example, the source data matches the aspect type in Insights Hub but this is not a requirement as it is the output of the project and not the input that is sent to Insights Hub. It is likely that the project performs some manipulation of the data before it is sent to the cloud, such as performing edge analytics on the streaming data.

  3. Save the following XML code as MindConnectExample.xml:

      <project name="MindConnectExample" threads="1" pubsub="auto" heartbeat-interval="1">
          <contqueries>
              <contquery name="cq1">
                  <windows>
                      <window-source pubsub="true" index="pi_EMPTY" name="ReadSensorData">
                          <schema>
                              <fields>
                                  <field name="Timestamp" type="stamp" key="true"/>
                                  <field name="Temperature" type="double"/>
                                  <field name="Humidity" type="double"/>
                              </fields>
                          </schema>
                          <connectors>
                              <connector class="fs" name="SampleDataConnector">
                                  <properties>
                                      <property name="type"><![CDATA[pub]]></property>
                                      <property name="dateformat"><![CDATA[%Y-%m-%dT%H:%M:%S]]></property>
                                      <property name="rate"><![CDATA[100]]></property>
                                      <property name="header"><![CDATA[1]]></property>
                                      <property name="addcsvopcode"><![CDATA[true]]></property>
                                      <property name="addcsvflags"><![CDATA[normal]]></property>
                                      <property name="fsname"><![CDATA[/opt/mdsp/esa/data/input/sample.csv]]></property>
                                      <property name="fstype"><![CDATA[csv]]></property>
                                  </properties>
                              </connector>
                          </connectors>
                      </window-source>
                      <window-compute pubsub="true" name="ProcessData">
                          <schema>
                              <fields>
                                  <field name="Timestamp" type="stamp" key="true"/>
                                  <field name="Temperature" type="double"/>
                                  <field name="Humidity" type="double"/>
                              </fields>
                          </schema>
                          <output>
                              <field-expr><![CDATA[Temperature]]></field-expr>
                              <field-expr><![CDATA[Humidity]]></field-expr>
                          </output>
                          <connectors>
                              <connector class="mqtt" name="MqttConnector">
                                  <properties>
                                      <property name="type"><![CDATA[sub]]></property>
                                      <property name="snapshot"><![CDATA[false]]></property>
                                      <property name="mqtthost"><![CDATA[localhost]]></property>
                                      <property name="mqtttopic"><![CDATA[Example_Data_Source]]></property>
                                      <property name="mqttqos"><![CDATA[0]]></property>
                                      <property name="mqttmsgtype"><![CDATA[json]]></property>
                                  </properties>
                              </connector>
                          </connectors>
                      </window-compute>
                  </windows>
                  <edges>
                      <edge source="ReadSensorData" target="ProcessData"/>
                  </edges>
              </contquery>
          </contqueries>
      </project>
    

    This project uses a Source window to read the sample data from the CSV file and feeds the data into a Compute window that provides the output data using an MQTT connector. In this case, the Compute window simply copies the data from the Source window, but could be used to perform further processing of the data if required.

  4. Upload the XML file into the Edge Streaming Creator application, and open the project in the editor:

    Project view

  5. Confirm that everything is configured correctly by entering test mode and running the project. The following output should be observed:

    Test results

  6. The output can be tested by running the MindConnect agent, which subscribes to the data source topic on the MQTT broker, and running the project in test mode. For example, the following output from the agent would be observed:

    Agent output

    The messages can also be viewed by using the MQTT Command Line Interface to subscribe to the relevant topics, which can be useful when debugging.

  7. Open the Operation Insights application and select your asset ExampleAsset1. Select the check boxes next to the Humidity and Temperature data points, and then set the date to the date used in the sample data, 24th October, 2021. The data that was just posted and sent to IoT Time Series should now be displayed:

    Operation insights

You now have a data source that is being processed by the Edge Streaming Server, output to a MindConnect agent via MQTT, and posted to Insights Hub to be stored as IoT Time Series data.
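
If the data does not appear under the expected date, the timestamp conversion in the agent is a good place to look. The agent divides the incoming Timestamp by 1000 before creating a Date, which suggests that the value arrives in microseconds since the Unix epoch. That unit is an assumption inferred from the agent code, and `microsToDate` is a hypothetical helper name.

```typescript
// Sketch: mirror the agent's `new Date(data.Timestamp/1000)` conversion.
// Assumes the incoming value is microseconds since the Unix epoch.
function microsToDate(micros: number): Date {
    return new Date(micros / 1000);
}

// First row of the sample CSV, expressed in microseconds (month index 9 is October)
const firstSampleMicros = Date.UTC(2021, 9, 24, 16, 15, 0) * 1000;

console.log(microsToDate(firstSampleMicros).toISOString()); // prints 2021-10-24T16:15:00.000Z
```

If the timestamps were delivered in milliseconds instead, the same division would place every reading in January 1970, which is a quick way to recognize a unit mismatch.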

Capturing Results from a Virtual Device

When developing an edge application, it might be neither desirable nor practical to develop the analytical model directly on an edge device. Through the use of containerization, one or more virtual edge devices can be created and deployed for use during development. Using a local image allows the local file system to be mounted, which simplifies the process of testing with sample data files. A virtual device deployed on a server can be maintained and shared for demonstration purposes.

In this section, you will learn how to deploy a virtual edge device using Docker, install and configure the MindConnect-NodeJS library and create an Edge Streaming Analytics project that can send Time Series data.

The MindConnect-NodeJS library is used to create a custom MindConnect agent that enables the Edge Streaming Server to access additional Industrial IoT services. The Edge Streaming Server communicates with the MindConnect agent through a local MQTT message broker.

Prerequisites

Software requirements:

  • Docker

Creating a Virtual Edge Device Image

  1. Create a working directory for creating the Docker image. Example: C:\virtual-edge-device or ~/virtual-edge-device.

  2. Save the following content in the working directory in a file named Dockerfile:

        FROM quay.io/centos/centos:stream8
    
        ENV container docker
        ARG EDGE_VERSION=1.6.4
    
        RUN yum update -y
    
        # Mosquitto and Node.js require extra packages that are available from the epel-release repository
        RUN yum install -y epel-release
    
        # Useful utilities
        RUN yum install -y unzip
        RUN yum install -y nano
    
        # Dependencies required by the Edge Streaming Server
        RUN yum install -y numactl
        RUN yum install -y compat-openssl10
        RUN yum install -y libnsl
    
        # Dependencies needed to run the Edge Streaming Analytics agent
        RUN yum install -y java-11
        ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.12.0.7-0.el7_9.x86_64/
    
        # Mosquitto is used to enable communications between the Edge Streaming server and the MindConnect agent
        RUN yum install -y mosquitto
    
        # This section creates directories needed for the Edge Streaming agents
        RUN mkdir /opt/mdsp
        RUN mkdir /opt/mdsp/esa
    
        # This section copies and unzips the Edge Streaming Server and agent
        COPY ./SiemensEdgeStreamingServer-v${EDGE_VERSION}.zip /tmp
        RUN unzip /tmp/SiemensEdgeStreamingServer-v${EDGE_VERSION}.zip -d /opt/mdsp/esa/
    
        # This step fixes the encryption key used by the Edge Streaming agent to enable the docker image to be stopped and started. The secret should be changed to a unique value.
        ENV esp_edge_secret=xyz987
        ENV esp_edge_secret_custom=true
    
        # This section sets environment variables required by the Edge Streaming server
        ENV ESATK=/opt/mdsp/esa/home/ESA/esaexe
        ENV TKPATH=${ESATK}:${ESATK}/../utilities/bin
        ENV DFESP_HOME=/opt/mdsp/esa/home/EdgeStreamingServer/v${EDGE_VERSION}
        ENV DFESP_CONFIG=/opt/mdsp/esa/config/etc/EdgeStreamingServer/default
    
        # This section installs the MindConnect-NodeJS library and creates a starter project
        RUN yum install -y nodejs
        RUN npm install -g @mindconnect/mindconnect-nodejs
        RUN npx @mindconnect/mindconnect-nodejs starter-ts --dir /opt/mdsp/esa/mc-agent &&\
            cd /opt/mdsp/esa/mc-agent &&\
            npm install --save mqtt &&\
            npm install
    
        # This section copies the local MindConnectNodeJS index.ts file into the image
        COPY ./index.ts /opt/mdsp/esa/mc-agent/index.ts
    
        # This section copies the local run script and removes any \r escape characters that might be added if the file is edited in a Windows Operating System
        COPY ./run.sh /opt/mdsp/esa/run.sh
        RUN sed -i "s|\r||g" /opt/mdsp/esa/run.sh
    
        # This section sets the run script as the entrypoint for the container
        ENTRYPOINT ["/opt/mdsp/esa/run.sh"]
    

    The Docker file starts with a CentOS base image.

    Note

    CentOS is a clone of Red Hat Enterprise Linux (RHEL) that can be accessed for free. You might wish to replace the base image, for example to use a hardened RHEL image that is approved for use within your company or organization.

    The Docker file also installs additional packages that are required to run the Edge Streaming Server, Edge Streaming Analytics agent and the MindConnect-NodeJS library.

    Furthermore, the Docker file sets required environment variables and copies required assets from the local file system. These are described in more detail in the following steps.

    When running the Edge Streaming Analytics agent, the credentials are stored in an encrypted file on the device. When using a Docker image, the encryption key changes each time the Docker image is started, so the following environment variables override the default behavior to use the same encryption key each time a container is created:

      ENV esp_edge_secret=xyz987
      ENV esp_edge_secret_custom=true
    

    Whilst the value of the secret can be changed, it is recommended that this setting is used only in development environments.

  3. Download the latest Edge Streaming Analytics distribution from Siemens Industry Online Support, for example SiemensEdgeStreamingServer-v1.6.4.zip, and save it in the same directory as the Docker file. Edit the Docker file argument EDGE_VERSION to match the version number, for example:

      ARG EDGE_VERSION=1.6.4
    
  4. Save the following code with the filename index.ts, into the same directory as the Docker file:

        import {
            DataSource,
            DataPointValue,
            MindConnectAgent,
            retry
        } from "@mindconnect/mindconnect-nodejs";
    
        (async function () {
            const configFile = "/host/mc-config.json";
    
            // Number of times to retry the operation before giving up and throwing an exception
            const RETRYTIMES = 5;
    
            // Helper methods
            const sleep = (ms: any) => new Promise(resolve => setTimeout(resolve, ms));
            const prependDate = (text: any) => { return `[${new Date().toISOString()}] ${text.toString()}`};
            const log = (text: any) => { console.log(prependDate(text)); };
            const error = (text: any, err?: any) => { 
                console.error(prependDate(text)); 
                err && console.error(err);
            };
    
            // Connect to MQTT
            const mqtt = require('mqtt');
            const mqttClient = mqtt.connect('mqtt://localhost:1883');
    
            // Method to send data to a specific data source
            // The data object should contain properties that match the datapoint names
            let sendData = (dataSource: any, data: any) => {
                let dataPointValues : DataPointValue[] = [];
    
                dataSource.dataPoints.forEach((dataPoint: any) => {
                    if (data.hasOwnProperty(dataPoint.name)) {
                        dataPointValues.push({
                            dataPointId: dataPoint.id,
                            qualityCode: "0",
                            value: "" + data[dataPoint.name]
                        });
                    }
                });
    
                if (data.Timestamp) {
                    const timestamp = new Date(data.Timestamp/1000);
    
                    agent.PostData(dataPointValues, timestamp)
                        .then(()=>{
                            log("Data posted with timestamp: (" + timestamp + ")");
                        })
                        .catch(err => {
                            error("Error posting data.", err);
                        });
                }
                else {
                    agent.PostData(dataPointValues)
                        .then(() => {
                            log("Data posted successfully.");
                        })
                        .catch((err: any) => {
                            error("Error posting data.", err);
                        });
                }
            };
    
            // Set agent configuration
            const configuration = require(configFile);
            const agent = new MindConnectAgent(configuration, 600, "/host");
    
            // Onboard the agent
            if (agent.IsOnBoarded()) {
                log("The agent has previously been onboarded");
            }
            else {
                try {
                    // Attempt to onboard the agent
                    await retry(RETRYTIMES, () => agent.OnBoard());
                    log("The agent has been onboarded");
                }
                catch (err) {
                    error("The agent could not be onboarded.", err);
                }
            }
    
            // Renew the agent token periodically
            setInterval(async () => {
                try {
                    await retry(RETRYTIMES, () => agent.RenewToken());
                }
                catch(err) {
                    error("Error renewing agent token.", err);
                }
            }, 
            3600000);
    
            // Get the data sources that have been configured for the agent    
            const dataSourceConfig = await agent.GetDataSourceConfiguration();
            let dataSources: any = {};
    
            mqttClient.on('message', function (topic: string, message: any) {
                try {
                    // message is Buffer
                    console.log("Incoming mqtt message: " + message.toString());
    
                    if (dataSources[topic]) {
                        const outerArray = JSON.parse(message.toString());
    
                        // unwrap outer array
                        outerArray.forEach((innerArray : any) => {
    
                            // unwrap inner array
                            innerArray.forEach((evt: any) => {
                                sendData(dataSources[topic], evt);
                            });
                        });        
                    }
                    else {
                        error("Message received for unrecognized data source: " + topic);
                    }            
                }
                catch(err) {
                    error("Error handling incoming MQTT message.", err);
                }
            });
    
            dataSourceConfig.dataSources.forEach((dataSource: DataSource) => {
                const topic = dataSource.name;
                dataSources[dataSource.name] = dataSource;
    
                mqttClient.subscribe(topic, function (err: any) {
                    if (err) {
                        error("Error subscribing to topic: " + topic, err);
                    }
                    else {
                        log("Subscribed to MQTT topic: " + topic);
                    }
                });
            });
        })();
    

    This is the MindConnect agent that was used previously, with two modifications: the agent reads its configuration file from the /host directory, and it stores the on-boarded credentials in the same directory.

        const configFile = "/host/mc-config.json";
        ...
        const agent = new MindConnectAgent(configuration, 600, "/host");
    

    Using a mounted local directory allows the container to be stopped and restarted without having to re-onboard the agents.
    It also provides a convenient mechanism for sharing files with the running container via the local file system.

    Note

    The MindConnect agent stores the credentials in an unencrypted file, so these changes should be used only in a development environment.

  5. Save the following code in a file called run.sh in the same directory as the Docker file:

        #!/bin/bash
    
        # This section waits for the local directory to be mounted
        WAITING=1
    
        while [ $WAITING -eq 1 ]
        do
            echo "Waiting for /host directory to be mounted..."
            if [ -d "/host" ]; then
                echo "Directory /host mounted."
                ((WAITING = 0))
            fi
            sleep 1
        done
    
        # This section starts Mosquitto
        echo "Starting Mosquitto"
        startMosquitto="mosquitto -d";
        eval "${startMosquitto}" & disown;
    
        # This section starts the Edge Streaming agent
        echo "Starting Edge Streaming agent"
        startEdge="bash /opt/mdsp/esa/startup.sh --esa-onboarding-config-file /host/edge-config.json > /host/esa.log 2>/host/esa.errors"
        eval "${startEdge}" & disown
    
        # This section starts the MindConnect agent and logs output to files named mc.log and mc.errors
        echo "Starting MindConnect Agent"
        cd /opt/mdsp/esa/mc-agent
        npm start > /host/mc.log 2>/host/mc.errors
    

    This script waits for a local directory to be mounted internally as /host. When the mounted directory appears, the script starts the Mosquitto broker, the Edge Streaming Analytics agent, and the MindConnect agent. The Edge Streaming Analytics agent looks for the on-boarding configuration file edge-config.json in the mounted directory and saves its log output to the same directory.

  6. Build the Docker image by running the following command from within the working directory where you have saved the Docker file and assets:

      docker build . -t virtual-edge-device
    

    In this example, the built image is tagged with the friendly name virtual-edge-device, which can be used in place of the generated image identifier.

    Note

    You might need to use a tag that is based on a specific repository, such as docker.yourcompany.com/virtual-edge-device, depending on how your Docker installation is configured.
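
The message handler in the agent code above unwraps two levels of JSON arrays before it passes each event to sendData. The following is a minimal sketch of that unwrapping as a standalone function, assuming the payload has the array-of-arrays shape that appears in the agent log later in this section. The names EsaEvent and flattenEvents are hypothetical and are not part of the MindConnect-NodeJS library.

```typescript
// Shape of one event as it appears in the logged MQTT messages.
// The index signature allows any additional fields defined by the project schema.
interface EsaEvent {
    opcode: string;
    Timestamp: string;
    [field: string]: unknown;
}

// Hypothetical helper: parses one MQTT payload and flattens the
// array-of-arrays envelope into a single list of events.
function flattenEvents(payload: string): EsaEvent[] {
    const parsed: unknown = JSON.parse(payload);
    if (!Array.isArray(parsed)) {
        throw new Error("Expected an outer JSON array");
    }
    const flattened: EsaEvent[] = [];
    for (const inner of parsed) {
        if (!Array.isArray(inner)) {
            throw new Error("Expected each outer element to be an array");
        }
        for (const evt of inner) {
            flattened.push(evt as EsaEvent);
        }
    }
    return flattened;
}

// Usage with a payload in the format logged by the agent.
const samplePayload =
    '[[{"opcode": "i", "Timestamp": "1635092100000000", "Temperature": 40.0, "Humidity": 80.0}]]';
const sampleEvents = flattenEvents(samplePayload);
console.log(sampleEvents.length, sampleEvents[0].Temperature);
```

Keeping the parsing in a separate function makes it possible to reject malformed payloads with a clear error before any data is posted, instead of relying only on the surrounding try/catch block.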

Deploying a Virtual Edge Device Container

  1. Create a new directory to store assets for a specific virtual edge device instance. Example: C:\ved1 or ~/ved1.

  2. Open Asset Manager in a web browser and create a MindConnectLib asset, such as VirtualEdgeDevice1.

  3. Generate a SHARED_SECRET on-boarding configuration.

  4. Save the on-boarding key as edge-config.json in the directory you previously created, such as C:\ved1\edge-config.json.

  5. Follow the steps in Creating a MindConnect Agent to create an asset that the MindConnect-NodeJS library can use to communicate with Insights Hub and that has the aspect bindings needed to map incoming time series data.

  6. Run the following command to deploy a container using the image that you created previously:

      docker run -d --rm --name ved1 --mount type=bind,source=/c/ved1,target=/host virtual-edge-device
    

    This command starts a container named ved1 in detached mode from the image virtual-edge-device and mounts the local directory (for example /c/ved1) to an internal directory called /host. As the run.sh script executes, it outputs the following, which can be observed by viewing the container logs:

      $ docker logs ved1
      Waiting for /host directory to be mounted...
      Directory /host mounted.
      Starting Mosquitto
      Starting Edge Streaming agent
      Starting MindConnect Agent
    

    If the container is being run for the first time, then the file mc.log contains the following log output:

      > mc-agent-ts@1.0.0 start /opt/mdsp/esa/mc-agent
      > tsc && node index.js
    
      [2022-04-29T12:53:56.552Z] The agent has been onboarded
      [2022-04-29T12:53:57.096Z] Subscribed to MQTT topic: Example_Data_Source
    

    If the container is being re-run, then the file contains the following log output:

      > mc-agent-ts@1.0.0 start /opt/mdsp/esa/mc-agent
      > tsc && node index.js
    
      [2022-04-29T12:57:10.068Z] The agent has previously been onboarded
      [2022-04-29T12:57:10.830Z] Subscribed to MQTT topic: Example_Data_Source
    

    The local directory now contains six additional files:

    • <asset_id>.json, which stores the on-boarded credentials for the MindConnect agent. Here, asset_id is the unique identifier of the agent.
    • registration.txt, which is an encrypted file containing the on-boarded credentials for the Edge Streaming Analytics agent.
    • esa.log and esa.errors, which contain the log and error output for the Edge Streaming Analytics agent.
    • mc.log and mc.errors, which contain the log and error output for the MindConnect agent.

    When the Edge Streaming Analytics agent has connected successfully to the cloud, an entry similar to the following appears in the esa.log file:

      2022-02-22 09:56:59.334  INFO 20 --- [pool-1-thread-1] c.s.e.s.e.w.c.EdgeToCloudWebsocketClient : Connected to Cloud proxy: <asset_id>
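
Checking esa.log for the connection entry can be automated. The following is a minimal sketch that scans log text for a line containing "Connected to Cloud proxy:" and returns the identifier that follows it. The function name findCloudProxyConnection and the identifier abc123 are placeholders, and the match assumes that the log line keeps the format shown above, which may differ between versions.

```typescript
// Hypothetical helper: returns the identifier that follows
// "Connected to Cloud proxy:" in the first matching log line,
// or null if no line matches.
function findCloudProxyConnection(logText: string): string | null {
    for (const line of logText.split(/\r?\n/)) {
        const match = /Connected to Cloud proxy:\s*(\S+)/.exec(line);
        if (match) {
            return match[1];
        }
    }
    return null;
}

// Usage with a log line in the format shown above; abc123 is a placeholder asset ID.
const exampleEsaLog =
    "2022-02-22 09:56:59.334  INFO 20 --- [pool-1-thread-1] " +
    "c.s.e.s.e.w.c.EdgeToCloudWebsocketClient : Connected to Cloud proxy: abc123";
console.log(findCloudProxyConnection(exampleEsaLog));
```

In practice, the log text would be read from the esa.log file in the mounted directory, such as C:\ved1\esa.log.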
    

Running a Streaming Data Project Using Edge Streaming Creator

  1. Save the data below into a suitable directory within the edge device directory, such as C:\ved1\data\input\sample.csv. The data provides sample temperature and humidity readings that are compatible with your data source.

      Timestamp,Temperature,Humidity
      2021-10-24T16:15:00.000Z,40,80
      2021-10-24T16:16:00.000Z,25,79
      2021-10-24T16:17:00.000Z,27,75
      2021-10-24T16:18:00.000Z,30,72
      2021-10-24T16:19:00.000Z,45,70
      2021-10-24T16:20:00.000Z,44,72
      2021-10-24T16:21:00.000Z,43,74
      2021-10-24T16:22:00.000Z,40,79
      2021-10-24T16:23:00.000Z,36,80
      2021-10-24T16:24:00.000Z,30,81
    

    When developing an Edge Streaming Creator project, it is usual practice to begin development with sample data stored in a file, before connecting to a live data source, such as sensor data. This provides repeatability as the project is developed and fine-tuned.

    In this example, the source data matches the aspect type in Insights Hub. This is not a requirement, because it is the output of the project, not the input, that is sent to Insights Hub. A project typically manipulates the data before it is sent to the cloud, for example by performing edge analytics on the streaming data.

  2. Save the following code in a file called MindConnectExample.xml:

          <project name="MindConnectExample" threads="1" pubsub="auto" heartbeat-interval="1">
              <contqueries>
                  <contquery name="cq1">
                      <windows>
                          <window-source pubsub="true" index="pi_EMPTY" name="ReadSensorData">
                              <schema>
                                  <fields>
                                      <field name="Timestamp" type="stamp" key="true"/>
                                      <field name="Temperature" type="double"/>
                                      <field name="Humidity" type="double"/>
                                  </fields>
                              </schema>
                              <connectors>
                                  <connector class="fs" name="SampleDataConnector">
                                      <properties>
                                          <property name="type"><![CDATA[pub]]></property>
                                          <property name="dateformat"><![CDATA[%Y-%m-%dT%H:%M:%S]]></property>
                                          <property name="rate"><![CDATA[100]]></property>
                                          <property name="header"><![CDATA[1]]></property>
                                          <property name="addcsvopcode"><![CDATA[true]]></property>
                                          <property name="addcsvflags"><![CDATA[normal]]></property>
                                          <property name="fsname"><![CDATA[/host/data/input/sample.csv]]></property>
                                          <property name="fstype"><![CDATA[csv]]></property>
                                      </properties>
                                  </connector>
                              </connectors>
                          </window-source>
                          <window-compute pubsub="true" name="ProcessData">
                              <schema>
                                  <fields>
                                      <field name="Timestamp" type="stamp" key="true"/>
                                      <field name="Temperature" type="double"/>
                                      <field name="Humidity" type="double"/>
                                  </fields>
                              </schema>
                              <output>
                                  <field-expr><![CDATA[Temperature]]></field-expr>
                                  <field-expr><![CDATA[Humidity]]></field-expr>
                              </output>
                              <connectors>
                                  <connector class="mqtt" name="MqttConnector">
                                      <properties>
                                          <property name="type"><![CDATA[sub]]></property>
                                          <property name="snapshot"><![CDATA[false]]></property>
                                          <property name="mqtthost"><![CDATA[localhost]]></property>
                                          <property name="mqtttopic"><![CDATA[Example_Data_Source]]></property>
                                          <property name="mqttqos"><![CDATA[0]]></property>
                                          <property name="mqttmsgtype"><![CDATA[json]]></property>
                                      </properties>
                                  </connector>
                              </connectors>
                          </window-compute>
                      </windows>
                      <edges>
                          <edge source="ReadSensorData" target="ProcessData"/>
                      </edges>
                  </contquery>
              </contqueries>
          </project>
    

    This project uses a Source window to read the sample data from the CSV file and feeds the data into a Compute window that outputs the data using an MQTT connector. Note that the connector for the Source window specifies the file /host/data/input/sample.csv, which is the mounted path to the file that you have just saved in the local directory.

    In this case, the Compute window simply copies the data from the Source window, but it could be used to perform further processing of the data if required.

  3. Upload the XML file into the Edge Streaming Creator application and open the project in the editor:

    Project view

  4. Confirm that everything is configured correctly by entering test mode and selecting the virtual edge device VirtualEdgeDevice1.

    Virtual edge device selection

    Click Run test. After a few moments, the following output should be displayed:

    Test results

  5. After the test is executed, the MindConnect agent log file mc.log should contain the following additional log output:

        Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092100000000", "Temperature": 40.0, "Humidity": 80.0}]]
        Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092160000000", "Temperature": 25.0, "Humidity": 79.0}]]
        Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092220000000", "Temperature": 27.0, "Humidity": 75.0}]]
        Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092280000000", "Temperature": 30.0, "Humidity": 72.0}]]
        Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092340000000", "Temperature": 45.0, "Humidity": 70.0}]]
        Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092400000000", "Temperature": 44.0, "Humidity": 72.0}]]
        Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092460000000", "Temperature": 43.0, "Humidity": 74.0}]]
        Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092520000000", "Temperature": 40.0, "Humidity": 79.0}]]
        Incoming mqtt message: [[{"opcode": "i", "Timestamp": "1635092580000000", "Temperature": 36.0, "Humidity": 80.0}]]
        [2022-04-29T13:19:16.517Z] Data posted with timestamp: (Sun Oct 24 2021 16:20:00 GMT+0000 (Coordinated Universal Time))
        [2022-04-29T13:19:16.541Z] Data posted with timestamp: (Sun Oct 24 2021 16:18:00 GMT+0000 (Coordinated Universal Time))
        [2022-04-29T13:19:16.556Z] Data posted with timestamp: (Sun Oct 24 2021 16:21:00 GMT+0000 (Coordinated Universal Time))
        [2022-04-29T13:19:16.558Z] Data posted with timestamp: (Sun Oct 24 2021 16:19:00 GMT+0000 (Coordinated Universal Time))
        [2022-04-29T13:19:16.597Z] Data posted with timestamp: (Sun Oct 24 2021 16:16:00 GMT+0000 (Coordinated Universal Time))
        [2022-04-29T13:19:16.615Z] Data posted with timestamp: (Sun Oct 24 2021 16:17:00 GMT+0000 (Coordinated Universal Time))
        [2022-04-29T13:19:16.666Z] Data posted with timestamp: (Sun Oct 24 2021 16:15:00 GMT+0000 (Coordinated Universal Time))
        [2022-04-29T13:19:16.708Z] Data posted with timestamp: (Sun Oct 24 2021 16:23:00 GMT+0000 (Coordinated Universal Time))
        [2022-04-29T13:19:16.930Z] Data posted with timestamp: (Sun Oct 24 2021 16:22:00 GMT+0000 (Coordinated Universal Time))
    
  6. Open the Operation Insights application and select your asset, ExampleAsset1. Select the check boxes next to the Humidity and Temperature data points, and then set the date to the date used in the sample data, 24th October, 2021. The data that was just posted to IoT Time Series should now be displayed:

    Operation insights

    You now have a data source that is being processed by the Edge Streaming server, output to a MindConnect agent via MQTT, and posted to Insights Hub to be stored as IoT Time Series data.
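
In the MQTT messages logged above, the Timestamp field is a string that holds microseconds since the Unix epoch, while the "Data posted" entries show ordinary dates. The following is a minimal sketch of that conversion, assuming every field other than opcode and Timestamp is a measurement value. The names StreamedEvent, Reading, microsToDate, and toReading are hypothetical; the mapping that the agent's own sendData function performs is defined earlier in this document and is not reproduced here.

```typescript
// One event as it appears in the logged MQTT messages.
interface StreamedEvent {
    opcode: string;
    Timestamp: string;
    [field: string]: unknown;
}

// Hypothetical intermediate shape: a timestamp plus values keyed by field name.
interface Reading {
    timestamp: Date;
    values: Record<string, string>;
}

// Converts a microsecond epoch string to a Date (millisecond precision).
function microsToDate(micros: string): Date {
    const value = Number(micros);
    if (micros.trim() === "" || !isFinite(value)) {
        throw new Error("Invalid microsecond timestamp: " + micros);
    }
    return new Date(Math.floor(value / 1000));
}

// Splits an event into its timestamp and its measurement values.
function toReading(evt: StreamedEvent): Reading {
    const values: Record<string, string> = {};
    for (const key of Object.keys(evt)) {
        if (key !== "opcode" && key !== "Timestamp") {
            values[key] = String(evt[key]);
        }
    }
    return { timestamp: microsToDate(evt.Timestamp), values };
}

// Usage with the first event from the log output above.
const firstReading = toReading({
    opcode: "i",
    Timestamp: "1635092100000000",
    Temperature: 40.0,
    Humidity: 80.0,
});
console.log(firstReading.timestamp.toISOString()); // 2021-10-24T16:15:00.000Z
```

The resulting date matches the first row of sample.csv, which is a quick way to confirm that timestamps pass through the project unchanged.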


Except where otherwise noted, content on this site is licensed under the Development License Agreement.


Last update: July 6, 2023