CoherencePushReplicationDB.zipIn
the example bellow I'm describing a way of developing a custom push
replication publisher that publishes data to a database via JDBC.
This example can be easily changed to publish data to other receivers
(JMS,...) by performing changes to step 2 and small changes to step
3, steps that are presented bellow. I've used Eclipse as the
development tool.
To
develop a custom push replication publishers we will need to go
through 6 steps:
Step
1:
Create a custom publisher scheme class
Step
2: Create
a custom publisher class that should define what the publisher is
doing.
Step
3:
Create a class data is performing the actions (publish to JMS, DB,
etc ) for the custom publisher.
Step
4:
Register the new publisher against a ContentHandler.
Step
5: Add
the new custom publisher in the cache configuration file.
Step
6:
Add the custom publisher scheme class to the POF configuration
file.
All
these steps are detailed bellow.
The
coherence project is attached and conclusions are presented at the
end.
Step
1: In the Coherence Eclipse project create a
class called CustomPublisherScheme that should implement
com.oracle.coherence.patterns.pushreplication.publishers.AbstractPublisherScheme.
In this class define the elements of the custom-publisher-scheme
element.
For
instance for a CustomPublisherScheme that looks like that:
<sync:publisher>
<sync:publisher-name>Active2-JDBC-Publisher</sync:publisher-name>
<sync:publisher-scheme>
<sync:custom-publisher-scheme>
<sync:jdbc-string>jdbc:oracle:thin:@machine-name:1521:XE</sync:jdbc-string>
<sync:username>hr</sync:username>
<sync:password>hr</sync:password>
</sync:custom-publisher-scheme>
</sync:publisher-scheme>
</sync:publisher>
the
code is:
package
com.oracle.coherence;
import
java.io.DataInput;
import
java.io.DataOutput;
import
java.io.IOException;
import
com.oracle.coherence.patterns.pushreplication.Publisher;
import
com.oracle.coherence.configuration.Configurable;
import
com.oracle.coherence.configuration.Mandatory;
import
com.oracle.coherence.configuration.Property;
import
com.oracle.coherence.configuration.parameters.ParameterScope;
import
com.oracle.coherence.environment.Environment;
import
com.tangosol.io.pof.PofReader;
import
com.tangosol.io.pof.PofWriter;
import
com.tangosol.util.ExternalizableHelper;
@Configurable
public
class
CustomPublisherScheme
extends
com.oracle.coherence.patterns.pushreplication.publishers.AbstractPublisherScheme
{
/**
*
*/
private
static
final
long
serialVersionUID
= 1L;
private
String jdbcString;
private
String username;
private
String password;
public
String getJdbcString()
{
return
this.jdbcString;
}
@Property("jdbc-string")
@Mandatory
public
void
setJdbcString(String jdbcString)
{
this.jdbcString
= jdbcString;
}
public
String getUsername()
{
return
username;
}
@Property("username")
@Mandatory
public
void
setUsername(String username)
{
this.username
= username;
}
public
String getPassword()
{
return
password;
}
@Property("password")
@Mandatory
public
void
setPassword(String password)
{
this.password
= password;
}
public
Publisher realize(Environment environment, ClassLoader classLoader,
ParameterScope
parameterScope)
{
return
new
CustomPublisher(getJdbcString(), getUsername(),
getPassword());
}
public
void
readExternal(DataInput in) throws
IOException
{
super.readExternal(in);
this.jdbcString
= ExternalizableHelper.readSafeUTF(in);
this.username
= ExternalizableHelper.readSafeUTF(in);
this.password
= ExternalizableHelper.readSafeUTF(in);
}
public
void
writeExternal(DataOutput out) throws
IOException
{
super.writeExternal(out);
ExternalizableHelper.writeSafeUTF(out,
this.jdbcString);
ExternalizableHelper.writeSafeUTF(out,
this.username);
ExternalizableHelper.writeSafeUTF(out,
this.password);
}
public
void
readExternal(PofReader reader) throws
IOException
{
super.readExternal(reader);
this.jdbcString
= reader.readString(100);
this.username
= reader.readString(101);
this.password
= reader.readString(102);
}
public
void
writeExternal(PofWriter writer) throws
IOException
{
super.writeExternal(writer);
writer.writeString(100,
this.jdbcString);
writer.writeString(101,
this.username);
writer.writeString(102,
this.password);
}
}
Step
2:
Define
what the CustomPublisher should basically do by creating a new java
class called CustomPublisher
that implements
com.oracle.coherence.patterns.pushreplication.Publisher
package
com.oracle.coherence;
import
com.oracle.coherence.patterns.pushreplication.EntryOperation;
import
com.oracle.coherence.patterns.pushreplication.Publisher;
import
com.oracle.coherence.patterns.pushreplication.exceptions.PublisherNotReadyException;
import
java.io.BufferedWriter;
import
java.util.Iterator;
public
class
CustomPublisher implements
Publisher
{
private
String jdbcString;
private
String username;
private
String password;
private
transient
BufferedWriter bufferedWriter;
public
CustomPublisher()
{
}
public
CustomPublisher(String jdbcString, String username, String password)
{
this.jdbcString
= jdbcString;
this.username
= username;
this.password
= password;
this.bufferedWriter
= null;
}
public
String getJdbcString()
{
return
this.jdbcString;
}
public
String getUsername()
{
return
username;
}
public
String getPassword()
{
return
password;
}
public
void
publishBatch(String cacheName, String publisherName,
Iterator<EntryOperation>
entryOperations)
{
DatabasePersistence
databasePersistence = new
DatabasePersistence(
jdbcString,
username,
password);
while
(entryOperations.hasNext())
{
EntryOperation
entryOperation = (EntryOperation) entryOperations
.next();
databasePersistence.databasePersist(entryOperation);
}
}
public
void
start(String cacheName, String publisherName)
throws
PublisherNotReadyException
{
System.err
.printf("Started:
Custom JDBC Publisher for Cache %s with Publisher %s\n",
new
Object[] { cacheName, publisherName });
}
public
void
stop(String cacheName, String publisherName)
{
System.err
.printf("Stopped:
Custom JDBC Publisher for Cache %s with Publisher %s\n",
new
Object[] { cacheName, publisherName });
}
}
In
the publishBatch
method from above we inform the publisher that he is supposed
to persist data to a database:
DatabasePersistence
databasePersistence = new
DatabasePersistence(
jdbcString,
username,
password);
while
(entryOperations.hasNext())
{
EntryOperation
entryOperation = (EntryOperation) entryOperations
.next();
databasePersistence.databasePersist(entryOperation);
}
Step
3:
The
class that deals with the persistence is a very basic one that uses
JDBC to perform inserts/updates against a database.
package
com.oracle.coherence;
import
com.oracle.coherence.patterns.pushreplication.EntryOperation;
import
java.sql.*;
import
java.text.SimpleDateFormat;
import
com.oracle.coherence.Order;
public
class
DatabasePersistence
{
public
static
String INSERT_OPERATION
= "INSERT";
public
static
String UPDATE_OPERATION
= "UPDATE";
public
Connection dbConnection;
public
DatabasePersistence(String jdbcString, String username,
String
password)
{
this.dbConnection
= createConnection(jdbcString, username, password);
}
public
Connection createConnection(String jdbcString, String username,
String
password)
{
Connection
connection = null;
System.err.println("Connecting
to: " + jdbcString + "
Username: "
+
username + " Password: "
+ password);
try
{
//
Load the JDBC driver
String
driverName = "oracle.jdbc.driver.OracleDriver";
Class.forName(driverName);
//
Create a connection to the database
connection
= DriverManager.getConnection(jdbcString, username,
password);
System.err.println("Connected
to:" + jdbcString + "
Username: "
+
username + " Password: "
+ password);
}
catch
(ClassNotFoundException e)
{
e.printStackTrace();
}
//
driver
catch
(SQLException e)
{
e.printStackTrace();
}
return
connection;
}
public
void
databasePersist(EntryOperation entryOperation)
{
if
(entryOperation.getOperation().toString()
.equalsIgnoreCase(INSERT_OPERATION))
{
insert(((Order)
entryOperation.getPublishableEntry().getValue()));
}
else
if
(entryOperation.getOperation().toString()
.equalsIgnoreCase(UPDATE_OPERATION))
{
update(((Order)
entryOperation.getPublishableEntry().getValue()));
}
}
public
void
update(Order order)
{
String
update = "UPDATE Orders set
QUANTITY= '"
+
order.getQuantity()
+
"', AMOUNT='"
+
order.getAmount()
+
"', ORD_DATE= '"
+
(new
SimpleDateFormat("dd-MMM-yyyy")).format(order
.getOrdDate())
+ "' WHERE SYMBOL='"
+ order.getSymbol()
+
"'";
System.err.println("UPDATE
= " + update);
try
{
Statement
stmt = getDbConnection().createStatement();
stmt.execute(update);
stmt.close();
}
catch
(SQLException ex)
{
System.err.println("SQLException:
" + ex.getMessage());
}
}
public
void
insert(Order order)
{
String
insert = "insert into Orders
values('"
+
order.getSymbol()
+
"',"
+
order.getQuantity()
+
","
+
order.getAmount()
+
",'"
+
(new
SimpleDateFormat("dd-MMM-yyyy")).format(order
.getOrdDate())
+ "')";
System.err.println("INSERT
= " + insert);
try
{
Statement
stmt = getDbConnection().createStatement();
stmt.execute(insert);
stmt.close();
}
catch
(SQLException ex)
{
System.err.println("SQLException:
" + ex.getMessage());
}
}
public
Connection getDbConnection()
{
return
dbConnection;
}
public
void
setDbConnection(Connection dbConnection)
{
this.dbConnection
= dbConnection;
}
}
Step
4:
Now we need to register our publisher against a ContentHandler.
In order to achieve that we need to create in our eclipse project a
new class called CustomPushReplicationNamespaceContentHandler
that should extend the
com.oracle.coherence.patterns.pushreplication.configuration.PushReplicationNamespaceContentHandler.
In the constructor of the new class we define a new handler for our
custom
publisher.
package
com.oracle.coherence;
import
com.oracle.coherence.configuration.Configurator;
import
com.oracle.coherence.environment.extensible.ConfigurationContext;
import
com.oracle.coherence.environment.extensible.ConfigurationException;
import
com.oracle.coherence.environment.extensible.ElementContentHandler;
import
com.oracle.coherence.patterns.pushreplication.PublisherScheme;
import
com.oracle.coherence.environment.extensible.QualifiedName;
import
com.oracle.coherence.patterns.pushreplication.configuration.PushReplicationNamespaceContentHandler;
import
com.tangosol.run.xml.XmlElement;
public
class
CustomPushReplicationNamespaceContentHandler extends
PushReplicationNamespaceContentHandler
{
public
CustomPushReplicationNamespaceContentHandler()
{
super();
registerContentHandler("custom-publisher-scheme",
new
ElementContentHandler()
{
public
Object onElement(ConfigurationContext context, QualifiedName
qualifiedName, XmlElement xmlElement)
throws
ConfigurationException
{
PublisherScheme
publisherScheme = new
CustomPublisherScheme();
Configurator.configure(publisherScheme, context,
qualifiedName, xmlElement);
return
publisherScheme;
}
});
}
}
Step
5:
Now we should define our CustomPublisher
in the cache configuration file according to the following
documentation.
<cache-config
xmlns:sync="class:com.oracle.coherence.CustomPushReplicationNamespaceContentHandler"
xmlns:cr="class:com.oracle.coherence.environment.extensible.namespaces.InstanceNamespaceContentHandler">
<caching-schemes>
<sync:provider
pof-enabled="false">
<sync:coherence-provider
/>
</sync:provider>
<caching-scheme-mapping>
<cache-mapping>
<cache-name>publishing-cache</cache-name>
<scheme-name>distributed-scheme-with-publishing-cachestore</scheme-name>
<autostart>true</autostart>
<sync:publisher>
<sync:publisher-name>Active2
Publisher</sync:publisher-name>
<sync:publisher-scheme>
<sync:remote-cluster-publisher-scheme>
<sync:remote-invocation-service-name>remote-site1</sync:remote-invocation-service-name>
<sync:remote-publisher-scheme>
<sync:local-cache-publisher-scheme>
<sync:target-cache-name>publishing-cache</sync:target-cache-name>
</sync:local-cache-publisher-scheme>
</sync:remote-publisher-scheme>
<sync:autostart>true</sync:autostart>
</sync:remote-cluster-publisher-scheme>
</sync:publisher-scheme>
</sync:publisher>
<sync:publisher>
<sync:publisher-name>Active2-Output-Publisher</sync:publisher-name>
<sync:publisher-scheme>
<sync:stderr-publisher-scheme>
<sync:autostart>true</sync:autostart>
<sync:publish-original-value>true</sync:publish-original-value>
</sync:stderr-publisher-scheme>
</sync:publisher-scheme>
</sync:publisher>
<sync:publisher>
<sync:publisher-name>Active2-JDBC-Publisher</sync:publisher-name>
<sync:publisher-scheme>
<sync:custom-publisher-scheme>
<sync:jdbc-string>jdbc:oracle:thin:@machine_name:1521:XE</sync:jdbc-string>
<sync:username>hr</sync:username>
<sync:password>hr</sync:password>
</sync:custom-publisher-scheme>
</sync:publisher-scheme>
</sync:publisher>
</cache-mapping>
</caching-scheme-mapping>
<!--
The following scheme is required for each remote-site when using a
RemoteInvocationPublisher
-->
<remote-invocation-scheme>
<service-name>remote-site1</service-name>
<initiator-config>
<tcp-initiator>
<remote-addresses>
<socket-address>
<address>localhost</address>
<port>20001</port>
</socket-address>
</remote-addresses>
<connect-timeout>2s</connect-timeout>
</tcp-initiator>
<outgoing-message-handler>
<request-timeout>5s</request-timeout>
</outgoing-message-handler>
</initiator-config>
</remote-invocation-scheme>
<!--
END: com.oracle.coherence.patterns.pushreplication -->
<proxy-scheme>
<service-name>ExtendTcpProxyService</service-name>
<acceptor-config>
<tcp-acceptor>
<local-address>
<address>localhost</address>
<port>20002</port>
</local-address>
</tcp-acceptor>
</acceptor-config>
<autostart>true</autostart>
</proxy-scheme>
</caching-schemes>
</cache-config>
As
you can see in the red-marked text from above I've:
- set new Namespace Content Handler
- define the new custom publisher that should work together with
other publishers like: stderr and remote publishers in our case.
Step
6:
Add
the com.oracle.coherence.CustomPublisherScheme
to your custom-pof-config file:
<pof-config>
<user-type-list>
<!--
Built in types -->
<include>coherence-pof-config.xml</include>
<include>coherence-common-pof-config.xml</include>
<include>coherence-messagingpattern-pof-config.xml</include>
<include>coherence-pushreplicationpattern-pof-config.xml</include>
<!--
Application types -->
<user-type>
<type-id>1901</type-id>
<class-name>com.oracle.coherence.Order</class-name>
<serializer>
<class-name>com.oracle.coherence.OrderSerializer</class-name>
</serializer>
</user-type>
<user-type>
<type-id>1902</type-id>
<class-name>com.oracle.coherence.CustomPublisherScheme</class-name>
</user-type>
</user-type-list>
</pof-config>
CONCLUSIONSThis
approach allows for publishers to publish data to almost any other
receiver (database, JMS, MQ, ...). The only thing that needs to be
changed is the
DatabasePersistence.java
class that should be adapted to the chosen receiver. Only minor
changes are needed for the rest of the code (to publishBatch
method
from CustomPublisher
class).