May 15, 2014

WSO2 Siddhi


WSO2 CEP (Complex Event Processor) is an enterprise-grade server that integrates with various systems to detect meaningful patterns in real-world events, such as those arising in financial analysis, fraud detection, sales, shipping and other business activities. The core back-end runtime engine of the WSO2 CEP Server is WSO2 Siddhi.

Siddhi receives incoming events on event streams through input handlers, processes them, and finally notifies listeners through callbacks. Each event has a stream ID, a timestamp representing the event's creation time, and an array containing its attributes.

First, an event stream needs to be defined by specifying its name and its attributes. Each attribute is defined as a pair of a name and a type. Once the stream is defined, Siddhi creates an input handler, which is used to send events into that stream. Callbacks can also be used to receive notifications when events are produced on the event stream. Finally, each event is projected into the outgoing event stream according to the attributes defined for that outgoing stream.
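To make the flow above concrete, here is a minimal sketch in Python of a stream definition, an input handler and a callback. This is a toy model and not Siddhi's actual API: the names ToyRuntime, define_stream and add_callback are made up for illustration, and the attribute types are plain Python types.

```python
import time
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    stream_id: str   # ID of the stream the event belongs to
    timestamp: int   # event creation time, in milliseconds
    data: tuple      # attribute values, in the order the stream defines them


class ToyRuntime:
    """Hypothetical stand-in for the engine; not Siddhi's real API."""

    def __init__(self):
        self._definitions = {}  # stream name -> list of (attribute name, type)
        self._callbacks = {}    # stream name -> list of callables

    def define_stream(self, name, attributes):
        """Define a stream from (name, type) pairs and return its input handler."""
        self._definitions[name] = list(attributes)
        self._callbacks[name] = []

        def send(values, timestamp=None):
            definition = self._definitions[name]
            if len(values) != len(definition):
                raise ValueError("wrong number of attributes for " + name)
            for value, (attr_name, attr_type) in zip(values, definition):
                if not isinstance(value, attr_type):
                    raise TypeError(attr_name + " has the wrong type")
            if timestamp is None:
                timestamp = int(time.time() * 1000)
            event = Event(name, timestamp, tuple(values))
            # Notify every callback registered on this stream.
            for callback in self._callbacks[name]:
                callback(event)

        return send

    def add_callback(self, name, callback):
        self._callbacks[name].append(callback)


runtime = ToyRuntime()
send = runtime.define_stream(
    "DataStream", [("name", str), ("price", float), ("quantity", int)]
)
received = []
runtime.add_callback("DataStream", received.append)
send(["soap", 25.0, 3], timestamp=1000)
```

After the last line, received holds a single Event whose stream ID is DataStream and whose data is the tuple of the three attribute values.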

Some basic Siddhi queries:

  • Pass-through
A pass-through query creates an outgoing (output) stream and includes the events from the incoming (input) stream, projected as the query specifies.

The following projections are available:

1. All
2. Selected attributes

The following are some examples of pass-through queries:

from DataStream
select *
insert into DisplayItems;

from DataStream 
select price, quantity
insert into DisplayItems;
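The two projections can be pictured with a small Python sketch. It only mimics what the queries above do to the attribute values; the function name pass_through and the sample data are assumptions made for the example, not part of Siddhi.

```python
ATTRS = ("name", "price", "quantity")  # attributes of the toy DataStream


def pass_through(events, attribute_names, selected=None):
    """Copy every event to the output, keeping all attributes
    (like 'select *') or only the selected ones."""
    if selected is None:
        selected = list(attribute_names)
    indexes = [attribute_names.index(attr) for attr in selected]
    return [tuple(event[i] for i in indexes) for event in events]


data_stream = [("soap", 25.0, 3), ("tea", 120.0, 1)]

all_items = pass_through(data_stream, ATTRS)                       # select *
price_qty = pass_through(data_stream, ATTRS, ["price", "quantity"])  # select price, quantity
```

Every input event produces exactly one output event; only the set of attributes changes.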

  • Filter
A filter query creates an outgoing (output) stream and includes only those events from the incoming (input) stream that satisfy the conditions defined in the query. The main conditions supported by filters are as follows:


1. >, <, ==, >=, <=, !=
2. contains, instanceof
3. and, or, not


The following are some examples of filter queries:

from DataStream[price >= 100]
select price, quantity
insert into DisplayItems;

from DataStream[name instanceof string]
select name, price
insert into DisplayItems;

from DataStream[name contains 's']
select name, price
insert into DisplayItems;

from DataStream[price >= 20 and price < 100]
select price, quantity
insert into DisplayItems;
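As a rough illustration of what the filter queries do, the Python sketch below applies a condition to each event and then projects the selected attributes. The helper filter_query and the sample events are hypothetical; the lambdas stand in for the conditions written inside the square brackets.

```python
def filter_query(events, condition, selected):
    """Keep only the events that satisfy the condition, then project them."""
    return [
        {attr: event[attr] for attr in selected}
        for event in events
        if condition(event)
    ]


data_stream = [
    {"name": "soap", "price": 25.0, "quantity": 3},
    {"name": "tea", "price": 120.0, "quantity": 1},
    {"name": "rice", "price": 10.0, "quantity": 5},
]

# price >= 100
expensive = filter_query(
    data_stream, lambda e: e["price"] >= 100, ["price", "quantity"]
)
# name instanceof string
strings = filter_query(
    data_stream, lambda e: isinstance(e["name"], str), ["name", "price"]
)
# name contains 's'
with_s = filter_query(
    data_stream, lambda e: "s" in e["name"], ["name", "price"]
)
# price >= 20 and price < 100
mid_range = filter_query(
    data_stream,
    lambda e: e["price"] >= 20 and e["price"] < 100,
    ["price", "quantity"],
)
```

Unlike a pass-through query, a filter can drop events: here only the tea event passes the first condition and only the soap event passes the last one.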


  • Window
Users can define a window and then use the events in the window for calculations. A window has two types of output: current events and expired events. A window emits a current event when a new event arrives, and it emits an expired event whenever an existing event expires from the window.

If we want to calculate the quantities sold in the last hour, we can extend the queries above by adding a time window:

from DataStream#window.time( 60 min )
select name, sum(quantity) as totalQuantity
group by name
insert into DisplayItems;
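The idea of current and expired events, and of a sum kept over a time window, can be sketched in Python as follows. This is a simplified model under stated assumptions: the class TimeWindow is made up for the example, timestamps are in milliseconds and arrive in non-decreasing order, and events are expired only when a new event arrives, whereas a real engine can also expire them as time passes.

```python
from collections import defaultdict, deque


class TimeWindow:
    """Toy sliding time window that keeps a running sum of quantity per name."""

    def __init__(self, length_ms):
        self.length_ms = length_ms
        self._events = deque()          # (timestamp, name, quantity), oldest first
        self.totals = defaultdict(int)  # name -> sum of quantity inside the window

    def add(self, timestamp, name, quantity):
        """Add a new event; return (current_event, expired_events)."""
        expired = []
        # Drop events that are at least length_ms older than the new one.
        while self._events and self._events[0][0] <= timestamp - self.length_ms:
            old = self._events.popleft()
            self.totals[old[1]] -= old[2]
            expired.append(old)
        current = (timestamp, name, quantity)
        self._events.append(current)
        self.totals[name] += quantity
        return current, expired


MINUTE = 60 * 1000
window = TimeWindow(60 * MINUTE)
window.add(0, "soap", 3)
window.add(10 * MINUTE, "tea", 1)
window.add(30 * MINUTE, "soap", 2)
soap_before = window.totals["soap"]

# 65 minutes in, the very first event has fallen out of the one-hour window.
current, expired = window.add(65 * MINUTE, "soap", 4)
soap_after = window.totals["soap"]
```

The expired events are what allow the sum to be updated incrementally: each one is subtracted from its group's total instead of recomputing the sum over the whole window.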


  • Join
A join takes two streams as input, and each stream should have an associated window. Events from the two streams are joined when they match the join condition and both fall within the time covered by the windows.


The following example shows a join query:

from DataStream#window.time( 60 min ) as ds
join NewDataStream#window.time( 60 min ) as nds
on ds.name == nds.name
insert into DisplayItems ds.name as brandName, ds.price as tradePrice;


Here, events from DataStream and NewDataStream are joined only if the name in the DataStream event is equal to the name in the NewDataStream event.
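A windowed join like this one can be approximated by the Python sketch below. It is a toy model, not how Siddhi is implemented: the class WindowedJoin and its methods are hypothetical, both windows share one length, timestamps are in milliseconds, and events are assumed to arrive in non-decreasing timestamp order across the two streams.

```python
from collections import deque


class WindowedJoin:
    """Toy join of two streams on equal names, each side with a time window."""

    def __init__(self, length_ms):
        self.length_ms = length_ms
        self._ds = deque()   # events from DataStream still inside the window
        self._nds = deque()  # events from NewDataStream still inside the window

    def _expire(self, window, now):
        while window and window[0]["timestamp"] <= now - self.length_ms:
            window.popleft()

    def _arrive(self, event, own, other, event_is_ds):
        now = event["timestamp"]
        self._expire(own, now)
        self._expire(other, now)
        own.append(event)
        joined = []
        # Match the new event against everything in the other stream's window.
        for candidate in other:
            if candidate["name"] == event["name"]:
                ds = event if event_is_ds else candidate
                joined.append({"brandName": ds["name"], "tradePrice": ds["price"]})
        return joined

    def on_data_stream(self, event):
        return self._arrive(event, self._ds, self._nds, True)

    def on_new_data_stream(self, event):
        return self._arrive(event, self._nds, self._ds, False)


MINUTE = 60 * 1000
join = WindowedJoin(60 * MINUTE)
display_items = []
display_items += join.on_data_stream({"timestamp": 0, "name": "soap", "price": 25.0})
# Different name: no output.
display_items += join.on_new_data_stream({"timestamp": 10 * MINUTE, "name": "tea", "price": 99.0})
# Same name and within the hour: one joined event.
display_items += join.on_new_data_stream({"timestamp": 20 * MINUTE, "name": "soap", "price": 30.0})
# Same name, but the DataStream event has already expired: no output.
display_items += join.on_new_data_stream({"timestamp": 90 * MINUTE, "name": "soap", "price": 31.0})
```

Only the third call produces output, because it is the only one where both the name condition and the time windows are satisfied.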
