The real-time eventing feature in Coherence is great for relaying state changes to other systems or to users. However, sometimes not all changes need to, or can, be sent to consumers. For instance:
Rapid changes may not be consumable or interpretable as fast as they are sent. A user watching changing stock prices may only be able to interpret and react to one change per second.
A client may be using a low-bandwidth connection, so rapidly sending events will only result in them being queued and delayed.
A large number of clients may need to be notified of state changes, and sending 100 events per second to 1,000 clients cannot be supported with the available hardware, whereas 10 events per second to 1,000 clients can. Note that this example assumes many of the state changes are to the same value.
One simple approach to throttling Coherence cache events is to use a cache store to capture changes to one cache (the data cache) and periodically insert those changes into another cache (the events cache). Consumers interested in state changes to entries in the data cache register an interest (event listener) against the events cache. Because the cache store uses the write-behind feature, rapid updates to the same cache entry are coalesced, so that merged updates are written to the events cache at the configured interval. The interval at which changes are written to the events cache can easily be configured using the write-behind delay time in the cache configuration, as shown below.
<caching-schemes>
  <distributed-scheme>
    <scheme-name>CustomDistributedCacheScheme</scheme-name>
    <service-name>CustomDistributedCacheService</service-name>
    <thread-count>1</thread-count>
    <backing-map-scheme>
      <read-write-backing-map-scheme>
        <scheme-name>CustomRWBackingMapScheme</scheme-name>
        <internal-cache-scheme>
          <local-scheme />
        </internal-cache-scheme>
        <cachestore-scheme>
          <class-scheme>
            <scheme-name>CustomCacheStoreScheme</scheme-name>
            <class-name>com.oracle.coherence.test.CustomCacheStore</class-name>
            <init-params>
              <init-param>
                <param-type>java.lang.String</param-type>
                <!-- The name of this (source) cache -->
                <param-value>{cache-name}</param-value>
              </init-param>
              <init-param>
                <param-type>java.lang.String</param-type>
                <!-- The name of the cache to write events to -->
                <param-value>cqc-test</param-value>
              </init-param>
            </init-params>
          </class-scheme>
        </cachestore-scheme>
        <write-delay>1s</write-delay>
        <write-batch-factor>0</write-batch-factor>
      </read-write-backing-map-scheme>
    </backing-map-scheme>
    <autostart>true</autostart>
  </distributed-scheme>
</caching-schemes>
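For completeness, here is a minimal sketch of how a consumer might register an event listener against the events cache. The cache name cqc-test comes from the configuration above; the EventConsumer class and the listener bodies are illustrative assumptions, not part of the original example.

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.AbstractMapListener;
import com.tangosol.util.MapEvent;

public class EventConsumer {
    public static void main(String[] args) {
        // Listen on the events cache, not the data cache, so updates
        // arrive at the throttled (write-behind) rate
        NamedCache eventsCache = CacheFactory.getCache("cqc-test");
        eventsCache.addMapListener(new AbstractMapListener() {
            @Override
            public void entryInserted(MapEvent event) {
                System.out.println("New: " + event.getKey() + "=" + event.getNewValue());
            }
            @Override
            public void entryUpdated(MapEvent event) {
                System.out.println("Updated: " + event.getKey() + "=" + event.getNewValue());
            }
        });
    }
}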
The cache store implementation that performs this throttling is trivial and only involves implementing the basic cache store operations.
import java.util.Collection;
import java.util.Map;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.cache.CacheStore;

public class CustomCacheStore implements CacheStore {
    private String publishingCacheName;
    private String sourceCacheName;

    public CustomCacheStore(String sourceCacheName, String publishingCacheName) {
        this.publishingCacheName = publishingCacheName;
        this.sourceCacheName = sourceCacheName;
    }

    @Override
    public Object load(Object key) {
        // This cache store only publishes events; it never loads anything
        return null;
    }

    @Override
    public Map loadAll(Collection keyCollection) {
        return null;
    }

    @Override
    public void erase(Object key) {
        // Guard against publishing back into the source cache
        if (!sourceCacheName.equals(publishingCacheName)) {
            CacheFactory.getCache(publishingCacheName).remove(key);
            CacheFactory.log("Erasing entry: " + key, CacheFactory.LOG_DEBUG);
        }
    }

    @Override
    public void eraseAll(Collection keyCollection) {
        if (!sourceCacheName.equals(publishingCacheName)) {
            for (Object key : keyCollection) {
                CacheFactory.getCache(publishingCacheName).remove(key);
                CacheFactory.log("Erasing collection entry: " + key,
                        CacheFactory.LOG_DEBUG);
            }
        }
    }

    @Override
    public void store(Object key, Object value) {
        // Write-behind has already coalesced rapid updates to this key,
        // so only the latest value is published
        if (!sourceCacheName.equals(publishingCacheName)) {
            CacheFactory.getCache(publishingCacheName).put(key, value);
            CacheFactory.log("Storing entry (key=value): " + key + "=" + value,
                    CacheFactory.LOG_DEBUG);
        }
    }

    @Override
    public void storeAll(Map entryMap) {
        if (!sourceCacheName.equals(publishingCacheName)) {
            CacheFactory.getCache(publishingCacheName).putAll(entryMap);
            CacheFactory.log("Storing entries: " + entryMap,
                    CacheFactory.LOG_DEBUG);
        }
    }
}
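To see the coalescing in action, you can rapidly update a single key in the data cache and watch how few events the listener on the events cache receives. A rough sketch, assuming a data cache named data-test mapped to CustomDistributedCacheScheme (the cache mapping itself is not shown above):

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

public class EventProducer {
    public static void main(String[] args) {
        // Rapid updates to a single key in the data cache; the 1s
        // write-behind delay coalesces them, so the listener on the
        // events cache typically sees only the latest value per interval
        NamedCache dataCache = CacheFactory.getCache("data-test");
        for (int i = 0; i < 100; i++) {
            dataCache.put("ORCL", Integer.valueOf(i));
        }
    }
}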
As the cache store implementation shows, each operation on the data cache results in a similar operation on the events cache. This is a very simple pattern with a lot of additional possibilities, but it also has a few drawbacks you should be aware of:
This event-throttling implementation will use additional memory, as a duplicate copy of each entry held in the data cache must also be held in the events cache - two copies if the events cache has backups.
A data cache may already use a cache store, in which case a "multiplexing cache store" pattern must also be used to send changes to both the existing cache store and the throttling cache store, as sketched below.
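Here is a minimal sketch of such a multiplexing cache store; the MultiplexingCacheStore class name is an assumption, and it simply delegates every operation to both stores.

import java.util.Collection;
import java.util.Map;

import com.tangosol.net.cache.CacheStore;

public class MultiplexingCacheStore implements CacheStore {
    private final CacheStore existingStore;
    private final CacheStore throttlingStore;

    public MultiplexingCacheStore(CacheStore existingStore, CacheStore throttlingStore) {
        this.existingStore = existingStore;
        this.throttlingStore = throttlingStore;
    }

    @Override
    public Object load(Object key) {
        // Only the existing store knows how to load; the throttling
        // store never loads anything
        return existingStore.load(key);
    }

    @Override
    public Map loadAll(Collection keyCollection) {
        return existingStore.loadAll(keyCollection);
    }

    @Override
    public void store(Object key, Object value) {
        existingStore.store(key, value);
        throttlingStore.store(key, value);
    }

    @Override
    public void storeAll(Map entryMap) {
        existingStore.storeAll(entryMap);
        throttlingStore.storeAll(entryMap);
    }

    @Override
    public void erase(Object key) {
        existingStore.erase(key);
        throttlingStore.erase(key);
    }

    @Override
    public void eraseAll(Collection keyCollection) {
        existingStore.eraseAll(keyCollection);
        throttlingStore.eraseAll(keyCollection);
    }
}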
If you would like to try out this throttling example, you can download it here. I hope it's useful, and let me know if you spot any further optimizations.