Data Transformation Node Configurations
Code Examples for Source, Target, and Transform Nodes
Node | Type | Node Configuration | Example of node config with node-id in CodeGenConfigurationNodes (map of node configs) of JSON Payload | Node Details |
---|---|---|---|---|
IDLCsvSource | Source | Reads CSV data from IDL. Users must provide the path of the IDL location and other properties like WithHeader, Recurse, Separator, and the schema of the source data. | ||
IDLJsonSource | Source | N/A | Reads JSON data from IDL. Users must provide the path of the IDL location and other properties like jsonPath, Recurse, Separator, and the output schema for the input source data. | |
IDLParquetTarget | Target | Writes output of the transformed data to IDL target in parquet format. Users must provide the path of the output location. To generate partitioned output users must define the column names as partitionKeys. | ||
IDLDirectTarget | Target | Writes output of transformed data to IDL target in formats such as CSV, JSON, Parquet, and ORC. Users must provide the path of the output location. To generate partitioned output users must define the column names as partitionKeys. | ||
DTT IoT Connector | Target | Allows the target connector to push data to IoT timeseries targets. Users must map the input data source columns to parameters defined in AdditionalOptions. | ||
Apply Mapping or Change the Schema | Transform | Allows users to remap the input dataset property keys into a new configuration for the target data. This transform node allows users to:
| ||
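Filter | Transform | Allows users to filter the input data, keeping only the records that satisfy one or more filter conditions (for example, a regex match on a column value) combined with a logical operator. | ||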
Fill Missing Values | Transform | Allows users to locate records in the data that have missing values and to add a new field with a value determined by imputation (replacing missing data with substituted values). The input dataset trains the machine learning model so it can determine what the missing value should be. | ||
Join | Transform | Allows users to join two input data sources. | ||
Aggregate | Transform | Allows users to aggregate the input data. | ||
Union | Transform | Allows users to combine rows from more than one input data source, provided they have the same schema. | ||
Drop Null Fields | Transform | Allows users to drop a field (column) if all the values are null or negative in the input data source. Users can also specify custom null values in NullTextList, if needed. | ||
Drop Duplicate | Transform | Allows users to remove duplicate rows from the input data source. For more granular control, users can also specify columns—this will remove duplicate rows from the input data source that match the columns the user specifies. | ||
Format Timestamp - Dynamic Transform | Transform | Allows users to transform timestamp column values into strings based on a pattern. | ||
Convert to Timestamp - Dynamic Transform | Transform | Allows users to transform string or numeric column data into timestamps. Users must define how to parse the column selected by choosing the type (colType). If the value type is:
| ||
Drop Fields | Transform | Allows users to create a subset of the source data. Users must provide the data field/column keys to remove from the input data source. |
Sample ETL Flow Payload Templates
ETL flow templates are included below as JSON strings:

- IDL: CSV-to-Parquet Flow
- IDL: Join-Filter Flow

Please feel free to copy/paste the JSON strings as aids in creating your own MDO ETL flows.
IDL: CSV-to-Parquet Flow JSON
{
"name": "template-csv-to-parquet",
"description": "CSV to Parquet Template Flow",
"jobJson": {
"CodeGenConfigurationNodes": {
"node_1690400202147": {
"IDLCsvSource": {
"AdditionalOptions": {
"EnableSamplePath": false
},
"Escaper": "",
"Exclusions": [],
"Name": "Amazon S3",
"OptimizePerformance": false,
"OutputSchemas": [
{
"Columns": [
{
"Name": "Event Start Time",
"Type": "string"
},
{
"Name": "Event Type",
"Type": "string"
},
{
"Name": "Operator",
"Type": "string"
},
{
"Name": "Operation",
"Type": "string"
},
{
"Name": "Part Name",
"Type": "string"
},
{
"Name": "Workflow Step",
"Type": "string"
},
{
"Name": "value",
"Type": "string"
},
{
"Name": "Unique Key",
"Type": "string"
},
{
"Name": "Child Data Type",
"Type": "string"
},
{
"Name": "Child Name",
"Type": "string"
},
{
"Name": "Child Task Status",
"Type": "string"
},
{
"Name": "highlimit",
"Type": "string"
},
{
"Name": "Location",
"Type": "string"
},
{
"Name": "lowlimit",
"Type": "string"
},
{
"Name": "Product Description",
"Type": "string"
},
{
"Name": "Quantity",
"Type": "string"
},
{
"Name": "UOM",
"Type": "string"
},
{
"Name": "Workflow Step Resource",
"Type": "string"
},
{
"Name": "Child Data Name",
"Type": "string"
}
]
}
],
"Paths": [
"/analytics/internal/collectdata/data/"
],
"QuoteChar": "quote",
"Recurse": true,
"Separator": "pipe",
"WithHeader": true
}
},
"node_1690401407779": {
"ApplyMapping": {
"Inputs": [
"node_1690400202147"
],
"Mapping": [
{
"Dropped": false,
"FromPath": [
"Event Start Time"
],
"FromType": "string",
"ToKey": "Event Start Time",
"ToType": "timestamp"
},
{
"Dropped": false,
"FromPath": [
"Event Type"
],
"FromType": "string",
"ToKey": "Event Type",
"ToType": "string"
},
{
"Dropped": false,
"FromPath": [
"Operator"
],
"FromType": "string",
"ToKey": "Operator",
"ToType": "string"
},
{
"Dropped": false,
"FromPath": [
"Operation"
],
"FromType": "string",
"ToKey": "Operation",
"ToType": "string"
},
{
"Dropped": false,
"FromPath": [
"Part Name"
],
"FromType": "string",
"ToKey": "Part Name",
"ToType": "string"
},
{
"Dropped": false,
"FromPath": [
"Workflow Step"
],
"FromType": "string",
"ToKey": "Workflow Step",
"ToType": "string"
},
{
"Dropped": false,
"FromPath": [
"value"
],
"FromType": "string",
"ToKey": "value",
"ToType": "string"
},
{
"Dropped": false,
"FromPath": [
"Unique Key"
],
"FromType": "string",
"ToKey": "Unique Key",
"ToType": "string"
},
{
"Dropped": false,
"FromPath": [
"Child Data Type"
],
"FromType": "string",
"ToKey": "Child Data Type",
"ToType": "string"
},
{
"Dropped": false,
"FromPath": [
"Child Name"
],
"FromType": "string",
"ToKey": "Child Name",
"ToType": "string"
},
{
"Dropped": false,
"FromPath": [
"Child Task Status"
],
"FromType": "string",
"ToKey": "Child Task Status",
"ToType": "string"
},
{
"Dropped": false,
"FromPath": [
"highlimit"
],
"FromType": "string",
"ToKey": "highlimit",
"ToType": "string"
},
{
"Dropped": false,
"FromPath": [
"Location"
],
"FromType": "string",
"ToKey": "Location",
"ToType": "string"
},
{
"Dropped": false,
"FromPath": [
"lowlimit"
],
"FromType": "string",
"ToKey": "lowlimit",
"ToType": "string"
},
{
"Dropped": false,
"FromPath": [
"Product Description"
],
"FromType": "string",
"ToKey": "Product Description",
"ToType": "string"
},
{
"Dropped": false,
"FromPath": [
"Quantity"
],
"FromType": "string",
"ToKey": "Quantity",
"ToType": "string"
},
{
"Dropped": false,
"FromPath": [
"UOM"
],
"FromType": "string",
"ToKey": "UOM",
"ToType": "string"
},
{
"Dropped": false,
"FromPath": [
"Workflow Step Resource"
],
"FromType": "string",
"ToKey": "Workflow Step Resource",
"ToType": "string"
},
{
"Dropped": false,
"FromPath": [
"Child Data Name"
],
"FromType": "string",
"ToKey": "Child Data Name",
"ToType": "string"
}
],
"Name": "Change Schema"
}
},
"node_1690401454840": {
"IDLParquetTarget": {
"Compression": "none",
"Inputs": [
"node_1690401407779"
],
"Name": "Amazon S3",
"PartitionKeys": [],
"Path": "/analytics/internal/collectdata/output_data/",
"SchemaChangePolicy": {
"EnableUpdateCatalog": false
}
}
}
}
}
}
IDL: Join-Filter Flow JSON
{
  "name": "template-idl-join-two-source",
  "jobJson": {
    "CodeGenConfigurationNodes": {
      "node_1692120071957": {
        "IDLCsvSource": {
          "Escaper": "",
          "Exclusions": [],
          "Name": "Amazon S3",
          "OptimizePerformance": false,
          "OutputSchemas": [
            {
              "Columns": [
                { "Name": "assembly_end_date", "Type": "string" },
                { "Name": "assembler_name", "Type": "string" },
                { "Name": "manufacturer", "Type": "string" },
                { "Name": "assembly_end_time_stamp", "Type": "string" },
                { "Name": "pass_qc", "Type": "string" }
              ]
            }
          ],
          "Paths": [
            "/analytics/internal/wps_assembly_data/headlight_end_assembly/"
          ],
          "QuoteChar": "quote",
          "Recurse": true,
          "Separator": "comma",
          "WithHeader": true
        }
      },
      "node_1692120158995": {
        "ApplyMapping": {
          "Inputs": [
            "node_1692120127042"
          ],
          "Mapping": [
            {
              "Dropped": false,
              "FromPath": [ "assembly_begin_date" ],
              "FromType": "string",
              "ToKey": "assembly_begin_date",
              "ToType": "date"
            },
            {
              "Dropped": false,
              "FromPath": [ "assembler_name" ],
              "FromType": "string",
              "ToKey": "assembler_name",
              "ToType": "string"
            },
            {
              "Dropped": false,
              "FromPath": [ "manufacturer" ],
              "FromType": "string",
              "ToKey": "manufacturer",
              "ToType": "string"
            },
            {
              "Dropped": false,
              "FromPath": [ "assembly_begin_timestamp" ],
              "FromType": "string",
              "ToKey": "assembly_begin_timestamp",
              "ToType": "string"
            },
            {
              "Dropped": false,
              "FromPath": [ "tested_bulb_light_output" ],
              "FromType": "string",
              "ToKey": "tested_bulb_light_output",
              "ToType": "string"
            },
            {
              "Dropped": false,
              "FromPath": [ "assembly_end_date" ],
              "FromType": "string",
              "ToKey": "assembly_end_date",
              "ToType": "string"
            },
            {
              "Dropped": true,
              "FromPath": [ ".assembler_name" ],
              "FromType": "string",
              "ToKey": ".assembler_name",
              "ToType": "string"
            },
            {
              "Dropped": true,
              "FromPath": [ ".manufacturer" ],
              "FromType": "string",
              "ToKey": ".manufacturer",
              "ToType": "string"
            },
            {
              "Dropped": false,
              "FromPath": [ "assembly_end_time_stamp" ],
              "FromType": "string",
              "ToKey": "assembly_end_time_stamp",
              "ToType": "string"
            },
            {
              "Dropped": false,
              "FromPath": [ "pass_qc" ],
              "FromType": "string",
              "ToKey": "pass_qc",
              "ToType": "string"
            }
          ],
          "Name": "Change Schema"
        }
      },
      "node_1": {
        "IDLCsvSource": {
          "Escaper": "",
          "Exclusions": [],
          "Name": "S3 bucket",
          "OptimizePerformance": false,
          "OutputSchemas": [
            {
              "Columns": [
                { "Name": "assembly_begin_date", "Type": "string" },
                { "Name": "assembler_name", "Type": "string" },
                { "Name": "manufacturer", "Type": "string" },
                { "Name": "assembly_begin_timestamp", "Type": "string" },
                { "Name": "tested_bulb_light_output", "Type": "string" }
              ]
            }
          ],
          "Paths": [
            "/analytics/internal/wps_assembly_data/headlight_begin_assembly/"
          ],
          "QuoteChar": "quote",
          "Recurse": true,
          "Separator": "comma",
          "WithHeader": true
        }
      },
      "node_3": {
        "IDLParquetTarget": {
          "Compression": "snappy",
          "Inputs": [
            "node_1692132267514"
          ],
          "Name": "S3 bucket",
          "PartitionKeys": [
            [ "assembler_name" ]
          ],
          "Path": "/snapshots/wps_assembly_data/output/",
          "SchemaChangePolicy": {
            "EnableUpdateCatalog": false
          }
        }
      },
      "node_1692120127042": {
        "Join": {
          "Columns": [
            {
              "From": "node_1",
              "Keys": [
                [ "assembler_name" ],
                [ "manufacturer" ]
              ]
            },
            {
              "From": "node_1692120071957",
              "Keys": [
                [ "assembler_name" ],
                [ "manufacturer" ]
              ]
            }
          ],
          "Inputs": [
            "node_1",
            "node_1692120071957"
          ],
          "JoinType": "equijoin",
          "Name": "Join"
        }
      },
      "node_1692132267514": {
        "Filter": {
          "Filters": [
            {
              "Negated": false,
              "Operation": "REGEX",
              "Values": [
                {
                  "Type": "CONSTANT",
                  "Value": [ "TRUE" ]
                },
                {
                  "Type": "COLUMNEXTRACTED",
                  "Value": [ "pass_qc" ]
                }
              ]
            }
          ],
          "Inputs": [
            "node_1692120158995"
          ],
          "LogicalOperator": "AND",
          "Name": "Filter"
        }
      }
    }
  }
}