Forum: Building VoltDB Applications

Post: Aggregates on each partition

Aggregates on each partition
tamil
Sep 28, 2011
Hi experts,

Just to follow some hints that Mr. Ryan Betts shared a while back (thanks again!) in response to my question...

Since then, my first VoltDB application has grown into the alpha testing phase, with three client classes running all day: one feeds network traffic flow sample data into the DB in real time, another periodically deletes stale records, and the third requests aggregations to estimate the traffic volume per destination IP address over the past 10 seconds, and then analyzes what is happening on the high-volume destinations.

So the whole operation more or less breaks down by destination IP address, which is why I first picked the dst_ip column (type bigint) as the partition key. But when I started working on the third client class, the first thing I needed was something like:

select dst_ip, sum(sampledpacketsize * meanskipcount) as byte
from flow
where unixmillis between ? and ?
and ...
group by dst_ip
order by byte desc;


This query won't run in a single partition, so I went back to the blog page and reread what I was told:
"We don't provide an "official" way for a client to round-robin procedure calls across all partitions - but in practice it is easy to do."

Then another thought occurred to me: why not store the value of dst_ip modulo (hostcount * sitesperhost) in each row at insert time, and partition on that column instead of dst_ip?
Right now I have 1*4 partitions, so this column (partition_id) is set to 0, 1, 2, or 3.

select ...
from flow
where partition_id = ?
and unixmillis between ? and ?
and ...
group by dst_ip
order by byte desc;


From here on I have to include both partition_id and dst_ip in every where clause, and if the aggregation and further analysis need to happen every second, the procedure has to be invoked every 1/4 second (once per partition per second). This way it seems to have become a functioning single-partition procedure that still achieves what I wanted.

Now my questions:


  1. Do I understand correctly?
  2. Even if so, is there any way to do it in a less awkward manner?
  3. If this is the right way to do it, I'd appreciate it if my programs, both server-side and client-side, could get access to the total number of partitions defined in the project.xml file rather than keeping static final variables on each side.

Please excuse me if I'm missing something obvious. I do have some experience with SQL and RDBMSs but am in no way familiar with writing Java, let alone VoltDB. Both have been growing on me a lot, though, because the whole coding process has been real fun!

-Tami
callProcedure("@statistics", "PARTITIONCOUNT", 0)
tamil
Sep 30, 2011
Sorry to reply to my own question, but just found this in the online documentation... my bad!

Also, the results of another system procedure call with the "PROCEDURE" component have shown that my client programs keep running on each partition in round-robin fashion.

Now I just wonder how I could direct each procedure call, if possible, to a specific partition using this internal partition id, rather than assigning a partition id to each record myself?
Procedure partition routing
rbetts
Sep 30, 2011
Sorry for the delay in answering your question. Happy to hear you're making progress and enjoying VoltDB.


At the moment, VoltDB uses a simple modulo to map integer parameter types to partitions. You can run a single partition procedure once at each partition by calling it multiple times with the partition parameter values (0...partitionCount-1).
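
A minimal sketch of that loop, assuming the simple modulo mapping described above holds for non-negative values (the real routing is internal to VoltDB and may change). `PartitionParams`, `partitionFor`, and `oneValuePerPartition` are hypothetical names, not part of the VoltDB API; with a real client, each value would be passed as the partitioning argument of `callProcedure`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the VoltDB API.
final class PartitionParams {

    // Assumed mapping for non-negative values: value modulo partition count.
    static int partitionFor(long value, int partitionCount) {
        return (int) (value % partitionCount);
    }

    // Values 0..partitionCount-1, one per partition under the assumed mapping.
    static List<Long> oneValuePerPartition(int partitionCount) {
        List<Long> values = new ArrayList<>();
        for (long v = 0; v < partitionCount; v++) {
            values.add(v);
        }
        return values;
    }

    public static void main(String[] args) {
        int partitionCount = 4;
        for (long v : oneValuePerPartition(partitionCount)) {
            // With a real client, v would be the partitioning argument
            // of the single-partition procedure call.
            System.out.println("param " + v + " -> partition "
                    + partitionFor(v, partitionCount));
        }
    }
}
```

The partition count itself would come from the cluster rather than a constant, so the loop keeps working if the cluster is resized.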


I think this approach is better than encoding the partition count into your data - I would advise against that design. Depending on how much data you need to return for your third procedure, you might also find a multi-partition procedure works well. VoltDB will push down aggregates and limits to each partition. See: http://blog.voltdb.com/optimizing-distributed-read-operations-voltdb/
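
If you stay with one single-partition call per partition, the client has to combine the partial results. Because all rows for a given dst_ip live in the same partition, the per-partition groups never overlap, so combining is just concatenation followed by a sort. A rough sketch with plain Java types standing in for the returned tables (`FlowTotal` and `mergeDescending` are hypothetical names):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical value type: one grouped row (destination IP, summed bytes).
final class FlowTotal {
    final long dstIp;
    final long bytes;

    FlowTotal(long dstIp, long bytes) {
        this.dstIp = dstIp;
        this.bytes = bytes;
    }

    // Concatenate per-partition results and order by bytes, largest first.
    static List<FlowTotal> mergeDescending(List<List<FlowTotal>> perPartition) {
        List<FlowTotal> all = new ArrayList<>();
        for (List<FlowTotal> part : perPartition) {
            all.addAll(part);
        }
        all.sort(Comparator.comparingLong((FlowTotal t) -> t.bytes).reversed());
        return all;
    }
}
```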


We'd love to hear about your use case - if you'd like to chat about what you're doing or what Volt could do to make your problem easier - contact Diane (dbranscomb@voltdb.com) and she'll set up a phone call.
Thank you Ryan for sharing
tamil
Sep 30, 2011
Thank you Ryan for sharing precious info once again!

"At the moment, VoltDB uses a simple modulo to map integer parameter types to partitions."

So, let me see if I understand this correctly.
Suppose there are four unique partitions in the database cluster, and the table in question is partitioned by the dstip column.
Now the VoltDB client receives four pieces of data from the traffic sample exporters (backbone routers).
In the dstip field, the traffic samples have the following values: 16843009, 16843010, 16843011, and 16843012.

In this example, if the client calls the insert procedure for these four samples, they're guaranteed to spread across the four partitions, aren't they?
Without knowing that mapping method, I had to run a modulo calculation myself before inserting each record and use that value as a pseudo partition id.
If you can kindly confirm I'm not mistaken here, I'll delete that pseudo partition id column, and for aggregation I'd go with:

where dstip % 4 = ?
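
The arithmetic for the four sample values can be checked quickly. This sketch assumes the plain modulo mapping quoted above and four partitions; `DstIpSpread` and `partitionOf` are made-up names, and the actual placement is decided inside VoltDB.

```java
// Hypothetical check of the assumed modulo mapping; not VoltDB code.
final class DstIpSpread {

    // Assumed mapping for non-negative values: value modulo partition count.
    static int partitionOf(long dstIp, int partitionCount) {
        return (int) (dstIp % partitionCount);
    }

    public static void main(String[] args) {
        long[] dstIps = {16843009L, 16843010L, 16843011L, 16843012L};
        for (long ip : dstIps) {
            // Remainders come out as 1, 2, 3, 0: four distinct partitions.
            System.out.println(ip + " % 4 = " + partitionOf(ip, 4));
        }
    }
}
```

Under this assumption, consecutive values always spread evenly, but arbitrary destination addresses need not: 16843009 and 16843013 would land in the same partition.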

"Depending on how much data you need to return for your third procedure, you might also find a multi-partition procedure works well."
What I really need is a fairly small set of data, but I can't simply "limit" it... it would be different if the "having" aggregation condition were supported:

...
having sum(sampledpacketsize*meanskipcount) > 131072000
...
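
Without "having", the same threshold could be applied to the grouped rows after they come back, either in the procedure's Java code or in the client. A minimal sketch over plain Java collections, where `HavingFilter`, `aboveThreshold`, and the sample totals are made up for illustration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a "having sum(...) > threshold" condition.
final class HavingFilter {

    // Keep only destinations whose summed bytes exceed the threshold,
    // preserving the incoming (already sorted) order.
    static Map<Long, Long> aboveThreshold(Map<Long, Long> bytesByDstIp, long threshold) {
        Map<Long, Long> kept = new LinkedHashMap<>();
        for (Map.Entry<Long, Long> e : bytesByDstIp.entrySet()) {
            if (e.getValue() > threshold) {
                kept.put(e.getKey(), e.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<Long, Long> sums = new LinkedHashMap<>();
        sums.put(16843009L, 200000000L); // made-up totals
        sums.put(16843010L, 1000L);
        System.out.println(aboveThreshold(sums, 131072000L));
    }
}
```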

Besides, I want it to really scale by design, and I have no problem calling a procedure every 1/4 second :)
To tell you the truth, I'm also running the delete procedure against each single partition in the same way.
Also, I even plan to keep another full copy of the same data in a different cluster, with srcip as the partition key, for outbound DoS attack detection.

And yes, I think with VoltDB I'm doing what used to be really undoable... Thanks to you guys, we no longer have to pay eight-digit JPY to purchase something like this!
Hi Ryan, I wasn't still
tamil
Oct 2, 2011
Hi Ryan, I still wasn't getting the point when I wrote my last post, as you may have noticed :)
While making changes to my code, I've finally realized how to do it!
On the client side:

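// imports needed by the enclosing client class
import java.io.IOException;
import org.voltdb.VoltTable;
import org.voltdb.client.ProcCallException;

private void doAggregate(long partitionParam) throws IOException, ProcCallException {
    // getResults() returns an array of tables, not a single VoltTable
    VoltTable[] results = client.callProcedure(AggregateFlow.class.getSimpleName(), partitionParam, some, other, args).getResults();
    ...
}

public void start() {
    // a long counter, so the round-robin does not depend on char wrap-around
    long i = 0;
    while (true) {
        try {
            // one call per partition, spread evenly over each run period
            Thread.sleep((long)(runFrequencyMillis/partitionCount));
            // the value being either 0, 1, 2, or 3
            doAggregate(i % partitionCount);
        } catch (InterruptedException e) {
            // restore the interrupt flag and stop looping
            Thread.currentThread().interrupt();
            return;
        } catch (IOException | ProcCallException e) {
            e.printStackTrace();
        }
        i++;
    }
}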


On the server side:

import org.voltdb.ProcInfo;
import org.voltdb.SQLStmt;
import org.voltdb.VoltProcedure;
import org.voltdb.VoltTable;

@ProcInfo(
    partitionInfo = "FLOW_V.dstIp: 0",
    singlePartition = true
)
public class AggregateFlow extends VoltProcedure {
    /* partitionParam is *not* used as a SQL parameter!
       No need for a condition with a modulo (%) expression,
       which doesn't seem to be supported anyway ('unknown token')...
    */

    public final SQLStmt aggregateFlow = new SQLStmt("SELECT ...");

    /* Instead it's only passed to the run() method as the first arg
       (index 0, as named in partitionInfo above). */
    public VoltTable[] run( long partitionParam, some, other, args ) throws VoltAbortException {
        ...
    }
    ...
}


Shoot me if I'm still out of the loop...