Program Listing

Class: RepConfigInfo
Class: excxx_repquote_gsg_RepMgrWrforGSG
Function: usage()
Function: main()
Method: RepMgrWrforGSG::init()
Method: RepMgrWrforGSG::doloop()
Method: RepMgrWrforGSG::print_stocks()

Our example program builds on the simple transactional application you created in the Transactional Application chapter and configures write forwarding. The application is network-aware, so you can specify host names and ports on the command line. The program also has additional error handling for replication errors.

When using replication with write forwarding, there are several benefits for your application code:

Class: RepConfigInfo

Before we begin, we present a class, RepConfigInfo, that holds the configuration information the rest of the example needs: the environment home directory, the start policy, the local site's address, and the list of other sites.

#include <stdlib.h>   // free()
#include <db_cxx.h>

class RepConfigInfo {
public:
    RepConfigInfo();
    virtual ~RepConfigInfo();

    void addOtherHost(char* host, int port);
    
public:
    u_int32_t start_policy;
    const char* home;   // never written through; may point at a string literal
    bool got_listen_address;
    REP_HOST_INFO this_host;
    int nrsites;
    // used to store a set of optional other hosts.
    REP_HOST_INFO *other_hosts;
};


RepConfigInfo::RepConfigInfo()
{
    start_policy = DB_REP_ELECTION;
    home = "TESTDIR";
    got_listen_address = false;
    // Start from a known state; main() sets creator to true for -L.
    this_host.creator = false;
    nrsites = 0;
    other_hosts = NULL;
}

RepConfigInfo::~RepConfigInfo()
{
    // release any other_hosts structs.
    if (other_hosts != NULL) {
        REP_HOST_INFO *CurItem = other_hosts;
        while (CurItem->next != NULL) {
            REP_HOST_INFO *TmpItem = CurItem->next;
            free(CurItem);
            CurItem = TmpItem;
        }
        free(CurItem);
    }
    other_hosts = NULL;
}  
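
The listing above declares addOtherHost() and uses REP_HOST_INFO without showing either. The following is a minimal sketch of what they might look like, written as free functions so that it compiles without db_cxx.h. The field names are taken from how the example uses them (creator, host, port, next); the field types and the helper names add_other_host() and free_hosts() are assumptions made for illustration, not the shipped implementation.

```cpp
#include <cstdlib>   // std::malloc, std::free, NULL

// Assumed layout: the field names match their use in the example
// (this_host.creator, cur->host, cur->port, cur->next); the exact
// types are a guess.
typedef struct RepHostInfoObj {
    bool creator;                 // true if this site creates the group
    char *host;                   // host name (not owned; points into argv)
    unsigned short port;          // TCP port
    struct RepHostInfoObj *next;  // next entry in the singly linked list
} REP_HOST_INFO;

// Hypothetical stand-in for RepConfigInfo::addOtherHost(): pushes a new
// node onto the front of the list and increments the site count.
// Returns false if the allocation fails.
static bool add_other_host(REP_HOST_INFO **head, int *nrsites,
    char *host, int port)
{
    REP_HOST_INFO *node =
        static_cast<REP_HOST_INFO *>(std::malloc(sizeof(REP_HOST_INFO)));
    if (node == NULL)
        return false;
    node->creator = false;
    node->host = host;
    node->port = static_cast<unsigned short>(port);
    node->next = *head;
    *head = node;
    ++*nrsites;
    return true;
}

// Mirrors the destructor: frees every node and leaves the list empty.
static void free_hosts(REP_HOST_INFO **head)
{
    while (*head != NULL) {
        REP_HOST_INFO *next = (*head)->next;
        std::free(*head);
        *head = next;
    }
}
```

Because the nodes are allocated with malloc(), they must be released with free(), which is what the destructor above does. Pushing onto the front of the list means the sites are visited in the reverse of the order in which they were given on the command line.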

Class: excxx_repquote_gsg_RepMgrWrforGSG

Our example will instantiate a class, RepMgrWrforGSG, that performs all our work for us. Before we implement our main() function, we show the RepMgrWrforGSG class declaration.

First, we provide some declarations and definitions that are needed later in our example. One is the size of our cache, which we keep deliberately small for this example, and the other is the name of our database. Also, you can define a sleep time, which sets the time that a site waits before it retries synchronizing with the master. We also provide a global variable that is the name of our program; this is used for error reporting later on.

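#include <iostream>
#include <errno.h>
#include <stdio.h>    // fgets(), stdin, EOF
#include <stdlib.h>   // atoi(), exit()
#include <string.h>   // strrchr(), strtok(), strncmp(), memset()
#include <db_cxx.h>
#include "RepWrforConfigInfo.h"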


using std::cout;
using std::cin;
using std::cerr;
using std::endl;
using std::flush;

#define CACHESIZE   (10 * 1024 * 1024)
#define DATABASE    "quote.db"
#define SLEEPTIME 3

const char *progname = "excxx_repquote_gsg_wrfor";

#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <direct.h>
#define    sleep(s)        Sleep(1000 * (s))

extern "C" {
  extern int getopt(int, char * const *, const char *);
  extern char *optarg;
}
#else
#include <unistd.h>   // getopt(), optarg, sleep()
#endif

And then we define our RepMgrWrforGSG class:

class RepMgrWrforGSG
{
public:
    // Constructor.
    RepMgrWrforGSG();
    // Initialization method. Creates and opens our environment handle.
    int init(RepConfigInfo* config);
    // The doloop is where all the work is performed.
    int doloop();
    // terminate() provides our shutdown code.
    int terminate();

private:
    // disable copy constructor.
    RepMgrWrforGSG(const RepMgrWrforGSG &);
    void operator = (const RepMgrWrforGSG &);

    // internal data members.
    RepConfigInfo   *app_config;
    DbEnv           dbenv;

    // private methods.
    // print_stocks() is used to display the contents of our database.
    static int print_stocks(Db *dbp);
};  

Note that we show the implementation of the various RepMgrWrforGSG methods later in this section.

Function: usage()

Our usage() function manages the following command line arguments:

static void usage()
{
    cerr << "usage: " << progname << endl
        << "-h home -l|-L host:port [-r host:port]" << endl;

    cerr 
        << "\t -h home directory (required)" << endl
        << "\t -l host:port (required unless -L is specified;" << endl
        << "\t    l stands for local)" << endl
        << "\t -L host:port (optional, L means group creator)" << endl
        << "\t -r host:port (optional; r stands for remote; any "
        << "number of these" << endl
        << "\t    may be specified)" << endl;

    exit(EXIT_FAILURE);
}  

where:

  • -h

    Identifies the environment home directory. You must specify this option.

  • -l

    Identifies the host and port used by this site. You must specify this option unless -L is specified.

  • -L

    Identifies the host and port used by this site and marks the site as the group creator. You must specify this option unless -l is specified.

  • -r

    Optionally identifies another site participating in this replication group.

Function: main()

Now we provide our main() function. This is a trivial function whose only job is to collect command line information, then instantiate a RepMgrWrforGSG object, run it, then terminate it.

We begin by declaring some useful variables. Of these, note that we instantiate our RepConfigInfo object here. Recall that this is used to store information useful to our code.

int main(int argc, char **argv)
{
    RepConfigInfo config;
    int ch;    // getopt() returns int; a char may not be able to hold EOF
    char *last_colon, *portstr, *tmphost;
    int tmpport;
    int ret;  

Then we collect our command line information:

    // Extract the command line parameters
    while ((ch = getopt(argc, argv, "h:l:L:r:")) != EOF) {
        switch (ch) {
        case 'h':
            config.home = optarg;
            break;
        case 'L':
            config.this_host.creator = true; // FALLTHROUGH
        case 'l':
            config.this_host.host = optarg;
            /*
             * The final colon in host:port string is the
             * boundary between the host and the port portions
             * of the string.
             */
            if ((last_colon = strrchr(optarg, ':')) == NULL) {
                cerr << "Bad local host specification." << endl;
                usage();
            }
            /*
             * Separate the host and port portions of the 
             * string for further processing.
             */
            portstr = last_colon + 1;
            *last_colon = '\0';
            config.this_host.port = (unsigned short)atoi(portstr);
            config.got_listen_address = true;
            break;
        case 'r':
            tmphost = optarg;
            /*
             * The final colon in host:port string is the 
             * boundary between the host and the port portions
             * of the string.
             */
            if ((last_colon = strrchr(tmphost, ':')) == NULL) {
                cerr << "Bad remote host specification." << endl;
                usage();
            }
            /*
             * Separate the host and port portions of the 
             * string for further processing.
             */
            portstr = last_colon + 1;
            *last_colon = '\0';
            tmpport = (unsigned short)atoi(portstr);
            config.addOtherHost(tmphost, tmpport);
            break;
        case '?':
        default:
            usage();
        }
    }

    // Error check command line.
    if ((!config.got_listen_address) || config.home == NULL)
        usage();  
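
The option parsing above converts the port with atoi(), which silently yields 0 for non-numeric input. As a sketch of a stricter alternative, the hypothetical helper below splits a host:port string at the final colon and rejects an empty host, a non-numeric port, or a port outside 1 to 65535. The name split_host_port() and the accepted port range are assumptions for illustration and are not part of the example program.

```cpp
#include <cerrno>    // errno, ERANGE
#include <cstdlib>   // std::strtol
#include <cstring>   // std::strrchr

// Hypothetical helper: splits "host:port" in place at the final colon.
// On success, spec holds only the host part, *port holds the port, and
// the function returns true. On failure, spec is left untouched.
static bool split_host_port(char *spec, unsigned short *port)
{
    char *last_colon = std::strrchr(spec, ':');
    if (last_colon == NULL || last_colon == spec)
        return false;               // no colon, or empty host

    const char *portstr = last_colon + 1;
    char *end = NULL;
    errno = 0;
    long value = std::strtol(portstr, &end, 10);
    if (end == portstr || *end != '\0' || errno == ERANGE ||
        value < 1 || value > 65535)
        return false;               // empty, non-numeric, or out of range

    *last_colon = '\0';             // terminate the host portion
    *port = static_cast<unsigned short>(value);
    return true;
}
```

Splitting at the final colon, as the example does, keeps any earlier colons in the host part, so a string such as "::1:6000" yields the host "::1" and the port 6000.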

Now we instantiate and initialize our RepMgrWrforGSG class, which does all our real work. The RepMgrWrforGSG::init() method creates and opens our environment handle.

    RepMgrWrforGSG runner;
    try {
        if((ret = runner.init(&config)) != 0)
            goto err;  

Then we call the RepMgrWrforGSG::doloop() method, which is where the actual transactional work is performed for this application.

        if((ret = runner.doloop()) != 0)
            goto err;  

Finally, we catch exceptions and terminate the program. In a traditional transactional application, all databases would be closed here. In our replicated application, the database is normally closed in the doloop() method, so terminate() only closes the environment handle.

    } catch (DbException &dbe) {
        cerr << "Caught an exception during initialization or"
            << " processing: " << dbe.what() << endl;
    }
err:
    runner.terminate();
    return 0;
}  

Method: RepMgrWrforGSG::init()

The RepMgrWrforGSG::init() method is used to create and open our environment handle.

First, we show the class constructor implementation, which is only used to initialize a few variables:

RepMgrWrforGSG::RepMgrWrforGSG() : app_config(0), dbenv((u_int32_t)0)
{
}  

We now provide the init() method implementation. This is where we configure and start Replication Manager with write forwarding. To enable write forwarding, call rep_set_config with the DB_REPMGR_CONF_FORWARD_WRITES option:

int RepMgrWrforGSG::init(RepConfigInfo *config)
{
    int ret = 0;

    app_config = config;

    dbenv.set_errfile(stderr);
    dbenv.set_errpfx(progname);

    DbSite *dbsite;
    dbenv.repmgr_site(app_config->this_host.host,
        app_config->this_host.port, &dbsite, 0);
    dbsite->set_config(DB_LOCAL_SITE, 1);
    if (app_config->this_host.creator)
        dbsite->set_config(DB_GROUP_CREATOR, 1);

    dbsite->close();

    int i = 1;
    for ( REP_HOST_INFO *cur = app_config->other_hosts;
        cur != NULL && i <= app_config->nrsites;
        cur = cur->next, i++) {

        dbenv.repmgr_site(cur->host, cur->port, &dbsite, 0);
        dbsite->set_config(DB_BOOTSTRAP_HELPER, 1);

        dbsite->close();
    }

    // We can now configure and open our environment. Replication
    // does not begin until repmgr_start() is called below.
    dbenv.set_cachesize(0, CACHESIZE, 0);
    dbenv.set_flags(DB_TXN_NOSYNC, 1);

    try {
        dbenv.open(app_config->home, DB_CREATE | DB_RECOVER |
            DB_THREAD | DB_INIT_REP | DB_INIT_LOCK | DB_INIT_LOG | 
            DB_INIT_MPOOL | DB_INIT_TXN, 0);
    } catch(DbException &dbe) {
        cerr << "Caught an exception during DB environment open." << endl
            << "Ensure that the home directory is created prior "
            << "to starting the application." << endl;
        ret = ENOENT;
        goto err;
    }

    /* Configure Replication Manager write forwarding. */
    dbenv.rep_set_config(DB_REPMGR_CONF_FORWARD_WRITES, 1);

    if ((ret = dbenv.repmgr_start(3, app_config->start_policy)) != 0)
        goto err;

err:
    return ret;
}  

Finally, we present the RepMgrWrforGSG::terminate() method here. All this does is close the environment handle.

int RepMgrWrforGSG::terminate()
{
    try {
        dbenv.close(0);
    } catch (DbException &dbe) {
        cerr << "error closing environment: " << dbe.what() << endl;
    }
    return 0;
}  

Method: RepMgrWrforGSG::doloop()

Having written our main() function and support utility methods, we now implement our application's primary data processing method. This method provides a command prompt at which the user can enter a stock ticker value and a price for that value. This information is then written to the database.

To display the database, press Return at the prompt without entering anything.

To begin, we declare a database pointer, several Dbt variables, and the usual assortment of variables used for buffers and return codes. We also initialize all of this.

#define BUFSIZE 1024
int RepMgrWrforGSG::doloop()
{
    Dbt key, data;
    Db *dbp;
    char buf[BUFSIZE], *rbuf;
    int ret;

    dbp = 0;
    memset(&key, 0, sizeof(key));
    memset(&data, 0, sizeof(data));
    ret = 0;  

Next, we begin the loop and we immediately open our database if it has not already been opened.

If -L was specified, this site is the group creator, so it creates the database the first time it starts. The database is replicated to the other sites when they first start up, and it already exists on every site for subsequent startups.

A site that is not the group creator might need time to synchronize with the master before the database is available. If the open fails with ENOENT, the site waits SLEEPTIME seconds and then tries again.

    for (;;) {
        if (dbp == 0) {
            dbp = new Db(&dbenv, 0);

            try {
                dbp->open(NULL, DATABASE, NULL, DB_BTREE,
                    app_config->this_host.creator ?
                    DB_CREATE | DB_AUTO_COMMIT : DB_AUTO_COMMIT, 0);
            } catch(DbException &dbe) {
                // It is expected that this condition will 
                // be triggered when client sites start up.  It can 
                // take a while for the master site to be found
                // and synced, and no DB will be available until then.
                if (dbe.get_errno() == ENOENT) {
                    cout << "No stock db available yet - retrying."
                        << endl;
                    try {
                        dbp->close(0);
                    } catch (DbException &dbe2) {
                        cout << "Unexpected error closing after failed" <<
                            " open, message: " << dbe2.what() << endl;
                        dbp = NULL;
                        goto err;
                    }
                    dbp = NULL;
                    sleep(SLEEPTIME);
                    continue;
                } else {
                    // ret is still 0 here; report the exception's error.
                    dbenv.err(dbe.get_errno(), "DB->open");
                    throw;
                }
            }
        } 
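
The open logic above retries indefinitely while the database does not yet exist locally. As a sketch of the same pattern with an upper bound on the number of attempts, the hypothetical helper below keeps calling an open routine while it reports ENOENT. The name open_with_retry(), its callable parameters, and the attempt limit are assumptions for illustration; the example program itself loops without a limit.

```cpp
#include <cerrno>   // ENOENT

// Hypothetical helper: calls try_open() until it stops returning ENOENT
// or max_attempts attempts have been made, calling wait() between
// attempts. Returns the last result from try_open(), or ENOENT if
// max_attempts is less than 1.
template <typename OpenFn, typename WaitFn>
int open_with_retry(OpenFn try_open, WaitFn wait, int max_attempts)
{
    int ret = ENOENT;
    for (int attempt = 1; attempt <= max_attempts; ++attempt) {
        ret = try_open();
        if (ret != ENOENT)
            return ret;          // success (0) or a non-retryable error
        if (attempt < max_attempts)
            wait();              // for example, sleep(SLEEPTIME)
    }
    return ret;                  // still ENOENT after all attempts
}
```

In the example program, try_open would wrap the Db::open() call and translate a DbException into its error number, and wait would call sleep(SLEEPTIME). Bounding the retries lets a client report a clear error when no master ever becomes reachable.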

Now we implement our command prompt. If the user enters the keyword exit or quit, the loop exits and the application ends. If the user enters nothing and simply presses Return, the entire contents of the database are displayed by our print_stocks() method. (That implementation is shown next in this chapter.)

We also now check for a dead replication handle, which can occur in rare cases when a new master causes a previously committed transaction to be rolled back. In such cases, all database handles must be closed and opened again.

Remember that very little error checking is performed on the data entered at this prompt. If the user fails to enter at least one space in the value string, a simple help message is printed and the prompt is returned to the user.

        cout << "QUOTESERVER" ;
        cout << "> " << flush;

        if (fgets(buf, sizeof(buf), stdin) == NULL)
            break;
        if (strtok(&buf[0], " \t\n") == NULL) {
            switch ((ret = print_stocks(dbp))) {
            case 0:
                continue;
            case DB_REP_HANDLE_DEAD:
                (void)dbp->close(DB_NOSYNC);
                cout << "closing db handle due to rep handle dead" << endl;
                dbp = NULL;
                continue;
            default:
                dbp->err(ret, "Error traversing data");
                goto err;
            }
        }
        rbuf = strtok(NULL, " \t\n");
        if (rbuf == NULL || rbuf[0] == '\0') {
            if (strncmp(buf, "exit", 4) == 0 ||
                strncmp(buf, "quit", 4) == 0)
                break;
            dbenv.errx("Format: TICKER VALUE");
            continue;
        }  
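
The prompt logic above distinguishes four cases: an empty line, a quit keyword, a malformed entry, and a TICKER VALUE pair. The sketch below restates that decision as a standalone function using std::istringstream instead of strtok(). The names Command and parse_line() are hypothetical, and unlike the strncmp() test in the example, this sketch matches exit and quit exactly.

```cpp
#include <sstream>
#include <string>

// Hypothetical result codes for one line of user input.
enum Command {
    CMD_PRINT,   // empty line: display the database
    CMD_QUIT,    // "exit" or "quit"
    CMD_PUT,     // TICKER VALUE pair: write to the database
    CMD_BAD      // a single token that is not a quit keyword
};

// Hypothetical helper: classifies a line and, for CMD_PUT, stores the
// first two whitespace-separated tokens in *ticker and *value.
static Command parse_line(const std::string &line,
    std::string *ticker, std::string *value)
{
    std::istringstream in(line);
    std::string first, second;

    if (!(in >> first))
        return CMD_PRINT;
    if (!(in >> second)) {
        if (first == "exit" || first == "quit")
            return CMD_QUIT;
        return CMD_BAD;          // caller prints "Format: TICKER VALUE"
    }
    *ticker = first;
    *value = second;
    return CMD_PUT;
}
```

As in the example, any tokens after the second one are ignored.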

Now we assign data to the Dbts that we will use to write the new information to the database.

        key.set_data(buf);
        key.set_size((u_int32_t)strlen(buf));

        data.set_data(rbuf);
        data.set_size((u_int32_t)strlen(rbuf));  

Having done that, we can write the new information to the database. The put operation needs no explicit commit because it passes a NULL txnid on a database handle that was opened with DB_AUTO_COMMIT, so each put is committed automatically. If a deadlock, timeout, or permission error occurs, the application reports it and returns to the prompt so that the operation can be retried. A forwarded put operation can return a timeout error if the operation takes too long and a permission error if there is currently no master.

        if ((ret = dbp->put(NULL, &key, &data, 0)) != 0)
        {
            dbp->err(ret, "DB->put");
            switch (ret) {
            case DB_REP_HANDLE_DEAD:
                /* Must close and reopen the handle, then can retry. */
                (void)dbp->close(0); 
                dbp = NULL;
                /* FALLTHROUGH */
            case DB_LOCK_DEADLOCK:
            case DB_TIMEOUT:
            case EACCES:
                dbenv.errx("Could not update data, retry operation");
                /* FALLTHROUGH */
            case DB_KEYEXIST:
                continue;
            default:
                dbp->err(ret, "Error updating data");
                goto err;
            }
        }
    }  
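
The switch statement above relies on fall-through to group its cases. The sketch below spells out the same decision table as a standalone function. So that it compiles without db_cxx.h, it uses placeholder codes (SK_*) whose numeric values are not the library's; in the real program the corresponding names are DB_REP_HANDLE_DEAD, DB_LOCK_DEADLOCK, DB_TIMEOUT, EACCES, and DB_KEYEXIST. The names SketchError, PutAction, and classify_put_result() are hypothetical.

```cpp
// Placeholder error codes for illustration only. These are NOT the
// values defined by the Berkeley DB headers or <errno.h>.
enum SketchError {
    SK_OK = 0,
    SK_REP_HANDLE_DEAD,
    SK_LOCK_DEADLOCK,
    SK_TIMEOUT,
    SK_EACCES,
    SK_KEYEXIST,
    SK_OTHER
};

// What the caller should do after a put attempt.
enum PutAction {
    PUT_DONE,              // the put succeeded
    PUT_REOPEN_AND_RETRY,  // close and reopen the handle, then retry
    PUT_RETRY,             // transient failure; the put can be retried
    PUT_IGNORE,            // key already exists; return to the prompt
    PUT_FATAL              // unexpected error; leave the loop
};

// Hypothetical helper mirroring the switch statement in doloop().
static PutAction classify_put_result(SketchError ret)
{
    switch (ret) {
    case SK_OK:
        return PUT_DONE;
    case SK_REP_HANDLE_DEAD:
        return PUT_REOPEN_AND_RETRY;
    case SK_LOCK_DEADLOCK:   // lost a lock conflict
    case SK_TIMEOUT:         // forwarded put took too long
    case SK_EACCES:          // no master is currently available
        return PUT_RETRY;
    case SK_KEYEXIST:
        return PUT_IGNORE;
    default:
        return PUT_FATAL;
    }
}
```

Separating the classification from the action makes it easier to see that only the dead-handle case requires closing the database handle before the user retries.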

Finally, we close our database before returning from the method.

err:
    if (dbp != NULL) {
        (void)dbp->close(DB_NOSYNC);
        cout << "database closed" << endl;
    }

    return (ret);
}  

Method: RepMgrWrforGSG::print_stocks()

This method is unmodified from when we originally introduced it. For details, see Method: SimpleTxn::print_stocks().