Forum: Building VoltDB Applications

Post: Building a queuing system

francis
Dec 18, 2012
I am in the process of building (rebuilding) our queuing system. In MySQL this was painfully simple and it worked great. But now we are finding that, because of limitations in the queries VoltDB supports, content is being processed twice (or more) depending on how many servers we have processing the queue.

In MySQL it was pretty simple: we enabled a feature that returned updated rows and ran a query like this:

UPDATE queue SET status=1 WHERE status=0 AND added<='$now' ORDER BY added LIMIT 500;


Then as we processed each record we updated its status to 2 (which meant done), and those records would get purged at the end of the day.

However, we are now running a rather hacked version of this, as I mentioned above, and hitting records multiple times in some cases. We have tried two solutions. One is in PHP: we get the dataset and then update the status to 1 for every queueID between the first and the last (which will no longer work with an upcoming update). The other was its own stored procedure that would hit the DB, get the list of records, loop through them, and set the status to 1.

In both scenarios, records are being processed twice. The procedure produces far fewer duplicates, but it still has some. Of course, if we shrink the limit from 500 to 100 it works. But we want to grow it to 2500 in the next update, in which case we will get a ton of duplicates.

Anyone have any ideas that would make this work? I've been bashing my head against this for days now and I really cannot figure it out. It would be great if VoltDB supported ORDER BY in UPDATE statements; that would alleviate a huge portion of my pain, but beyond that I really cannot think of a solution.
francis
Dec 20, 2012
Guess I'm not the only one that is stumped...
pmartel
Dec 21, 2012
I think the most straightforward substitute for:

UPDATE queue SET status=1 WHERE status=0 AND added<='$now' ORDER BY added LIMIT 500;

would be a stored procedure that used the two statements:

SELECT added [, unique_id_if_different_from_added] FROM queue WHERE status=0 AND added<=? ORDER BY added [, unique_id_if_different_from_added] LIMIT 500;

UPDATE queue SET status=1 WHERE status=0 AND added_or_id_if_different_from_added = ?;

the latter called in a loop for each row returned from the first statement.

There are performance/reliability/consistency trade-offs to make here, in that the UPDATE would have to be a (relatively expensive) multi-partition write UNLESS you forced all its rows to a single partition by use of a constant partition key -- but that doesn't scale so well if queues are very large.
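
To make the shape concrete, here is a minimal sketch of such a procedure (the procedure name is made up, queueID is assumed to be the unique id, and whether it is declared single- or multi-partition follows the trade-offs just mentioned):

import org.voltdb.SQLStmt;
import org.voltdb.VoltProcedure;
import org.voltdb.VoltTable;

public class MarkBatchInProgress extends VoltProcedure {

    private static final SQLStmt SELECT_DUE = new SQLStmt(
            "SELECT queueID FROM queue WHERE status = 0 AND added <= ? " +
            "ORDER BY added, queueID LIMIT 500;");

    private static final SQLStmt MARK_ONE = new SQLStmt(
            "UPDATE queue SET status = 1 WHERE status = 0 AND queueID = ?;");

    // "now" is passed in as epoch microseconds
    public VoltTable run(long now) {
        voltQueueSQL(SELECT_DUE, now);
        VoltTable due = voltExecuteSQL()[0];

        int pending = 0;
        while (due.advanceRow()) {
            // mark each selected row individually; the whole procedure is one
            // transaction, so no other caller can claim these rows in between
            voltQueueSQL(MARK_ONE, due.getLong("queueID"));
            if (++pending == 100) {   // flush in modest batches
                voltExecuteSQL();
                pending = 0;
            }
        }
        if (pending > 0) {
            voltExecuteSQL(true);
        }
        due.resetRowPosition();
        return due; // the claimed rows go back to the caller
    }
}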

An approach that would allow the queue to scale across partitions but still avoid multi-partition UPDATEs would be to partition the queue on a column representing a slower counter -- a scaled-down "added" value. This would cause batches of rows to accumulate on a single partition, where they could be processed by the following:

Transaction 1 (multi-partition read-only procedure):

SELECT MIN(added) AS min_added, counter FROM queue WHERE status=0 AND added<=? GROUP BY counter ORDER BY min_added LIMIT 1;

Aside: the performance of this statement might benefit from use of a materialized view.

Transaction 2 (single-partition read/write procedure):

SELECT added [, unique_id_if_different_from_added] FROM queue WHERE counter = ? and status=0 AND added<=? ORDER BY added [, unique_id_if_different_from_added] LIMIT 500;

UPDATE queue SET status=1 WHERE counter = ? and status=0 AND added_or_id_if_different_from_added = ?;

the latter called in a loop for each row returned from the first statement, all conditional on the first row matching the min_added from the first transaction, to ensure that this server has not been "scooped".
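
A rough sketch of that second transaction (names are placeholders; "added" is assumed unique per row and stored as a timestamp):

import org.voltdb.ProcInfo;
import org.voltdb.SQLStmt;
import org.voltdb.VoltProcedure;
import org.voltdb.VoltTable;

// Sketch only: assumes the queue table is partitioned on "counter".
@ProcInfo(singlePartition = true, partitionInfo = "queue.counter:0")
public class ClaimBatchForCounter extends VoltProcedure {

    private static final SQLStmt SELECT_DUE = new SQLStmt(
            "SELECT added FROM queue WHERE counter = ? AND status = 0 AND added <= ? " +
            "ORDER BY added LIMIT 500;");

    private static final SQLStmt MARK_ONE = new SQLStmt(
            "UPDATE queue SET status = 1 WHERE counter = ? AND status = 0 AND added = ?;");

    // timestamps passed as epoch microseconds
    public VoltTable run(int counter, long now, long expectedMinAdded) {
        voltQueueSQL(SELECT_DUE, counter, now);
        VoltTable due = voltExecuteSQL()[0];

        // "Scooped" check: if the earliest pending row no longer matches what
        // Transaction 1 reported, another server claimed this batch first.
        if (!due.advanceRow()
                || due.getTimestampAsLong("added") != expectedMinAdded) {
            throw new VoltAbortException("batch already claimed elsewhere");
        }
        due.resetRowPosition();

        int pending = 0;
        while (due.advanceRow()) {
            voltQueueSQL(MARK_ONE, counter, due.getTimestampAsLong("added"));
            if (++pending == 100) {   // flush in modest batches
                voltExecuteSQL();
                pending = 0;
            }
        }
        if (pending > 0) {
            voltExecuteSQL(true);
        }
        due.resetRowPosition();
        return due;   // the batch this server now owns
    }
}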

You didn't specify what kinds of database operations may be required to get an entry into the "DONE" state. If it requires access to multi-partition data, or even single-partitioned data that is partitioned differently from your queue, you will have the same kinds of trade-offs to make there between transactional consistency/reliability and performance, depending on whether your queue update must share a transaction with the other operations.
francis
Dec 21, 2012
I had a procedure similar to that, but when the limit was grown to about 1000, because we have multiple servers processing the queue, a few of them would be calling for the list of available queue rows at the same time and processing the same rows.

The steps for the queue are pretty simple:
1. Get queue elements that need to be processed
2. Ensure that the elements returned in 1 are marked as in progress (so that no one else picks them up)
3. Process each queue element and upon completion mark them as done or reset the status to 0 to reprocess at a later date (max 3 tries)

Under heavier loads we noticed that when the 4 servers were all calling the queue to process it, some of them would get the same rows because the rows hadn't been marked as in progress fast enough.

This is not a user-facing process, so we have more control over how it performs and how often it is called. But handling 2500 or even 10000 elements per minute, per server, is pretty crucial. We use the queue to send out emails, process reports, clean data, etc. When the queue is filled with 1M or so elements that need to execute now, we increase the number of processes each server runs per minute against the queue.

The single-partition approach is definitely what I would prefer, but looping through all the results adds some latency, and as the queue size grows and more servers are added, the more often a given queue element will be run. I'd love to have a solution for this in VoltDB, but it sounds like it's close to impossible to ensure that each server gets a unique list of queue elements to process. The only solution I can see is to check every single element as I process it and make sure it wasn't already executed, which would add a significant amount of latency.
awilson
Dec 21, 2012
Hi Francis,

I'm looking at the thread and I'd like to know a little bit more about the architecture. It sounds like you have a mail queue and a couple of apps polling it. The apps are loading duplicate messages and stepping on each other. Does that sound about right?

If so, I may have a solution for you. I had to ingest a lot of data and dispatch it to Hadoop, and Ariel and I came up with a really fast method for handling parallel ingestion and parallel extraction with redundancy. I had most of the app wrapped up but had to move on to something more pressing and, consequently, never published it.

The basic idea is that you distribute the messages randomly across all the partitions of the cluster. You read a block of messages and mark them as "in progress" with a timeout. You then dispatch the block of emails, tracking any message that fails (you lose the SMTP connection, for example), removing its "in progress" flag and setting it to something like "failed". After successfully dispatching a block of messages, you can either use another stored procedure to delete the messages or mark them as successfully processed and purge them at a later time.

Here's the code to get the cluster partition count:

    private int getMaxPartitions() throws NoConnectionsException,
            UnknownHostException, IOException, ProcCallException {
        int results = 0;
        ClientResponse response = this.getClient().callProcedure("@Statistics",
                "partitioncount", 0);
        if (response.getStatus() == ClientResponse.SUCCESS) {
            VoltTable[] resultTables = response.getResults();
            if (resultTables[0].advanceRow()) {
                results = (int) resultTables[0].getLong(0);
            }
        }
        return results;
    }


This code will distribute the messages across your partitions:

    private void insert() throws NoConnectionsException, IOException,
            ProcCallException, InterruptedException {
        maxTransactionsSemaphore.acquire();

        String testData = getTestData();
        this.client.callProcedure(new ProcedureCallback() {
            public void clientCallback(ClientResponse response)
                    throws Exception {
                if (response.getStatus() != ClientResponse.SUCCESS) {
                    System.out.println("Failed to insert new row, exiting. "
                            + response.getStatusString());
                    System.exit(0); // we could exit more aggressively for this app
                }
            }
        }, "Insert", this.random.nextInt(this.partitions), testData);
    }


The read operation looks like this:

    private void selectBatch() throws NoConnectionsException, IOException,
            ProcCallException, InterruptedException {
        // throttle your select operations so you don't overflow the system
        // with unnecessary calls.
        maxTransactionSemaphore.acquire();
        this.client.callProcedure(new ProcedureCallback() {
            public void clientCallback(ClientResponse response)
                    throws Exception {
                transactionCounter++;
                if (response.getStatus() == ClientResponse.SUCCESS) {
                    pendingBatches--;
                    // hand the batch off to a worker thread for processing
                    new Thread(new ProcessResults(response)).start();
                } else {
                    System.out.println("Failed to read rows, exiting."
                            + response.getStatusString());
                    System.exit(0);
                }
                maxTransactionSemaphore.release();
            }
        }, "SelectBatch", this.partition, this.batchSize);
        System.out.println("Calling partition: " + this.partition
                + " with batch size of " + this.batchSize);
    }


The stored proc:

@ProcInfo(singlePartition = true, partitionInfo = "logtable.partition_id:0")
public class SelectBatch extends VoltProcedure {

    private final static SQLStmt SELECT_STMT = new SQLStmt(
            "SELECT * FROM logtable WHERE partition_id=? and read=0 order by row_id ASC, partition_id, read LIMIT ?");

    private final static SQLStmt UPDATE_STMT = new SQLStmt(
            "Update logtable set read=1, readtime=? WHERE partition_id=? and row_id >=? and row_id <=?");

    public VoltTable[] run(int partition, int batchSize) {
        voltQueueSQL(SELECT_STMT, partition, batchSize);
        VoltTable[] results = voltExecuteSQL();
        if (results[0].getRowCount() > 0) {
            long startRow = 0;
            long endRow = 0;
            while (results[0].advanceRow()) {
                long rowID = results[0].getLong("row_id");
                if (startRow == 0) {
                    startRow = rowID;
                }
                endRow = rowID;
            }
            voltQueueSQL(UPDATE_STMT, this.getTransactionTime(), partition, startRow, endRow);
            voltExecuteSQL(true);
            results[0].resetRowPosition();
        }
        return results;
    }
}


The deletion/successfully sent code:

@ProcInfo(singlePartition = true, partitionInfo = "logtable.partition_id:1")
public class MarkRowsProcessed extends VoltProcedure {

    // Alternative path: mark rows as processed and purge them later instead of deleting.
    private final static SQLStmt MARK_ROW_PROCESSED_STMT = new SQLStmt(
            "UPDATE logtable set processed=1 where partition_id=? and row_id=?;");

    private final static SQLStmt DELETE_ROW_PROCESSED_STMT = new SQLStmt(
            "DELETE from logtable where partition_id=? and row_id=?;");

    public long run(long[] rowsId, int partition) {
        for (long rowId : rowsId) {
            voltQueueSQL(DELETE_ROW_PROCESSED_STMT, partition, rowId);
        }
        voltExecuteSQL(true);
        return 0;
    }
}
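
On the client side, the dispatcher calls that procedure with the row IDs it just finished. A minimal synchronous helper might look like this (assuming "client" is the connected VoltDB Client instance; a failed call surfaces as a ProcCallException so the caller can retry):

    private void markBatchProcessed(long[] dispatchedRowIds, int partition)
            throws IOException, ProcCallException {
        // dispatchedRowIds holds the row_id values from the batch we just sent
        client.callProcedure("MarkRowsProcessed", dispatchedRowIds, partition);
    }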


The DDL:

CREATE TABLE logtable
(
    row_id         bigint           NOT NULL,
    partition_id   integer          NOT NULL,
    data           varchar(2048)    NOT NULL,
    read           tinyint          NOT NULL,
    processed      tinyint          NOT NULL,
    readTime       timestamp,
    CONSTRAINT PK_LOG PRIMARY KEY
      (
        partition_id, row_id
      )
);


CREATE VIEW rowcountview AS SELECT partition_id, read, count(*) AS rowcount FROM logtable GROUP BY partition_id, read;


CREATE INDEX IDX_NAME_READ_UNIQUE ON logtable (row_id, partition_id, read );


Finally, my "reader" app would create one thread per partition to read and dispatch the data. The effect is that I created a two-phase-commit-style application. You only need a recovery process to detect that a message has been sitting around longer than your default TTL; clear its readTime and read flag, and your app should automatically reload it and send it on its way.
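
That recovery step can itself be a small single-partition procedure. A rough sketch against the logtable schema above (the procedure name is made up, and the cutoff is passed in as "now minus TTL" in epoch microseconds):

import org.voltdb.ProcInfo;
import org.voltdb.SQLStmt;
import org.voltdb.VoltProcedure;

// Sketch: re-queue rows that were claimed (read=1) but never finished within the TTL.
@ProcInfo(singlePartition = true, partitionInfo = "logtable.partition_id:0")
public class RequeueStaleRows extends VoltProcedure {

    private final static SQLStmt REQUEUE_STMT = new SQLStmt(
            "UPDATE logtable SET read=0, readtime=NULL " +
            "WHERE partition_id=? AND read=1 AND processed=0 AND readtime < ?;");

    public long run(int partition, long cutoff) {
        voltQueueSQL(REQUEUE_STMT, partition, cutoff);
        return voltExecuteSQL(true)[0].asScalarLong(); // number of rows re-queued
    }
}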

Hope that helps.
francis
Dec 21, 2012
It's not just a mail queue, but the idea is similar.

The schema

CREATE TABLE data_queue (
  queueID BIGINT NOT NULL,
  method VARCHAR(25),
  customVars VARCHAR(1024), 
  runTime TIMESTAMP DEFAULT NULL,
  status TINYINT DEFAULT '0' NOT NULL,
  CONSTRAINT IDX_queue_PK PRIMARY KEY (queueID)
);
CREATE INDEX IDX_queue_runTimeStatus ON data_queue (runTime,status); 


queueID: self-explanatory
method: what are we processing?
customVars: custom variables required to process the element
runTime: when the queue element will be processed (this used to be called added)
status: 0 = not started, 1 = in progress, 2 = done (we remove status = 2 every 24 hours)

This queue is currently used to:
- Send emails
- Process reports
- Execute scheduled content
- Etc

What I have is something very similar to what you have provided as example code; however, we don't have a partition_id -- we just have VoltDB partition the table by queueID. I do see how a partition_id would solve the issues around MP queries, though.

The other thing I noticed, and it seems to be the only part I am scared of even though it can ultimately solve the problem as you have shown, is the read step that grabs the first and last ID. Some of these queue elements can be reused, and we have the ability to repeat some of them based on their execution. To ensure we never run into any odd issues (which we did), should we simply mark the element as done and create a new one? We also have some queue elements that are set to run in the future (e.g., 24 hours from now). I'm just wondering whether there are any other edge cases where the queueIDs in a batch may no longer be sequential and that range update would break.



Happy Holidays Andy, Ariel and the rest of you folks!
awilson
Dec 21, 2012

The other thing I noticed, and it seems to be the only part I am scared of even though it can ultimately solve the problem as you have shown, is the read step that grabs the first and last ID. Some of these queue elements can be reused, and we have the ability to repeat some of them based on their execution. To ensure we never run into any odd issues (which we did), should we simply mark the element as done and create a new one? We also have some queue elements that are set to run in the future (e.g., 24 hours from now). I'm just wondering whether there are any other edge cases where the queueIDs in a batch may no longer be sequential and that range update would break.

I'm not keen on marking queue objects as done until they have been successfully acted upon. Your dispatcher app could fault, server go down, email die, etc. and the record may not get dispatched and then the true status of the object is lost. Remember that as long as you are acting on the queue records in a single partition then the batch being marked is secure from all other operations against that partition.

I order on the row_id and processed columns to ensure that the batch is marked properly. I would not order on the queueID. I would partition on it and add a rowID. Then create a thread for each queueID, or allow a single thread to act on a few queues in separate queries and order in a similar way to what I did. Then you are pretty much shielded from any strange ordering issues, and it will run fast and allow you to run several instances of the dispatching app in parallel.

Happy Holidays Andy, Ariel and the rest of you folks!

Happy holidays to you too!

I'll probably get in touch with you near the beginning of the new year just to see how things are going.
francis
Jan 2, 2013
I'm not keen on marking queue objects as done until they have been successfully acted upon. Your dispatcher app could fault, server go down, email die, etc. and the record may not get dispatched and then the true status of the object is lost. Remember that as long as you are acting on the queue records in a single partition then the batch being marked is secure from all other operations against that partition.

I order on the row_id and processed columns to ensure that the batch is marked properly. I would not order on the queueID. I would partition on it and add a rowID. Then create a thread for each queueID, or allow a single thread to act on a few queues in separate queries and order in a similar way to what I did. Then you are pretty much shielded from any strange ordering issues, and it will run fast and allow you to run several instances of the dispatching app in parallel.


I am concerned about the update statement in your procedure because of the following (it is a bit difficult to explain).

We add the following items to the queue:
- ID: 1 == Email (Send immediately)
- ID: 2 == Email (Send in 1 hour)
- ID: 3 == Email (Send immediately)
- ID: 4 == Report (Run every 1 minute for 1 hour)
- ID: 5 == Email (Send immediately)

As you can see, when we try to run the queue, items 2 and 4 would break given the approach described above. The recurring one we can fix by deleting the row and inserting a new one, but the one that has a delay on it would be marked as in progress automatically. This is why I need to figure out a solution, and it's pretty complicated.
awilson
Jan 2, 2013
Items 2 and 4 do make it more interesting. You could alter the partition threads to select on IDs 1, 3 and 5.

Then create one thread for ID 2. That can iterate through all the partitions depending on the number of items with that ID. It may be necessary to split it up into multiple threads.

ID 4 is a bit more tricky, especially if the report is running a series of multi-partition queries, in which case it can be just a single thread or process. ID 4 could cause a bit too much overhead on the system as a whole depending on the queries. For example, if a report takes a minute to run through all the multi-partition queries then you could have some issues with other parts of the app. I've seen something like this before on a traditional DB. You may want to take a different approach with heavy reports on an active volt cluster or any other DB, like copying the data to a separate cluster and running the reports there, then returning the aggregated data. I've seen this done on Oracle doing financial reporting against a live ecomm site. It worked very well.
francis
Jan 2, 2013
Items 2 and 4 do make it more interesting. You could alter the partition threads to select on IDs 1, 3 and 5.

Then create one thread for ID 2. That can iterate through all the partitions depending on the number of items with that ID. It may be necessary to split it up into multiple threads.

ID 4 is a bit more tricky, especially if the report is running a series of multi-partition queries, in which case it can be just a single thread or process. ID 4 could cause a bit too much overhead on the system as a whole depending on the queries. For example, if a report takes a minute to run through all the multi-partition queries then you could have some issues with other parts of the app. I've seen something like this before on a traditional DB. You may want to take a different approach with heavy reports on an active volt cluster or any other DB, like copying the data to a separate cluster and running the reports there, then returning the aggregated data. I've seen this done on Oracle doing financial reporting against a live ecomm site. It worked very well.


I'm not too concerned about that part. We don't execute the actual processing of each queue element there; we merely need a list of the queue elements to execute and to mark them as in progress, and the code that processes them will mark each element as completed. Since we are looking to grow to several TB of data for our reports, that data likely won't be able to stay in VoltDB much longer, and the way we've built it will allow us to switch rapidly to a new solution.

The example I gave was for one particular partition, since it is totally random where queue elements land when they are added. I only gave examples for email and reports, but the queue system is customizable by any customer using it and currently plugs into over 15 of our internal plugins. The biggest issues are the elements that recur and the ones that have a delay: when I do "UPDATE queue SET status = 1 WHERE queueID >= 1 AND queueID <= 5" it would update ID 2, which is meant to run in 1 hour. And then, running it again in the future, ID 5 will run again because ID 4 runs 60 times and its status is being reset each time until the hour is over.

VoltDB does return the number of rows (tuples) updated by a query; if only it could return those rows, all my problems would be solved! I'd just do "UPDATE queue SET status = 1 WHERE runTime <= now() AND status = 0 LIMIT 5000" and voila, all problems solved. Though I know the LIMIT would also not work in that query, which is unfortunate.
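
For what it's worth, the stored-procedure pattern pmartel described earlier gets close to that: in one transaction, run something like

SELECT queueID, method, customVars FROM data_queue WHERE status = 0 AND runTime <= ? ORDER BY runTime, queueID LIMIT 5000;

UPDATE data_queue SET status = 1 WHERE status = 0 AND queueID = ?;

with the UPDATE issued once per returned row, and have the procedure return the selected rows to the caller -- which is essentially the behavior I was asking for. Because only rows with runTime <= now and status = 0 are selected, a delayed element like ID 2 is never marked early, and a recurring element is only picked up when it actually comes due.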