Our example program builds from the simple transactional application you created in the Transactional Application chapter and configures write forwarding. The application is network-aware, so you can specify things like host names and ports from the command line. This program has additional error handling for replication errors.
When using replication with write forwarding, there are several benefits for your application code:
You do not need to create an event handler to detect changes of the master.
You do not need to use app_data to track whether the current site is master.
You do not need to provide an error for put operations on the client.
Before we begin, we present a class that we will use to maintain useful information for us.
The class that we create is called
RepWrforConfig
.
First, we provide some declarations and
definitions that are needed later in
our example. One is the size of our cache,
which we keep deliberately small for this example, and the other
is the name of our database. Also, you can define a sleep
time, which sets the time that a site waits before it retries
synchronizing with the master. We also provide a global variable that
is the name of our program; this is used for error reporting
later on.
package db.repquote_gsg; public class RepWrforConfig { // Constant values used in the RepQuote application. public static final String progname = "RepQuoteWrforExampleGSG"; public static final int CACHESIZE = 10 * 1024 * 1024; public static final int SLEEPTIME = 5000; // Member variables containing configuration information. // String specifying the home directory for rep files. public String home; // Stores an optional set of "other" hosts. private Vector otherHosts; // Priority within the replication group. public ReplicationManagerStartPolicy startPolicy; // The host address to listen to. private ReplicationManagerSiteConfig thisHost; // Member variables used internally. private int currOtherHost; private boolean gotListenAddress; public RepWrforConfig() { startPolicy = ReplicationManagerStartPolicy.REP_ELECTION; home = "TESTDIR"; gotListenAddress = false; currOtherHost = 0; thisHost = new ReplicationManagerSiteConfig(); otherHosts = new Vector(); } public java.io.File getHome() { return new java.io.File(home); } }
Our example will
consist of a class,
RepQuoteWrforExampleGSG
, that performs
all our work for us.
First, we provide the package declaration and then a few import statements that the class needs.
package db.repquote_gsg; import java.io.FileNotFoundException; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.lang.Thread; import java.lang.InterruptedException; import com.sleepycat.db.Cursor; import com.sleepycat.db.Database; import com.sleepycat.db.DatabaseConfig; import com.sleepycat.db.DatabaseEntry; import com.sleepycat.db.DatabaseException; import com.sleepycat.db.DeadlockException; import com.sleepycat.db.DatabaseType; import com.sleepycat.db.Environment; import com.sleepycat.db.EnvironmentConfig; import com.sleepycat.db.LockMode; import com.sleepycat.db.OperationStatus; import com.sleepycat.db.ReplicationConfig; import com.sleepycat.db.ReplicationHandleDeadException; import com.sleepycat.db.ReplicationHostAddress; import com.sleepycat.db.ReplicationManagerSiteConfig; import com.sleepycat.db.ReplicationManagerAckPolicy; import db.repquote_gsg.RepWrforConfig; public class RepQuoteWrforExampleGSG { private RepWrforConfig repConfig; private Environment dbenv;
Next, we provide our class constructor. This simply initializes our class data members.
public RepQuoteWrforExampleGSG() throws DatabaseException { repConfig = null; dbenv = null; }
And then we provide our usage()
method:
public static void usage() { System.err.println("usage: " + RepWrforConfig.progname); System.err.println("-h home -l|-L host:port " + "[-r host:port]"); System.err.println("\t -h home directory (required)\n" + "\t -l host:port (required unless -L is specified;" + " l stands for local)\n" + "\t -L host:port (optional, L means group creator)\n" + "\t -r host:port (optional; r stands for remote; any " + "number of these\n" + "\t may be specified)\n"); System.exit(1); }
where:
-h
Identifies the environment home directory. You must specify this option.
-l
Identifies the host and port used by this site. You
must specify this option unless -L
is
specified.
-L
Identifies the local site as group creator. You must
specify this option unless -l
is
specified.
-r
Optionally identifies another site participating in this replication group.
Having implemented our
usage()
method, we can jump directly into our
main()
method. This method begins by instantiating a
RepWrforConfig
object, and
then collecting the command line arguments so
that it can populate the object with the
appropriate data:
public static void main(String[] argv) throws Exception { RepWrforConfig config = new RepWrforConfig(); boolean isCreator = false; int tmpPort = 0; // Extract the command line parameters for (int i = 0; i < argv.length; i++) { if (argv[i].compareTo("-h") == 0) { // home is a string arg. i++; config.home = argv[i]; } else if (argv[i].compareTo("-l") == 0 || argv[i].compareTo("-L") == 0) { if (i == argv.length - 1) usage(); if (argv[i].compareTo("-L") == 0) isCreator = true; // "local" should be host:port. i++; // Look for index of the last colon in the argv[i] string. int sep = argv[i].lastIndexOf(':'); if (sep == -1 || sep == 0) { System.err.println( "Invalid local host specification host:port needed."); usage(); } try { tmpPort = Integer.parseInt(argv[i].substring(sep + 1)); } catch (NumberFormatException nfe) { System.err.println("Invalid local host specification, " + "could not parse port number."); usage(); } config.setThisHost(argv[i].substring(0, sep), tmpPort, isCreator); } else if (argv[i].compareTo("-r") == 0) { i++; // Look for index of the last colon in the argv[i] string. int sep = argv[i].lastIndexOf(':'); if (sep == -1 || sep == 0) { System.err.println( "Invalid remote host specification host:port needed."); usage(); } try { tmpPort = Integer.parseInt(argv[i].substring(sep + 1)); } catch (NumberFormatException nfe) { System.err.println("Invalid remote host specification, " + "could not parse port number."); usage(); } config.addOtherHost(argv[i].substring(0, sep), tmpPort); } else { System.err.println("Unrecognized option: " + argv[i]); usage(); } }
And then perform a little sanity checking on the command line input:
// Error check command line. if ((!config.gotListenAddress()) || config.home.length() == 0) usage();
Now we perform the class' work. To begin, we initialize the
object. The init()
method actually
opens our environment for us (shown in the next section).
RepQuoteWrforExampleGSG runner = null; try { runner = new RepQuoteWrforExampleGSG(); runner.init(config);
And then we call our doloop()
method. This method is where we perform all our database
activity.
runner.doloop();
And then, finally terminate the application (which closes our
environment handle) and end the method. Note,
again, that in a traditional transactional application all
databases would be closed here. In our replicated application,
the database will usually be closed in the doloop()
function, but we also conditionally close the database here to handle
some error cases.
runner.terminate(); } catch (DatabaseException dbe) { System.err.println("Caught an exception during " + "initialization or processing: " + dbe.toString()); if (runner != null) runner.terminate(); } System.exit(0); } // end main
The RepQuoteWrforExampleGSG.init()
method is used to open our environment handle.
You can now configure and start Replication Manager with
write forwarding. To configure write forwarding, use
setReplicationConfig
with the
ReplicationConfig.FORWARD_WRITES
option:
public int init(RepWrforConfig config) throws DatabaseException { int ret = 0; repConfig = config; EnvironmentConfig envConfig = new EnvironmentConfig(); envConfig.setErrorStream(System.err); envConfig.setErrorPrefix(RepWrforConfig.progname); envConfig.addReplicationManagerSite(repConfig.getThisHost()); for (ReplicationHostAddress host = repConfig.getFirstOtherHost(); host != null; host = repConfig.getNextOtherHost()){ ReplicationManagerSiteConfig repmgrRemoteSiteConfig = new ReplicationManagerSiteConfig(host.host, host.port); repmgrRemoteSiteConfig.setBootstrapHelper(true); envConfig.addReplicationManagerSite( repmgrRemoteSiteConfig); } envConfig.setCacheSize(RepWrforConfig.CACHESIZE); envConfig.setTxnNoSync(true); envConfig.setAllowCreate(true); envConfig.setRunRecovery(true); envConfig.setThreaded(true); envConfig.setInitializeReplication(true); envConfig.setInitializeLocking(true); envConfig.setInitializeLogging(true); envConfig.setInitializeCache(true); envConfig.setTransactional(true); try { dbenv = new Environment(repConfig.getHome(), envConfig); } catch(FileNotFoundException e) { System.err.println("FileNotFound exception: " + e.toString()); System.err.println( "Ensure that the environment directory is pre-created."); ret = 1; } // Configure Replication Manager write forwarding. dbenv.setReplicationConfig(ReplicationConfig.FORWARD_WRITES, true); // Start Replication Manager. dbenv.replicationManagerStart(3, repConfig.startPolicy); return ret; }
We now implement our application's primary data processing method. This method provides a command prompt at which the user can enter a stock ticker value and a price for that value. This information is then entered to the database.
To display the database, simply enter
return
at the prompt.
To begin, we declare a database pointer:
public int doloop() throws DatabaseException { Database db = null;
Next, we begin the loop and we immediately open our database if it has not already been opened.
If -L
is set, it specifies the need to
create the database for the initial group creator startup. The
database will be replicated to the other sites when they first
start up. The database will already exist on each site for
subsequent startups.
Note that there is some logic for a site to retry in case it
needs time to synchronize with the master using
SLEEPTIME
.
for (;;) { if (db == null) { DatabaseConfig dbconf = new DatabaseConfig(); dbconf.setType(DatabaseType.BTREE); if (repConfig.getThisHost().getGroupCreator()) { dbconf.setAllowCreate(true); } dbconf.setTransactional(true); try { db = dbenv.openDatabase (null, RepWrforConfig.progname, null, dbconf); } catch (java.io.FileNotFoundException e) { System.err.println("No stock database available yet."); if (db != null) { db.close(true); db = null; } try { Thread.sleep(RepWrforConfig.SLEEPTIME); } catch (InterruptedException ie) {} continue; } }
Now we implement our command prompt.
If the user enters the keywords exit
or quit
, the loop is exited and the
application ends. If the user enters nothing and instead simply
presses return
, the entire contents of the
database is displayed. We use our
printStocks()
method to display the
database. (That implementation is shown next in this chapter.)
We also now check for a dead replication handle, which can occur in rare cases when a new master causes a previously committed transaction to be rolled back. In such cases, all database handles must be closed and opened again.
Remember that very little error checking is performed on the data entered at this prompt. If the user fails to enter at least one space in the value string, a simple help message is printed and the prompt is returned to the user.
BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in)); // Listen for input, and add it to the database. System.out.print("QUOTESERVER"); System.out.print("> "); System.out.flush(); String nextline = null; try { nextline = stdin.readLine(); } catch (IOException ioe) { System.err.println("Unable to get data from stdin"); break; } String[] words = nextline.split("\\s"); // A blank line causes the DB to be dumped to stdout. if (words.length == 0 || (words.length == 1 && words[0].length() == 0)) { try { printStocks(db); } catch (DeadlockException de) { continue; // Dead replication handles are caused by an election // resulting in a previously committing read becoming // invalid. Close the db handle and reopen. } catch (ReplicationHandleDeadException rhde) { db.close(true); // close no sync. db = null; continue; } catch (DatabaseException e) { System.err.println("Got db exception reading replication" + "DB: " + e.toString()); break; } continue; } if (words.length == 1 && (words[0].compareToIgnoreCase("quit") == 0 || words[0].compareToIgnoreCase("exit") == 0)) { break; } else if (words.length != 2) { System.err.println("Format: TICKER VALUE"); continue; }
Now we assign data to the DatabaseEntry
classes that we will use to write the new information to the database.
DatabaseEntry key = new DatabaseEntry(words[0].getBytes()); DatabaseEntry data = new DatabaseEntry(words[1].getBytes());
Having done that, we can write the new information to the database. Here, the reason we do not need an explicit commit on this put operation is that it uses the implicit NULL txnid, so each one is automatically committed.
db.put(null, key, data);
Finally, we close our database before returning from the method.
} if (db != null) db.close(true); return 0; }
This function is unmodified from when we originally introduced it. For details on that function, see Method: SimpleTxn.printStocks() .