Baseline

In a city, we have two arrays of sensors. The first, controlled by the waste company, consists of fill level sensors in each waste container. The second is a set of weather stations that measure, among other things, temperature. The sensors send their data over IoT connectivity such as Sigfox, LoRa, ETSI M2M or 5G, using the MQTT communication protocol, to the corresponding servers. For the sake of the example, we have three stakeholders: the waste company, the city government, and the citizen using a citizen app provided by a third party.

Requirements

Loading the Scenario

To generate the scenario, we have created a docker-compose setup which starts three brokers, one agent, one Service Catalog, and one REST endpoint. We will explain this in detail later. Load the scenario by cloning the tutorial (currently only in the dev branch):

git clone https://code.linksmart.eu/scm/la/tutorial.git 
cd tutorial

And now start the scenario by:

docker-compose -f docker-compose.yml -f docker-compose.tutorial.yml up --abort-on-container-exit

If everything works, you should be able to subscribe to fill level sensors and temperature sensors by:

#  subscribe to the temperature measured by the weather stations 
mosquitto_sub -t 'LS/weatherStation/ws1/OGC/1.0/Datastreams/#' -p 1880
#  subscribe to the fill level measured by the waste bins
mosquitto_sub -t 'LS/bin/bin1/OGC/1.0/Datastreams/#' -p 1881

Troubleshooting

The following ports must be free (not used by any other service) in the host network (the user computer): 1880 (weather-broker), 1881 (waste-broker), 1882 (city-broker), 1883 (ls-broker), 8082 (service catalog), 8319 (agent)
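Before starting the scenario, a check along these lines can tell which of the ports above are already taken. This is a minimal sketch and not part of the tutorial repository: the function names are illustrative, and it assumes the containers publish their ports on all interfaces of the local machine.

```python
import socket

# Ports listed above, with the service that needs each one.
REQUIRED_PORTS = {
    1880: "weather-broker",
    1881: "waste-broker",
    1882: "city-broker",
    1883: "ls-broker",
    8082: "service catalog",
    8319: "agent",
}


def port_is_free(port: int, host: str = "0.0.0.0") -> bool:
    """Return True if a TCP socket can be bound to host:port right now."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        try:
            probe.bind((host, port))
        except OSError:
            return False
    return True


def busy_ports(ports=REQUIRED_PORTS):
    """Return the subset of `ports` that is currently in use."""
    return {port: name for port, name in ports.items() if not port_is_free(port)}
```

Calling `busy_ports()` returns an empty dictionary when all six ports are available; any entry it returns names a service that would fail to start.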

Stream Aggregation

Now the city wants to know the average temperature in the city using its infrastructure. To do so, it deploys the following aggregation statement, which gives the average temperature over all five weather stations in a 30-second time window:

select avg(intResult) from Observation(datastream.id.toString() like 'tmp%').win:time(30 sec)

The full statement below:

{
  "name": "average_temperature",
  "statement": "select avg(intResult) from Observation(datastream.id.toString() like 'tmp%').win:time(30 sec) "
}

Deploy by (or use any REST client):

curl -X PUT \
  http://localhost:8319/statement/average_temperature \
  -H 'Content-Type: application/json' \
  -d '{"name": "average_temperature" ,"statement": "select avg(intResult) from Observation(datastream.id.toString() like '\''tmp%'\'').win:time(30 sec) "}' -v | jq .
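The same request can be scripted, which avoids escaping the single quotes of the EPL string for the shell. The sketch below uses only the Python standard library; the helper names are illustrative, and the URL and body layout are taken from the curl command above.

```python
import json
import urllib.request

AGENT_URL = "http://localhost:8319"  # agent port used in this tutorial


def build_deploy_request(name: str, epl: str, base_url: str = AGENT_URL) -> urllib.request.Request:
    """Build (but do not send) the PUT request that deploys a statement."""
    body = json.dumps({"name": name, "statement": epl}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/statement/{name}",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


def deploy(name: str, epl: str) -> str:
    """Send the request and return the agent's raw response body."""
    with urllib.request.urlopen(build_deploy_request(name, epl)) as response:
        return response.read().decode("utf-8")
```

With the scenario running, `deploy("average_temperature", "select avg(intResult) from Observation(datastream.id.toString() like 'tmp%').win:time(30 sec)")` sends the same payload as the curl call.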

Now you can subscribe to the average temperatures by:

mosquitto_sub -t LS/+/+/OGC/1.0/Datastreams/average_temperature 

Information

The queries are written in EPL, the query language of Esper (see Esper 7.1.0). The agent can load any CEP engine, but Esper is the preferred one.
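To give an intuition for what `win:time(30 sec)` does in the statement above, here is a small simulation of a sliding time window in plain Python. It is not how Esper is implemented: it assumes timestamps in seconds that never decrease, it assumes the readings were already filtered to the temperature datastreams, and it only recomputes the average when a reading arrives, whereas a real engine may also react when readings expire.

```python
from collections import deque


class TimeWindowAverage:
    """Average of the values seen during the last `window_seconds`."""

    def __init__(self, window_seconds: float):
        self.window_seconds = window_seconds
        self._events = deque()  # (timestamp, value) pairs, oldest first
        self._total = 0.0

    def add(self, timestamp: float, value: float) -> float:
        """Insert one reading and return the average over the current window."""
        self._events.append((timestamp, value))
        self._total += value
        # Drop readings that have fallen out of the window.
        while self._events[0][0] <= timestamp - self.window_seconds:
            _, old_value = self._events.popleft()
            self._total -= old_value
        return self._total / len(self._events)


window = TimeWindowAverage(30)
print(window.add(0, 20))   # 20.0
print(window.add(10, 22))  # 21.0
print(window.add(35, 24))  # the reading from t=0 has expired: (22 + 24) / 2 = 23.0
```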

Query Streams

Now the waste management company wants to get an alert every time a bin is over 75% full. To do so, it wrote the following statement:

select datastream.thing.id from Observation(datastream.id.toString() like 'fill%' and intResult > 75)

This query will create a new measurement for each measurement over 75% with the corresponding bin ID.
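The effect of the filter can be mimicked in a few lines of Python. The dictionary keys and the sample IDs below are assumptions made for illustration; they are not the agent's data model.

```python
def full_bin_alerts(observations, threshold=75):
    """Yield the bin ID of every fill-level reading strictly above `threshold`."""
    for obs in observations:
        if obs["datastream_id"].startswith("fill") and obs["result"] > threshold:
            yield obs["thing_id"]


readings = [
    {"datastream_id": "fill1", "thing_id": "bin1", "result": 80},
    {"datastream_id": "fill2", "thing_id": "bin2", "result": 75},  # not strictly above 75
    {"datastream_id": "tmp1", "thing_id": "ws1", "result": 90},    # not a fill-level stream
    {"datastream_id": "fill1", "thing_id": "bin1", "result": 91},
]
print(list(full_bin_alerts(readings)))  # ['bin1', 'bin1']
```

Each qualifying reading produces its own output, so a bin that stays above the threshold keeps generating alerts.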

The full statement below:

{
  "name": "full_alert",
  "statement": "select datastream.thing.id from Observation(datastream.id.toString() like 'fill%' and intResult> 75)"
}

Deploy by:

curl -X PUT \
  http://localhost:8319/statement/full_alert \
  -H 'Content-Type: application/json' \
  -d '{"name": "full_alert" ,"statement": "select datastream.thing.id from Observation(datastream.id.toString() like '\''fill%'\'' and intResult> 75)"}' -v | jq .

Now you can subscribe to the alerts by:

mosquitto_sub  -t LS/+/+/OGC/1.0/Datastreams/full_alert

Process Streams

Now the waste management company wants to know approximately how much weight the bins hold. To estimate it, they multiply the average weight by the fullness percentage:

select datastream.thing.id as binID, intResult*10000 as weight  from Observation(datastream.id.toString() like 'fill%')

The full statement below:

{
  "name": "weight",
  "statement": "select datastream.thing.id as binID, intResult*10000 as weight  from Observation(datastream.id.toString() like 'fill%')"
}

Deploy by:

curl -X PUT \
  http://localhost:8319/statement/weight \
  -H 'Content-Type: application/json' \
  -d '{"name": "weight" ,"statement": "select datastream.thing.id as binID, intResult*10000 as weight  from Observation(datastream.id.toString() like '\''fill%'\'')"}' -v | jq .

Now you can subscribe to the weight estimates by:

mosquitto_sub -t LS/+/+/OGC/1.0/Datastreams/weight 

Fusion Streams

The citizens have been complaining that some bins stink when it is too hot. Therefore, they ask the waste management company to empty them earlier when the temperature is high. To do so, the city combines the weather and bin data: if the thermometer near a specific bin (bin1 in this example) reads over 25° and the bin is also over 50% full, an alert is created.

select 'bin1' as binID  from Observation((datastream.id.toString() = 'tmp1' and  intResult> 25 ) or ( datastream.id.toString() = 'fill1' and intResult> 50 )).win:time(1 sec)  having count(*)=2
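The combination of the `or` filter, the one-second window and `having count(*)=2` can be followed step by step with a small simulation. This is a sketch under stated assumptions, not the engine's behavior: events are hypothetical `(timestamp, datastream_id, value)` tuples with non-decreasing timestamps in seconds, and the count is only checked when a matching event arrives.

```python
from collections import deque


def matches_filter(datastream_id: str, value: float) -> bool:
    """Same condition as the Observation(...) filter in the statement."""
    return (datastream_id == "tmp1" and value > 25) or (
        datastream_id == "fill1" and value > 50
    )


def stinky_bin_alerts(events, window_seconds=1.0):
    """Return the timestamps at which the window holds exactly two matching events."""
    window = deque()  # timestamps of matching events still inside the window
    alerts = []
    for timestamp, datastream_id, value in events:
        if not matches_filter(datastream_id, value):
            continue
        window.append(timestamp)
        while window[0] <= timestamp - window_seconds:
            window.popleft()
        if len(window) == 2:  # having count(*) = 2
            alerts.append(timestamp)
    return alerts


events = [
    (0.0, "tmp1", 30),    # hot
    (0.4, "fill1", 60),   # over half full within the same second: alert
    (5.0, "tmp1", 30),    # hot again ...
    (5.2, "fill1", 40),   # ... but the bin is not full enough, so this is filtered out
    (9.0, "fill1", 70),
    (10.5, "tmp1", 31),   # more than one second apart: no alert
]
print(stinky_bin_alerts(events))  # [0.4]
```

Note that the count does not distinguish which datastream an event came from, so the statement appears to rely on each sensor reporting at most once per second.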

The full statement below:

{
    "name": "stinky_bin" ,
    "statement": "select 'bin1' as binID  from Observation((datastream.id.toString() = 'tmp1' and  intResult> 25 ) or ( datastream.id.toString() = 'fill1' and intResult> 50 )).win:time(1 sec)  having count(*)=2"
}

Deploy by:

curl -X PUT \
  http://localhost:8319/statement/stinky_bin \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "stinky_bin" ,
    "statement": "select '\''bin1'\'' as binID  from Observation((datastream.id.toString() = '\''tmp1'\'' and  intResult> 25 ) or ( datastream.id.toString() = '\''fill1'\'' and intResult> 50 )).win:time(1 sec)  having count(*)=2"
}'

Now you can subscribe to the alerts by:

mosquitto_sub  -t LS/+/+/OGC/1.0/Datastreams/stinky_bin

Important note on the Agent IO

The agent in this tutorial is configured to collect events from the waste and weather brokers. Additionally, an agent has one default output broker; for this tutorial it was configured to be the weather broker, which is exposed on the default unsecured MQTT port.


Route Broker

As the note above explained, the events arrive at the weather network and not at the city network. Now we query for a second stinky bin as before:

select 'bin2' as binID  from Observation((datastream.id.toString() = 'tmp2' and  intResult> 25 ) or ( datastream.id.toString() = 'fill2' and intResult> 50 )).win:time(1 sec)  having count(*)=2

But now we change the default output of the event to the city network by assigning the city broker as the target scope.

The full statement below:

{
  "name": "stinky_bin_route",
  "statement": "select 'bin2' as binID  from Observation((datastream.id.toString() = 'tmp2' and  intResult> 25 ) or ( datastream.id.toString() = 'fill2' and intResult> 50 )).win:time(1 sec)  having count(*)=2",
  "scope": [
    "city"
  ]
}

In this tutorial, we use the Service Catalog as the information point for the services. This means we resolve endpoints using aliases as IDs to locate the service endpoints stored in the Service Catalog. Therefore, when a scope is defined and a Service Catalog is deployed, the Agent searches the Service Catalog for the endpoint using the alias. Alternatively, the aliases can be resolved through the Agent's configuration file.

Deploy by:

curl -X PUT \
  http://localhost:8319/statement/stinky_bin_route \
  -H 'Content-Type: application/json' \
  -d '{"name": "stinky_bin_route" ,"statement": "select '\''bin2'\'' as binID  from Observation((datastream.id.toString() = '\''tmp2'\'' and  intResult> 25 ) or ( datastream.id.toString() = '\''fill2'\'' and intResult> 50 )).win:time(1 sec)  having count(*)=2","scope":["city"]}' -v | jq .

Now you can subscribe to the alerts by:

mosquitto_sub -p 1882 -t LS/+/+/OGC/1.0/Datastreams/stinky_bin_route 

Important note on the Agent IO

The output topic is autogenerated following the LinkSmart® specification. For OGC Observations (which we use in these examples), the topic is LS/<serviceCode>/<serviceID>/OGC/1.0/Datastreams/<statementID>. The base topic for generated events can be changed using the configuration file or environment variables.
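The subscriptions in this tutorial use `+` for the service code and service ID levels, so they work without knowing those values. The sketch below shows how such a filter is matched against a concrete topic under the standard MQTT wildcard rules (`+` for exactly one level, `#` for the remaining levels). The service code `LA` and service ID `agent-1` are made-up values for illustration, and topics starting with `$` are not treated specially here.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` is covered by the MQTT subscription `topic_filter`."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level and matches everything below.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


subscription = "LS/+/+/OGC/1.0/Datastreams/average_temperature"
print(topic_matches(subscription, "LS/LA/agent-1/OGC/1.0/Datastreams/average_temperature"))  # True
print(topic_matches(subscription, "LS/LA/OGC/1.0/Datastreams/average_temperature"))          # False
print(topic_matches("LS/bin/bin1/OGC/1.0/Datastreams/#", "LS/bin/bin1/OGC/1.0/Datastreams/fill1"))  # True
```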

Route Output Topic

Now let's imagine that the city has a different topic structure and therefore wants to change the default output topic. We use a query similar to the previous example:

select 'bin3' as binID  from Observation((datastream.id.toString() = 'tmp3' and  intResult> 25 ) or ( datastream.id.toString() = 'fill3' and intResult> 50 )).win:time(1 sec)  having count(*)=2

Now we change the default output topic of the event sent to the city broker. To do so, we set the output to the desired topic.

The full statement below:

{
  "name": "stinky_bin_route2",
  "statement": "select 'bin3' as binID  from Observation((datastream.id.toString() = 'tmp3' and  intResult> 25 ) or ( datastream.id.toString() = 'fill3' and intResult> 50 )).win:time(1 sec)  having count(*)=2",
  "scope": [
    "city"
  ],
  "output": [
    "LS/my/topic"
  ]
}

Deploy by:

curl -X PUT \
  http://localhost:8319/statement/stinky_bin_route2 \
  -H 'Content-Type: application/json' \
  -d '{"name": "stinky_bin_route2" ,"statement": "select '\''bin3'\'' as binID  from Observation((datastream.id.toString() = '\''tmp3'\'' and  intResult> 25 ) or ( datastream.id.toString() = '\''fill3'\'' and intResult> 50 )).win:time(1 sec)  having count(*)=2","scope":["city"],	"output":["LS/my/topic"]}' -v | jq .

Now you can subscribe to the alerts by:

mosquitto_sub -p 1882 -t LS/my/topic

Important note on the Agent IO

The default output payload of the Agent is OGC Observation. The agent supports the OGC Observation, SenML, and RAW types. The RAW type can accept and generate any JSON event that is based on an object. The default output can be changed using the configuration file or environment variables.

Translate Payload Standard

Now let's imagine that the city has a different topic structure and also follows another payload standard, and therefore wants to change both the default output topic and the payload. We use a query similar to the previous example:

select 'bin4' as binID  from Observation((datastream.id.toString() = 'tmp4' and  intResult> 25 ) or ( datastream.id.toString() = 'fill4' and intResult> 50 )).win:time(1 sec)  having count(*)=2

Now we change the default output topic and the payload type of the event sent to the city broker. To do so, we set the output to the desired topic and the resultType to SenML.

The full statement below:

{
  "name": "stinky_bin_route_translate",
  "statement": "select 'bin4' as binID  from Observation((datastream.id.toString() = 'tmp4' and  intResult> 25 ) or ( datastream.id.toString() = 'fill4' and intResult> 50 )).win:time(1 sec)  having count(*)=2",
  "scope": [
    "city"
  ],
  "output": [
    "LS/DPA/1/SenML/10/Event/stinky_bin_route_translate"
  ],
  "resultType": "SenML"
}

Deploy by:

curl -X PUT \
  http://localhost:8319/statement/stinky_bin_route_translate \
  -H 'Content-Type: application/json' \
  -d '{"name": "stinky_bin_route_translate" ,"statement": "select '\''bin4'\'' as binID  from Observation((datastream.id.toString() = '\''tmp4'\'' and  intResult> 25 ) or ( datastream.id.toString() = '\''fill4'\'' and intResult> 50 )).win:time(1 sec)  having count(*)=2","scope":["city"],"output":["LS/DPA/1/SenML/10/Event/stinky_bin_route_translate"],"resultType":"SenML"}' -v | jq .

Now you can subscribe to the alerts by:

mosquitto_sub -p 1882 -t LS/DPA/1/SenML/10/Event/stinky_bin_route_translate

Free Payload Transformation

Now imagine that the city does not follow any standard. Then we need to configure the output freely. We use a query similar to the previous ones:

select 'bin5' as binID  from Observation((datastream.id.toString() = 'tmp5' and  intResult> 25 ) or ( datastream.id.toString() = 'fill5' and intResult> 50 )).win:time(1 sec)  having count(*)=2

Now we want a flat map structure as output. The statement below changes the output topic and broker, and generates the event as a map whose property names are the column names.
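As a rough illustration of what "a map whose property names are the column names" means, the following sketch pairs the selected column names with one result row and serializes the map as JSON. It is an assumption about the general shape only; the payload the agent actually emits may contain additional fields.

```python
import json


def to_flat_map(column_names, row):
    """Pair each selected column name with its value in the result row."""
    return dict(zip(column_names, row))


# The statement selects a single column: 'bin5' as binID
payload = json.dumps(to_flat_map(["binID"], ["bin5"]))
print(payload)  # {"binID": "bin5"}
```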

The full statement below:

{
  "name": "stinky_bin_route_transform",
  "statement": "select 'bin5' as binID  from Observation((datastream.id.toString() = 'tmp5' and  intResult> 25 ) or ( datastream.id.toString() = 'fill5' and intResult> 50 )).win:time(1 sec)  having count(*)=2",
  "scope": [
    "city"
  ],
  "output": [
    "LS/DPA/1/RAW/0/RAW/stinky_bin"
  ],
  "resultType": "none"
}

To do so, we deploy:

curl -X PUT \
  http://localhost:8319/statement/stinky_bin_route_transform \
  -H 'Content-Type: application/json' \
  -d '{"name": "stinky_bin_route_transform","statement": "select '\''bin5'\'' as binID  from Observation((datastream.id.toString() = '\''tmp5'\'' and  intResult> 25 ) or ( datastream.id.toString() = '\''fill5'\'' and intResult> 50 )).win:time(1 sec)  having count(*)=2","scope":["city"],"output":["LS/DPA/1/RAW/0/RAW/stinky_bin"],
	"resultType":"none"}' -v | jq .

Now you can subscribe to the alerts by:

mosquitto_sub -p 1882 -t LS/DPA/1/RAW/0/RAW/stinky_bin

Protocol Translate

Now imagine that the city wants to send all events to another backend, which uses REST, for the citizen application. To do so, we first need to query all events and select them as they are:

select event from Observation as event

We indicate the alias of the endpoint together with the API key name as the scope, and the path to send to as the output. Then we indicate that the publisher is REST_POST.

For REST endpoints, the key name must be given alongside the scope alias, separated by ':' (<alias>:<keyName>), e.g., appbackend:post2pub.
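The `<alias>:<keyName>` convention is easy to take apart on the client side, for instance when validating statements before deploying them. The helper below is purely illustrative and is not part of the agent.

```python
from typing import NamedTuple, Optional


class Scope(NamedTuple):
    alias: str
    key_name: Optional[str]  # None when the scope carries no key name


def parse_scope(entry: str) -> Scope:
    """Split a scope entry of the form '<alias>' or '<alias>:<keyName>'."""
    alias, separator, key_name = entry.partition(":")
    return Scope(alias, key_name if separator else None)


print(parse_scope("appbackend:post2pub"))  # Scope(alias='appbackend', key_name='post2pub')
print(parse_scope("city"))                 # Scope(alias='city', key_name=None)
```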


The full statement below:

{
  "name": "protocol_translate",
  "statement": "select event from Observation as event",
  "scope": [
    "appbackend:post2pub"
  ],
  "output": [
    "routed"
  ],
  "publisher": "REST_POST"
}

Deploy by:

curl -X PUT \
  http://localhost:8319/statement/protocol_translate \
  -H 'Content-Type: application/json' \
  -d '{"name": "protocol_translate" ,"statement": "select event from Observation as event","scope":["appbackend:post2pub"],"output":["routed"],"publisher":"REST_POST"}' -v | jq .

To make the result visible, the endpoint publishes what it receives to the weather broker. To see the output, subscribe to:

mosquitto_sub -t routed 

Advanced

The agent can 'publish' data using MQTT PUB, REST POST, and REST PUT, and also through REST GET and email, which are both special cases. With REST GET, instead of sending an event every time the query is triggered, the agent GETs data from an endpoint and inserts it into the CEP engine. Email is a new feature and is handled differently from the other methods.
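The statement bodies in this tutorial share the same small set of fields: `name` and `statement` are always present, while `scope`, `output`, `resultType` and `publisher` appear only when routing or translation is needed. A client-side helper along these lines can assemble them; the class is a sketch based solely on the fields seen in the examples, not on the agent's full schema.

```python
import json
from dataclasses import asdict, dataclass
from typing import List, Optional


@dataclass
class Statement:
    name: str
    statement: str
    scope: Optional[List[str]] = None   # broker or endpoint aliases
    output: Optional[List[str]] = None  # topics or paths
    resultType: Optional[str] = None    # e.g. "SenML"
    publisher: Optional[str] = None     # e.g. "REST_POST"

    def to_json(self) -> str:
        """Serialize the statement, leaving out the fields that were not set."""
        return json.dumps({k: v for k, v in asdict(self).items() if v is not None})


body = Statement(
    name="protocol_translate",
    statement="select event from Observation as event",
    scope=["appbackend:post2pub"],
    output=["routed"],
    publisher="REST_POST",
).to_json()
print(body)
```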