Aggregating cache data from OCEP in CQL
- by Manju James
There are several use cases where OCEP applications need to join stream data with external data, such as data available in a Coherence cache. OCEP’s streaming language, CQL, supports simple cache-key based joins of stream data with data in Coherence (more complex queries will be supported in a future release). However, there are instances where you may need to aggregate the data in Coherence based on input data from a stream. This blog describes a sample that does just that.
For our sample, we will use a simplified credit card fraud detection use case. The input to this sample application is a stream of credit card transaction data. The input stream contains information like the credit card ID, transaction time and transaction amount. The purpose of this application is to detect suspicious transactions and send out a warning event. For the sake of simplicity, we will assume that all transactions with amounts greater than $1000 are suspicious. The transaction history is available in a Coherence distributed cache. For every suspicious transaction detected, a warning event must be sent with maximum amount, total amount and total number of transactions over the past 30 days, as shown in the diagram below.
Application Input
Stream input to the EPN contains events of type CCTransactionEvent. This input has to be joined with the cache with all credit card transactions. The cache is configured in the EPN as shown below:
<wlevs:caching-system id="CohCacheSystem" provider="coherence"/>
<wlevs:cache id="CCTransactionsCache" value-type="CCTransactionEvent"
key-properties="cardID, transactionTime"
caching-system="CohCacheSystem">
</wlevs:cache>
Application Output
The output that must be produced by the application is a fraud warning event. This event is configured in the spring file as shown below. Source for cardHistory property can be seen here.
<wlevs:event-type type-name="FraudWarningEvent">
<wlevs:properties type="tuple">
<wlevs:property name="cardID" type="CHAR"/>
<wlevs:property name="transactionTime" type="BIGINT"/>
<wlevs:property name="transactionAmount" type="DOUBLE"/>
<wlevs:property name="cardHistory" type="OBJECT"/>
</wlevs:properties
</wlevs:event-type>
Cache Data Aggregation using Java Cartridge
In the output warning event, cardHistory property contains data from the cache aggregated over the past 30 days. To get this information, we use a java cartridge method. This method uses Coherence’s query API on credit card transactions cache to get the required information. Therefore, the java cartridge method requires a reference to the cache. This may be set up by configuring it in the spring context file as shown below:
<bean class="com.oracle.cep.ccfraud.CCTransactionsAggregator">
<property name="cache" ref="CCTransactionsCache"/>
</bean>
This is used by the java class to set a static property:
public void setCache(Map cache)
{
s_cache = (NamedCache) cache;
}
The code snippet below shows how the total of all the transaction amounts in the past 30 days is computed. Rest of the information required by CardHistory object is calculated in a similar manner. Complete source of this class can be found here. To find out more information about using Coherence's API to query a cache, please refer Coherence Developer’s Guide.
public static CreditHistoryData(String cardID)
{
…
Filter filter = QueryHelper.createFilter("cardID = :cardID and transactionTime :transactionTime", map);
CardHistoryData history = new CardHistoryData();
Double sum = (Double) s_cache.aggregate(filter, new DoubleSum("getTransactionAmount"));
history.setTotalAmount(sum);
…
return history;
}
The java cartridge method is used from CQL as seen below:
select cardID,
transactionTime,
transactionAmount,
CCTransactionsAggregator.execute(cardID) as cardHistory
from inputChannel
where transactionAmount1000
This produces a warning event, with history data, for every credit card transaction over $1000.
That is all there is to it. The complete source for the sample application, along with the configuration files, is available here. In the sample, I use a simple java bean to load the cache with initial transaction history data. An input adapter is used to create and send transaction events for the input stream.