Page tree
Skip to end of metadata
Go to start of metadata

The DP Statement payload

This type of events encapsulate a DP statement used to deploy the query statement on agent. It contains 10 properties of which only 2 (in bold) are mandatory for new statements: ID, NameStatement, Input, Output ,Scope, CEHandler, stateLifeCycle, TargetAgents and isRestOutput. In case of changing an existing statement, the 

Property "ID"

This is NOT mandatory in the payload, but it might be mandatory to be provided it in some manner depending on the API is being used. The ID is unique identifier in an agent. If the the ID is not provided, then is generated by hashing (SHA-256) the Statement value, this means, that two statements that contains the identical statement string will produce the same ID, violating the uniqueness. In case the ID is provided, the ID can be any string that do not violate the uniqueness. It is recommended to use APIs that don't require the ID and let the system manage the IDs by itself. 

Property "Name"

This is the mandatory name of the statement. It is used as handle for statement management and in some cases it identifies a source of generated complex events.

Property "Statement"

This mandatory field contains the query/rule which will be executed by the agent. For Siddhi (WSO2) please visit WSO2 CEP documentation, for EPL (Esper) visit Esper Documentation .

Property "Input" (out of date)

This optional property indicates which topics are needed for the query. Currently this property is handled automatically, but for efficiency reasons it can be defined manually.

Property "Output"

This optional property contains a list of output topics used to publish generated events (in all of them). By default (if not specified) all events are published in the topic "/federation1/amiat/v2/<datastreamID>". The default topic can be overwritten by this property, generating an event per topic of the list, e.g. if the topic is "/my/topic/" and /my/other/topic/" the events will be published in "/my/topic/<streamID>" and /my/other/topic/<streamID>"

Property "Scope"

This optional property indicates in which broker the instance the event will be generated. By default the value is "outgoing", usually means "tcp://localhost:1883".

CEHandler

Define which handler will manage the result of the event. This is an advance feature and it should not be use if you are not 100% sure of what are you doing.

stateLifeCycle

Define the life cycle of the statement. By adding a new statement the value can be:

  • "RUN": Default value. The statement will be deploy and started in the CEP engine. The statement will be active till is removed. 
  • "ONCE": The statement is deploy and activate. Once the statement provide an result will be destroyed.
  • "SYNCHRONOUS":  The statement is deploy and activate. The statement will be run once, similar to ONCE. Additionally, the statement will run and wait (hold) for an response, and return it. 
  • "PAUSE": The statement will be deploy but not started in the CEP engine. The statement will be pause till it status us changed by a request. 

By modifying an statement:

  • "RUN": The statement will be activated
  • "PAUSE": The statement will be paused
  • "REMOVE": The statement will be removeved (WARNING: this value is not valid in a HTTP PUT)

TargetAgents

A list of agent IDs that define wich agents should operate the statement. In case thie list is empty, all agent will actuate in on the statement. 

isRestOutput 

Define that the values given in Scope are HTTP servers aliases, and outputs are HTTP relative URL of thous servers. 


Examples:


Full_bins
{
	"name":"full_bins",
	"statement":"select wasteBin.id as id from Observation.win:time_batch(1 sec).std:unique(id) as wasteBin where (wasteBin.sensor.metadata like '%FillLevelSensor' or wasteBin.sensor.metadata like '%WasteBin') and cast(wasteBin.resultValue, double) > 70.0"
}
persistent_topic
{
	"name":"persistent",
	"statement":"select * from Observation.std:unique(id) output snapshot every 30 seconds",
	"scope":["local"],
    "output":["/federation1/amiat/v2/persistent"]
}

All

Create separate sub set of events for be storage by the SM 

Persistant Topic
select * from Observation.std:unique(id) output snapshot every 30 seconds

NOTES:

  • Change the PI name accordingly
  • Change the scope if is needed

Persistent topic
{
	"name":"persistent",
	"statement":"select * from Observation.std:unique(id) output snapshot every 30 seconds",
	"scope":["local"],
        "output":["/federation1/amiat/v2/persistent"]
}


AMIAT

Full bins alert


Full bins alert
select 
	wasteBin.id as id 
from 
	Observation.win:time_batch(1 sec).std:unique(id) as wasteBin 
where 
	(wasteBin.sensor.metadata like '%FillLevelSensor' or wasteBin.sensor.metadata like '%WasteBin') and 
	cast(wasteBin.resultValue, double) > 70.0



full bins alert payload
{
	"name":"full_bins",
	"statement":"select wasteBin.id as id from Observation.win:time_batch(1 sec).std:unique(id) as wasteBin where (wasteBin.sensor.metadata like '%FillLevelSensor' or wasteBin.sensor.metadata like '%WasteBin') and cast(wasteBin.resultValue, double) > 70.0",
	"scope":["local"]
}

Empty bins alert 


empty bins alert
select 
	wasteBin.id as id 
from 
	Observation.win:time_batch(1 sec).std:unique(id) as wasteBin 
where 
	(wasteBin.sensor.metadata like '%FillLevelSensor' or wasteBin.sensor.metadata like '%WasteBin') and 
	cast(wasteBin.resultValue, double) <= 70.0 



empty bins alert payload
{
	"name":"empty_bins",
	"statement":"select wasteBin.id as id from Observation.win:time_batch(1 sec).std:unique(id) as wasteBin where (wasteBin.sensor.metadata like '%FillLevelSensor' or wasteBin.sensor.metadata like '%WasteBin') and cast(wasteBin.resultValue, double) <= 70.0",
	"scope":["local"]
}

SMAT


Water leakage statements

NOTES:

  • Must be deployed in this order: first leakage_safe and then leakage_triggered
  • The leakage_safe must be bootstrapped, this means, after the deployment of the statements (both) two Observations without leakage must be received by the DFM.
  • The ID of the water tower must be change in case this id change

Leakage Safe


leakage safe
insert into 
	meeter1Safe 
select istream 
	* 
from 
	observation(id = "00a467b9290129a71c6b496813cf52b437d878f25148773494967e2b85a2031b").win:length(2) 
where 
	 prev(resultValue) = resultValue 
	 or
	 prev(resultValue) is null



leakage safe
{
	"name":"leakage_safe",
	"statement":"insert into meeter1Safe select istream * from 	observation(id = "00a467b9290129a71c6b496813cf52b437d878f25148773494967e2b85a2031b").win:length(2) where prev(resultValue) = resultValue or	 prev(resultValue) is null",
	"scope":["local"]
}

Water Leakage triggered


leakage triggered
 select istream 
	 lastNoLeakageMesurment
 from 
	observation(id= '00a467b9290129a71c6b496813cf52b437d878f25148773494967e2b85a2031b').std:lastevent() as meeter, 
	meeter1Safe.win:length(1) as lastNoLeakageMesurment 
 where 
	meeter.phenomenonTime.after(lastNoLeakageMesurment.phenomenonTime, 30 seconds)



leakage triggered
{
	"name":"leakage_triggered",
	"statement":" select istream  lastNoLeakageMesurment from observation(id= '00a467b9290129a71c6b496813cf52b437d878f25148773494967e2b85a2031b').std:lastevent() as meeter, meeter1Safe.win:length(1) as lastNoLeakageMesurment  where 	meeter.phenomenonTime.after(lastNoLeakageMesurment.phenomenonTime, 30 seconds)",
	"scope":["local"]
}
  • No labels