Coherence - How to develop a custom push replication publisher
Posted by cosmin.tudor(at)oracle.com on Oracle Blogs
Published on Thu, 13 Jan 2011 08:25:34 +0000
Indexed on 2011/01/13 10:56 UTC
Attached project: CoherencePushReplicationDB.zip
The example below describes one way to develop a custom push replication publisher that publishes data to a database via JDBC. It can easily be adapted to publish data to other receivers (JMS, ...) by replacing the persistence class from step 3 and making small changes to the publisher class from step 2, both presented below. I used Eclipse as the development tool.
To develop a custom push replication publisher we need to go through 6 steps:
Step 1: Create a custom publisher scheme class.
Step 2: Create a custom publisher class that defines what the publisher does.
Step 3: Create a class that performs the actions (publish to JMS, DB, etc.) for the custom publisher.
Step 4: Register the new publisher against a ContentHandler.
Step 5: Add the new custom publisher to the cache configuration file.
Step 6: Add the custom publisher scheme class to the POF configuration file.
All these steps are detailed below.
The Coherence project is attached and conclusions are presented at the end.
Step 1: In the Coherence Eclipse project create a class called CustomPublisherScheme that extends com.oracle.coherence.patterns.pushreplication.publishers.AbstractPublisherScheme. In this class define one property for each child element of the custom-publisher-scheme element.
For instance, for a custom publisher scheme that looks like this:
<sync:publisher>
  <sync:publisher-name>Active2-JDBC-Publisher</sync:publisher-name>
  <sync:publisher-scheme>
    <sync:custom-publisher-scheme>
      <sync:jdbc-string>jdbc:oracle:thin:@machine-name:1521:XE</sync:jdbc-string>
      <sync:username>hr</sync:username>
      <sync:password>hr</sync:password>
    </sync:custom-publisher-scheme>
  </sync:publisher-scheme>
</sync:publisher>
the code is:
package com.oracle.coherence;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.oracle.coherence.patterns.pushreplication.Publisher;
import com.oracle.coherence.configuration.Configurable;
import com.oracle.coherence.configuration.Mandatory;
import com.oracle.coherence.configuration.Property;
import com.oracle.coherence.configuration.parameters.ParameterScope;
import com.oracle.coherence.environment.Environment;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.util.ExternalizableHelper;
@Configurable
public class CustomPublisherScheme extends
com.oracle.coherence.patterns.pushreplication.publishers.AbstractPublisherScheme
{
private static final long serialVersionUID = 1L;
private String jdbcString;
private String username;
private String password;
public String getJdbcString()
{
return this.jdbcString;
}
@Property("jdbc-string")
@Mandatory
public void setJdbcString(String jdbcString)
{
this.jdbcString = jdbcString;
}
public String getUsername()
{
return username;
}
@Property("username")
@Mandatory
public void setUsername(String username)
{
this.username = username;
}
public String getPassword()
{
return password;
}
@Property("password")
@Mandatory
public void setPassword(String password)
{
this.password = password;
}
public Publisher realize(Environment environment, ClassLoader classLoader,
ParameterScope parameterScope)
{
return new CustomPublisher(getJdbcString(), getUsername(),
getPassword());
}
public void readExternal(DataInput in) throws IOException
{
super.readExternal(in);
this.jdbcString = ExternalizableHelper.readSafeUTF(in);
this.username = ExternalizableHelper.readSafeUTF(in);
this.password = ExternalizableHelper.readSafeUTF(in);
}
public void writeExternal(DataOutput out) throws IOException
{
super.writeExternal(out);
ExternalizableHelper.writeSafeUTF(out, this.jdbcString);
ExternalizableHelper.writeSafeUTF(out, this.username);
ExternalizableHelper.writeSafeUTF(out, this.password);
}
public void readExternal(PofReader reader) throws IOException
{
super.readExternal(reader);
this.jdbcString = reader.readString(100);
this.username = reader.readString(101);
this.password = reader.readString(102);
}
public void writeExternal(PofWriter writer) throws IOException
{
super.writeExternal(writer);
writer.writeString(100, this.jdbcString);
writer.writeString(101, this.username);
writer.writeString(102, this.password);
}
}
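Both readExternal/writeExternal pairs above depend on one rule: fields must be read back in exactly the order (or under exactly the POF index) in which they were written. A minimal sketch of that symmetry using only plain java.io streams follows. SchemeSettings, writeNullable and readNullable are hypothetical names invented for this illustration; the two helpers only approximate what ExternalizableHelper.writeSafeUTF and readSafeUTF do and are not the Coherence implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the three scheme fields; not part of the push replication API.
final class SchemeSettings {
    String jdbcString;
    String username;
    String password;

    // Null-tolerant string write: a presence flag followed by the value (assumed encoding).
    static void writeNullable(DataOutput out, String value) throws IOException {
        out.writeBoolean(value != null);
        if (value != null) {
            out.writeUTF(value);
        }
    }

    static String readNullable(DataInput in) throws IOException {
        return in.readBoolean() ? in.readUTF() : null;
    }

    void writeExternal(DataOutput out) throws IOException {
        writeNullable(out, jdbcString);
        writeNullable(out, username);
        writeNullable(out, password);
    }

    // Fields are read back in exactly the order they were written.
    void readExternal(DataInput in) throws IOException {
        jdbcString = readNullable(in);
        username = readNullable(in);
        password = readNullable(in);
    }

    // Serializes to a byte array and reads the bytes into a fresh instance.
    static SchemeSettings roundTrip(SchemeSettings source) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            source.writeExternal(new DataOutputStream(bytes));
            SchemeSettings copy = new SchemeSettings();
            copy.readExternal(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Swapping two lines in readExternal would silently put the username into the jdbcString field, which is why the read and write methods are best kept next to each other and edited together.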
Step 2: Define what the CustomPublisher does by creating a new Java class called CustomPublisher that implements com.oracle.coherence.patterns.pushreplication.Publisher.
package com.oracle.coherence;
import com.oracle.coherence.patterns.pushreplication.EntryOperation;
import com.oracle.coherence.patterns.pushreplication.Publisher;
import com.oracle.coherence.patterns.pushreplication.exceptions.PublisherNotReadyException;
import java.util.Iterator;
public class CustomPublisher implements Publisher
{
private String jdbcString;
private String username;
private String password;
public CustomPublisher()
{
}
public CustomPublisher(String jdbcString, String username, String password)
{
this.jdbcString = jdbcString;
this.username = username;
this.password = password;
}
public String getJdbcString()
{
return this.jdbcString;
}
public String getUsername()
{
return username;
}
public String getPassword()
{
return password;
}
public void publishBatch(String cacheName, String publisherName,
Iterator<EntryOperation> entryOperations)
{
DatabasePersistence databasePersistence = new DatabasePersistence(
jdbcString, username, password);
while (entryOperations.hasNext())
{
EntryOperation entryOperation = (EntryOperation) entryOperations
.next();
databasePersistence.databasePersist(entryOperation);
}
}
public void start(String cacheName, String publisherName)
throws PublisherNotReadyException
{
System.err
.printf("Started: Custom JDBC Publisher for Cache %s with Publisher %s\n",
new Object[] { cacheName, publisherName });
}
public void stop(String cacheName, String publisherName)
{
System.err
.printf("Stopped: Custom JDBC Publisher for Cache %s with Publisher %s\n",
new Object[] { cacheName, publisherName });
}
}
The publishBatch method above is where the publisher is told what to do with each batch. Here it persists every entry operation to a database:
DatabasePersistence databasePersistence = new DatabasePersistence(
jdbcString, username, password);
while (entryOperations.hasNext())
{
EntryOperation entryOperation = (EntryOperation) entryOperations
.next();
databasePersistence.databasePersist(entryOperation);
}
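Because publishBatch only drains an iterator and hands each operation to a helper, the receiver can be made pluggable. The sketch below shows that idea without any Coherence classes. OperationSink, BatchForwarder and RecordingSink are hypothetical names introduced for illustration, and plain strings stand in for EntryOperation objects.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical receiver abstraction; not part of the push replication API.
interface OperationSink<T> {
    void persist(T operation);
}

// Drains a batch and hands each operation to the configured sink,
// mirroring the while loop in publishBatch.
final class BatchForwarder<T> {
    private final OperationSink<T> sink;

    BatchForwarder(OperationSink<T> sink) {
        this.sink = sink;
    }

    // Returns the number of operations forwarded.
    int forward(Iterator<T> operations) {
        int count = 0;
        while (operations.hasNext()) {
            sink.persist(operations.next());
            count++;
        }
        return count;
    }
}

// Example sink that only records what it receives; a JDBC or JMS sink
// would implement the same interface.
final class RecordingSink implements OperationSink<String> {
    final List<String> received = new ArrayList<>();

    @Override
    public void persist(String operation) {
        received.add(operation);
    }
}
```

With this shape, switching from a database to another receiver means supplying a different OperationSink, while the loop itself stays unchanged.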
Step 3: The class that handles persistence is a very basic one that uses JDBC to perform inserts and updates against a database.
package com.oracle.coherence;
import com.oracle.coherence.patterns.pushreplication.EntryOperation;
import java.sql.*;
import java.text.SimpleDateFormat;
import com.oracle.coherence.Order;
public class DatabasePersistence
{
public static final String INSERT_OPERATION = "INSERT";
public static final String UPDATE_OPERATION = "UPDATE";
public Connection dbConnection;
public DatabasePersistence(String jdbcString, String username,
String password)
{
this.dbConnection = createConnection(jdbcString, username, password);
}
public Connection createConnection(String jdbcString, String username,
String password)
{
Connection connection = null;
// Never write the password to the log
System.err.println("Connecting to: " + jdbcString + " Username: "
+ username);
try
{
// Load the JDBC driver
String driverName = "oracle.jdbc.driver.OracleDriver";
Class.forName(driverName);
// Create a connection to the database
connection = DriverManager.getConnection(jdbcString, username,
password);
System.err.println("Connected to: " + jdbcString + " Username: "
+ username);
} catch (ClassNotFoundException e)
{
// The JDBC driver is not on the classpath
e.printStackTrace();
} catch (SQLException e)
{
// The connection could not be established
e.printStackTrace();
}
return connection;
}
public void databasePersist(EntryOperation entryOperation)
{
if (entryOperation.getOperation().toString()
.equalsIgnoreCase(INSERT_OPERATION))
{
insert(((Order) entryOperation.getPublishableEntry().getValue()));
} else if (entryOperation.getOperation().toString()
.equalsIgnoreCase(UPDATE_OPERATION))
{
update(((Order) entryOperation.getPublishableEntry().getValue()));
}
}
public void update(Order order)
{
String update = "UPDATE Orders set QUANTITY= '"
+ order.getQuantity()
+ "', AMOUNT='"
+ order.getAmount()
+ "', ORD_DATE= '"
+ (new SimpleDateFormat("dd-MMM-yyyy")).format(order
.getOrdDate()) + "' WHERE SYMBOL='" + order.getSymbol()
+ "'";
System.err.println("UPDATE = " + update);
try
{
Statement stmt = getDbConnection().createStatement();
stmt.execute(update);
stmt.close();
} catch (SQLException ex)
{
System.err.println("SQLException: " + ex.getMessage());
}
}
public void insert(Order order)
{
String insert = "insert into Orders values('"
+ order.getSymbol()
+ "',"
+ order.getQuantity()
+ ","
+ order.getAmount()
+ ",'"
+ (new SimpleDateFormat("dd-MMM-yyyy")).format(order
.getOrdDate()) + "')";
System.err.println("INSERT = " + insert);
try
{
Statement stmt = getDbConnection().createStatement();
stmt.execute(insert);
stmt.close();
} catch (SQLException ex)
{
System.err.println("SQLException: " + ex.getMessage());
}
}
public Connection getDbConnection()
{
return dbConnection;
}
public void setDbConnection(Connection dbConnection)
{
this.dbConnection = dbConnection;
}
}
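The insert and update methods above build SQL by string concatenation, which fails on values that contain quotes and is open to SQL injection. One possible alternative is to bind the values through java.sql.PreparedStatement. In the sketch below, OrderStatements is a hypothetical helper; the column names are taken from the UPDATE statement above, while the column order in the INSERT and the parameter types (int, double, java.util.Date) are assumptions about the Orders table and the Order class.

```java
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical helper; not part of the attached project.
final class OrderStatements {
    static final String INSERT_SQL =
        "INSERT INTO Orders (SYMBOL, QUANTITY, AMOUNT, ORD_DATE) VALUES (?, ?, ?, ?)";
    static final String UPDATE_SQL =
        "UPDATE Orders SET QUANTITY = ?, AMOUNT = ?, ORD_DATE = ? WHERE SYMBOL = ?";

    // Inserts one order and returns the number of affected rows.
    static int insert(Connection connection, String symbol, int quantity,
                      double amount, java.util.Date ordDate) throws SQLException {
        // try-with-resources closes the statement even if execution fails
        try (PreparedStatement ps = connection.prepareStatement(INSERT_SQL)) {
            ps.setString(1, symbol);
            ps.setInt(2, quantity);
            ps.setDouble(3, amount);
            ps.setDate(4, new Date(ordDate.getTime()));
            return ps.executeUpdate();
        }
    }

    // Updates the order identified by symbol and returns the number of affected rows.
    static int update(Connection connection, String symbol, int quantity,
                      double amount, java.util.Date ordDate) throws SQLException {
        try (PreparedStatement ps = connection.prepareStatement(UPDATE_SQL)) {
            ps.setInt(1, quantity);
            ps.setDouble(2, amount);
            ps.setDate(3, new Date(ordDate.getTime()));
            ps.setString(4, symbol);
            return ps.executeUpdate();
        }
    }
}
```

Binding the date as a java.sql.Date also removes the dependency on the database session's date format that the dd-MMM-yyyy string literal introduces.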
Step 4: Now we need to register our publisher against a ContentHandler. To do that, create a new class in the Eclipse project called CustomPushReplicationNamespaceContentHandler that extends com.oracle.coherence.patterns.pushreplication.configuration.PushReplicationNamespaceContentHandler. In the constructor of the new class, register a handler for the custom-publisher-scheme element.
package com.oracle.coherence;
import com.oracle.coherence.configuration.Configurator;
import com.oracle.coherence.environment.extensible.ConfigurationContext;
import com.oracle.coherence.environment.extensible.ConfigurationException;
import com.oracle.coherence.environment.extensible.ElementContentHandler;
import com.oracle.coherence.patterns.pushreplication.PublisherScheme;
import com.oracle.coherence.environment.extensible.QualifiedName;
import com.oracle.coherence.patterns.pushreplication.configuration.PushReplicationNamespaceContentHandler;
import com.tangosol.run.xml.XmlElement;
public class CustomPushReplicationNamespaceContentHandler extends PushReplicationNamespaceContentHandler
{
public CustomPushReplicationNamespaceContentHandler()
{
super();
registerContentHandler("custom-publisher-scheme", new ElementContentHandler()
{
public Object onElement(ConfigurationContext context, QualifiedName qualifiedName, XmlElement xmlElement)
throws ConfigurationException
{
PublisherScheme publisherScheme = new CustomPublisherScheme();
Configurator.configure(publisherScheme, context, qualifiedName, xmlElement);
return publisherScheme;
}
});
}
}
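Conceptually, the namespace content handler is a lookup from an element name to a piece of code that turns that element into an object, and the subclass simply adds one more entry in its constructor. The sketch below models only that idea. ElementRegistry and CustomElementRegistry are hypothetical names, a Map of strings stands in for the XML element, and nothing here reflects how the Coherence Incubator implements registerContentHandler internally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified model of a namespace handler: it maps an element's
// local name to a function that builds an object from the element's properties.
class ElementRegistry {
    private final Map<String, Function<Map<String, String>, Object>> handlers = new HashMap<>();

    final void register(String elementName, Function<Map<String, String>, Object> handler) {
        handlers.put(elementName, handler);
    }

    // Looks up the handler for the element and lets it build the result.
    final Object process(String elementName, Map<String, String> properties) {
        Function<Map<String, String>, Object> handler = handlers.get(elementName);
        if (handler == null) {
            throw new IllegalArgumentException("No handler registered for <" + elementName + ">");
        }
        return handler.apply(properties);
    }
}

// The subclass adds one more element in its constructor, as the custom
// namespace content handler does for custom-publisher-scheme.
final class CustomElementRegistry extends ElementRegistry {
    CustomElementRegistry() {
        register("custom-publisher-scheme",
                 properties -> "CustomPublisherScheme[" + properties.get("jdbc-string") + "]");
    }
}
```

An element name that was never registered fails fast with an exception, which is the behaviour one would want when the configuration references a scheme the handler does not know.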
Step 5: Now define the CustomPublisher in the cache configuration file, following the push replication documentation.
<cache-config
    xmlns:sync="class:com.oracle.coherence.CustomPushReplicationNamespaceContentHandler"
    xmlns:cr="class:com.oracle.coherence.environment.extensible.namespaces.InstanceNamespaceContentHandler">
  <caching-schemes>
    <sync:provider pof-enabled="false">
      <sync:coherence-provider />
    </sync:provider>
    <caching-scheme-mapping>
      <cache-mapping>
        <cache-name>publishing-cache</cache-name>
        <scheme-name>distributed-scheme-with-publishing-cachestore</scheme-name>
        <autostart>true</autostart>
        <sync:publisher>
          <sync:publisher-name>Active2 Publisher</sync:publisher-name>
          <sync:publisher-scheme>
            <sync:remote-cluster-publisher-scheme>
              <sync:remote-invocation-service-name>remote-site1</sync:remote-invocation-service-name>
              <sync:remote-publisher-scheme>
                <sync:local-cache-publisher-scheme>
                  <sync:target-cache-name>publishing-cache</sync:target-cache-name>
                </sync:local-cache-publisher-scheme>
              </sync:remote-publisher-scheme>
              <sync:autostart>true</sync:autostart>
            </sync:remote-cluster-publisher-scheme>
          </sync:publisher-scheme>
        </sync:publisher>
        <sync:publisher>
          <sync:publisher-name>Active2-Output-Publisher</sync:publisher-name>
          <sync:publisher-scheme>
            <sync:stderr-publisher-scheme>
              <sync:autostart>true</sync:autostart>
              <sync:publish-original-value>true</sync:publish-original-value>
            </sync:stderr-publisher-scheme>
          </sync:publisher-scheme>
        </sync:publisher>
        <sync:publisher>
          <sync:publisher-name>Active2-JDBC-Publisher</sync:publisher-name>
          <sync:publisher-scheme>
            <sync:custom-publisher-scheme>
              <sync:jdbc-string>jdbc:oracle:thin:@machine_name:1521:XE</sync:jdbc-string>
              <sync:username>hr</sync:username>
              <sync:password>hr</sync:password>
            </sync:custom-publisher-scheme>
          </sync:publisher-scheme>
        </sync:publisher>
      </cache-mapping>
    </caching-scheme-mapping>
    <!-- The following scheme is required for each remote-site when using a
         RemoteInvocationPublisher -->
    <remote-invocation-scheme>
      <service-name>remote-site1</service-name>
      <initiator-config>
        <tcp-initiator>
          <remote-addresses>
            <socket-address>
              <address>localhost</address>
              <port>20001</port>
            </socket-address>
          </remote-addresses>
          <connect-timeout>2s</connect-timeout>
        </tcp-initiator>
        <outgoing-message-handler>
          <request-timeout>5s</request-timeout>
        </outgoing-message-handler>
      </initiator-config>
    </remote-invocation-scheme>
    <!-- END: com.oracle.coherence.patterns.pushreplication -->
    <proxy-scheme>
      <service-name>ExtendTcpProxyService</service-name>
      <acceptor-config>
        <tcp-acceptor>
          <local-address>
            <address>localhost</address>
            <port>20002</port>
          </local-address>
        </tcp-acceptor>
      </acceptor-config>
      <autostart>true</autostart>
    </proxy-scheme>
  </caching-schemes>
</cache-config>
In the configuration above I have:
- set the new namespace content handler (the xmlns:sync attribute of cache-config)
- defined the new custom publisher (Active2-JDBC-Publisher), which works alongside the other publishers, in our case the stderr and remote cluster publishers.
Step 6: Add the com.oracle.coherence.CustomPublisherScheme to your custom-pof-config file:
<pof-config>
  <user-type-list>
    <!-- Built in types -->
    <include>coherence-pof-config.xml</include>
    <include>coherence-common-pof-config.xml</include>
    <include>coherence-messagingpattern-pof-config.xml</include>
    <include>coherence-pushreplicationpattern-pof-config.xml</include>
    <!-- Application types -->
    <user-type>
      <type-id>1901</type-id>
      <class-name>com.oracle.coherence.Order</class-name>
      <serializer>
        <class-name>com.oracle.coherence.OrderSerializer</class-name>
      </serializer>
    </user-type>
    <user-type>
      <type-id>1902</type-id>
      <class-name>com.oracle.coherence.CustomPublisherScheme</class-name>
    </user-type>
  </user-type-list>
</pof-config>
CONCLUSIONS
This approach allows publishers to publish data to almost any other receiver (database, JMS, MQ, ...). The only thing that needs to change is the DatabasePersistence.java class, which should be adapted to the chosen receiver. Only minor changes are needed in the rest of the code (to the publishBatch method of the CustomPublisher class).