A list is simply a collection of things. The list has no structure, although in some cases the length of the list may be known. The list may contain duplicate items. In the following example, the number 1 is included twice.
Example list:
1 2 3 1
A set is similar to a list, but has the following differences:
- The size of the set is always known
- A set may not contain duplicates
Example weighted list (value,count):
1,2
2,1
3,1
Notice that the value 1 has a count of two in the weighted list, because it appears twice in the original list. In order to make insertions into such a list scalable, consider using partitioning to avoid large indexes.
Converting from lists to weighted lists (sets) may logically compress the data. Consider the following:
1, 10000000000000
2, 10
3, 1000
4, 100000
Consider the size of the list that would be produced if this list were not weighted by count. Very few compression methods allow the data to be operated on without decompression, but aggregate SQL queries can use the weighted list directly, with no decompression step.
You will need to post-process the data to create a list from a weighted list.
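The conversion and its post-processing step are small enough to sketch in Python. This is a minimal illustration, assuming a weighted list is held as (value, count) pairs; `to_weighted`, `weighted_sum` and `to_list` are hypothetical helper names. It also computes an aggregate (a SUM) straight from the pairs, which is what an aggregate SQL query over a weighted table does with SUM(value * count).

```python
from collections import Counter

def to_weighted(items):
    """Collapse a plain list into sorted (value, count) pairs."""
    return sorted(Counter(items).items())

def weighted_sum(weighted):
    """SUM over the original list, computed without expanding it."""
    return sum(value * count for value, count in weighted)

def to_list(weighted):
    """Post-process a weighted list back into a plain list of items."""
    return [value for value, count in weighted for _ in range(count)]

example = [1, 2, 3, 1]
weighted = to_weighted(example)
print(weighted)                # [(1, 2), (2, 1), (3, 1)]
print(weighted_sum(weighted))  # 7, the same as sum(example)
print(to_list(weighted))       # [1, 1, 2, 3]
```

The expanded list comes back grouped by value, so the original order is not preserved.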
I am happy to announce the GA release of Flexviews. I've numbered the release 1.7.0 GA.
There are a small number of bug fixes and enhancements:
- Fixed views which have aggregate functions but no GROUP BY expressions; they could have problems during incremental update.
- GA support for all aggregate functions except GROUP_CONCAT and AVG(distinct).
- The PERCENTILE_XX function (e.g. PERCENTILE_90 for a 90th percentile calculation) is now stable too
FlexCDC got a number of fixes and improvements:
- TIMESTAMP columns are now properly supported
- mysqlbinlog errors are now detected, and the program exits gracefully
- FlexCDC now logs to a log file instead of writing to STDOUT/STDERR
- There are new PHP scripts for adding and removing table changelogs
- There is also a new wrapper script (consumer_safe.sh)
You can get it here: http://flexvie.ws
The HandlerSocket plugin for MySQL currently lacks atomic operations. It is impossible to implement counters (increment/decrement value) or REPLACE functionality with the current implementation.
It currently exceeds the performance of Memcache for get/set operations, but I want to see how fast it is once atomic operations are implemented. Until then, I don't think it is a serious contender for replacing Memcache for the cache layer in a complex environment.
A lot of the work that I do on my personal projects such as Flexviews and Shard-Query involves taking a query and manipulating it in such a way as to obtain the same result, but in a different manner. Flexviews amortizes aggregation over time, making it possible to maintain summary tables incrementally. Shard-Query (http://code.google.com/p/shard-query) spreads queries over multiple Gearman workers, allowing it to run queries on one or more MySQL servers in parallel.
Both of these tools rely on a similar set of substitution rules. One of my favorite parts of various math classes was learning how the different mathematical expressions could be substituted for one another, allowing you to solve what otherwise appeared to be impossible problems. Using simple substitutions, SQL can be made to solve difficult problems too, such as running queries in parallel or on multiple shards (or both).
I've tried to collect some of the rules that I use in my tools. Most of these rules aren't very useful for regular SQL applications; they are meant to be applied when you build a parallel query tool like Shard-Query.
Rule #1 - Algebraic rules apply to SQL expressions
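This rule is important for query optimization. The MySQL optimizer isn't very smart about rewriting expressions so that they can use indexes: when an indexed column is wrapped in an expression, the index can't be used for the lookup, so move the arithmetic to the other side of the comparison.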
For example:
SELECT * FROM t WHERE some_col + interval 10 minute >= now()
Can be changed to:
SELECT * FROM t WHERE some_col >= now() - interval 10 minute
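A quick way to see the effect is SQLite's EXPLAIN QUERY PLAN, used here as a stand-in for MySQL's EXPLAIN. This sketch assumes a hypothetical table `t` with an index on `some_col`, and uses integer arithmetic instead of date intervals; the two predicates return the same rows, but only the rewritten one can be answered with an index search.

```python
import sqlite3

# SQLite stands in for MySQL; the table, index and data are hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, some_col INTEGER, payload TEXT)")
conn.execute("CREATE INDEX idx_some_col ON t (some_col)")
conn.executemany("INSERT INTO t (some_col, payload) VALUES (?, ?)",
                 [(i, "row %d" % i) for i in range(1000)])

# Arithmetic applied to the column hides it from the index ...
original = "SELECT id FROM t WHERE some_col + 10 >= 990"
# ... moving the constant to the other side leaves the column bare.
rewritten = "SELECT id FROM t WHERE some_col >= 990 - 10"

def plan(sql):
    """Return the detail text of SQLite's EXPLAIN QUERY PLAN output."""
    return " ".join(row[-1] for row in conn.execute("EXPLAIN QUERY PLAN " + sql))

# Same rows either way (compared unordered).
assert sorted(conn.execute(original).fetchall()) == sorted(conn.execute(rewritten).fetchall())
print(plan(original))   # expected: a SCAN, the index cannot be searched
print(plan(rewritten))  # expected: a SEARCH using idx_some_col
```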
Okay, that was a general rule of thumb, but pretty much everything else is related to parallel querying.
Rule #2 - An IN LIST is equivalent to a UNION ALL (as long as there are no duplicate items in the IN list)
For example:
SELECT * FROM t WHERE some_col IN (1,2,3)
Can be changed to:
SELECT * FROM t WHERE some_col = 1
UNION ALL
SELECT * FROM t WHERE some_col = 2
UNION ALL
SELECT * FROM t WHERE some_col = 3
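The equivalence, and the reason for the no-duplicates condition, can be checked with a small SQLite sketch (a stand-in for MySQL; table `t` and its data are made up). Rows are compared as multisets because UNION ALL makes no ordering promise.

```python
import sqlite3
from collections import Counter

# Hypothetical table t; SQLite stands in for MySQL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, some_col INTEGER)")
conn.executemany("INSERT INTO t (some_col) VALUES (?)", [(i % 5,) for i in range(20)])

def in_list(values):
    """Rows matched by WHERE some_col IN (...), as a multiset."""
    placeholders = ",".join("?" * len(values))
    sql = "SELECT * FROM t WHERE some_col IN (%s)" % placeholders
    return Counter(conn.execute(sql, values).fetchall())

def union_all(values):
    """Rows returned by one equality branch per value, glued with UNION ALL."""
    sql = " UNION ALL ".join("SELECT * FROM t WHERE some_col = ?" for _ in values)
    return Counter(conn.execute(sql, values).fetchall())

# Same multiset of rows when the IN list has no duplicates ...
assert in_list([1, 2, 3]) == union_all([1, 2, 3])
# ... but a repeated item makes UNION ALL return its rows twice.
assert in_list([1, 1]) != union_all([1, 1])
```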
Rule #3 - COUNT(*) = SUM(1)
For example:
SELECT COUNT(*) FROM t
Can be changed to:
SELECT SUM(1) as `COUNT(*)` FROM t
Rule #3 is only useful in a very limited context, like Flexviews.
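One caveat worth checking before applying rule #3: over zero rows, COUNT(*) returns 0 while SUM(1) returns NULL. A small SQLite sketch (hypothetical tables `t` and `empty_t`, SQLite standing in for MySQL) shows the difference and the COALESCE that restores the COUNT(*) result.

```python
import sqlite3

# Hypothetical tables; SQLite stands in for MySQL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.execute("CREATE TABLE empty_t (x INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(1,), (None,), (3,)])

def count_star(table):
    return conn.execute("SELECT COUNT(*) FROM %s" % table).fetchone()[0]

def sum_one(table):
    return conn.execute("SELECT SUM(1) FROM %s" % table).fetchone()[0]

print(count_star("t"), sum_one("t"))              # 3 3 (rows holding NULL still count)
print(count_star("empty_t"), sum_one("empty_t"))  # 0 None
# COALESCE turns the NULL from an empty input back into 0.
print(conn.execute("SELECT COALESCE(SUM(1), 0) FROM empty_t").fetchone()[0])  # 0
```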
Rule #4 - AVG(expr) = SUM(expr)/COUNT(expr)
For example:
SELECT AVG(some_col) FROM t
Can be changed to:
SELECT SUM(some_col)/COUNT(some_col) as `AVG(some_col)` FROM t
Of course, you knew that AVG = SUM/COUNT, but it is important to remember that this decomposition can be made to split up the work.
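The decomposition only matches AVG if the COUNT ignores NULLs the same way AVG does, which means COUNT(expr) rather than COUNT(*). This SQLite sketch (hypothetical table and data) checks both; note the `* 1.0`, which is needed because SQLite, unlike MySQL, performs integer division when both operands are integers.

```python
import sqlite3

# Hypothetical table t; SQLite stands in for MySQL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (some_col INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(1,), (2,), (None,), (4,)])

avg = conn.execute("SELECT AVG(some_col) FROM t").fetchone()[0]
# COUNT(some_col) skips the NULL row exactly as AVG does.
decomposed = conn.execute(
    "SELECT SUM(some_col) * 1.0 / COUNT(some_col) FROM t").fetchone()[0]
# COUNT(*) also counts the NULL row, so this divides 7 by 4 instead of 3.
wrong = conn.execute("SELECT SUM(some_col) * 1.0 / COUNT(*) FROM t").fetchone()[0]

assert abs(avg - decomposed) < 1e-12
print(avg, decomposed, wrong)  # avg and decomposed are both 7/3; wrong is 1.75
```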
Rule #5 - In some cases, BETWEEN can be expressed as an IN LIST (for DATE and INTEGER type columns)
This conversion to IN makes range lookups on a hash-partitioned column possible, as long as the column is INTEGER or DATE.
For example:
SELECT * FROM t where some_col between 1 and 3
Can be changed to:
SELECT * FROM t where some_col IN (1,2,3)
Note that once it is converted to an IN list, we might be able to further convert the IN to UNION ALL (see rules #2 and #6).
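Generating the IN list is mechanical for discrete types. Here is a minimal Python sketch, assuming inclusive bounds as in BETWEEN; the helper names are hypothetical. The list grows with the width of the range, so this makes sense mainly for short ranges.

```python
from datetime import date, timedelta

def int_between_to_in(lo, hi):
    """Expand an inclusive INTEGER range into an IN-list string."""
    return "IN (%s)" % ",".join(str(v) for v in range(lo, hi + 1))

def date_between_to_in(lo, hi):
    """Expand an inclusive DATE range into an IN-list of quoted ISO dates."""
    days = (hi - lo).days
    return "IN (%s)" % ",".join("'%s'" % (lo + timedelta(days=d)).isoformat()
                                for d in range(days + 1))

print("some_col " + int_between_to_in(1, 3))
# some_col IN (1,2,3)
print("some_date " + date_between_to_in(date(2010, 12, 30), date(2011, 1, 2)))
# some_date IN ('2010-12-30','2010-12-31','2011-01-01','2011-01-02')
```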
Rule #6 - It is possible to run each part of a UNION ALL in parallel if you use the READ COMMITTED transaction isolation level and the query uses no aggregation
The example given in rule #2 is perfect for this. Each portion of the UNION ALL is independent and can be run in parallel without any further modification.
Rule #7 - It is possible to rewrite aggregate queries for parallel work for MIN/MAX/AVG/SUM/COUNT, but you need the following additional rewrite rules to "wrap" the UNION ALL:
SUM(expr) := SUM(`SUM(expr)`)
COUNT(expr) := SUM(`COUNT(expr)`)
MIN(expr) := MIN(`MIN(expr)`)
MAX(expr) := MAX(`MAX(expr)`)
AVG(expr) := SUM(`SUM(expr)`) / SUM(`COUNT(expr)`) # see below and rule #4
For example:
SELECT AVG(some_col) as expr FROM t WHERE col2 in (1,2,3)
Can be changed to:
SELECT SUM(the_union.expr_s) / SUM(the_union.expr_c) as expr
FROM (
SELECT SUM(some_col) as expr_s, COUNT(some_col) as expr_c FROM t WHERE col2 = 1
UNION ALL
SELECT SUM(some_col) as expr_s, COUNT(some_col) as expr_c FROM t WHERE col2 = 2
UNION ALL
SELECT SUM(some_col) as expr_s, COUNT(some_col) as expr_c FROM t WHERE col2 = 3
) as the_union
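The wrap rules can be checked with plain Python, assuming each branch of the UNION ALL has already returned its partial aggregates; the `shards` data is made up. The last lines show why AVG is carried as SUM and COUNT: averaging the per-branch averages gives the wrong answer when the branches hold different numbers of rows.

```python
# Hypothetical values matched by each branch of the UNION ALL.
shards = [[4, 8, 15], [16, 23], [42]]

# What each branch's SELECT returns: its partial aggregates.
partials = [
    {"sum": sum(v), "count": len(v), "min": min(v), "max": max(v)}
    for v in shards
]

# The outer query re-aggregates the partials.
total_sum = sum(p["sum"] for p in partials)      # SUM(`SUM(expr)`)
total_count = sum(p["count"] for p in partials)  # SUM(`COUNT(expr)`)
total_min = min(p["min"] for p in partials)      # MIN(`MIN(expr)`)
total_max = max(p["max"] for p in partials)      # MAX(`MAX(expr)`)
average = total_sum / total_count                # SUM(`SUM(expr)`) / SUM(`COUNT(expr)`)

all_values = [x for v in shards for x in v]
assert average == sum(all_values) / len(all_values)

# Averaging the per-branch averages is wrong when branch sizes differ.
naive = sum(p["sum"] / p["count"] for p in partials) / len(partials)
print(average, naive)  # 18.0 23.5
```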
Rule #8 - It is possible to rewrite aggregate queries for parallel work for STDDEV, VARIANCE, etc., but you need the following additional rewrite rules to defer the aggregation until the final step.
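STDDEV(expr) := STDDEV(`expr`), with each worker returning SELECT ..., expr, ... and no GROUP BY on expr
For example:
SELECT STDDEV(some_col) as expr FROM t WHERE col2 in (1,2,3)
Can be changed to:
SELECT STDDEV(the_union.expr) as expr
FROM (
SELECT some_col as expr FROM t WHERE col2 = 1
UNION ALL
SELECT some_col as expr FROM t WHERE col2 = 2
UNION ALL
SELECT some_col as expr FROM t WHERE col2 = 3
) as the_union
Grouping by some_col on the workers would collapse repeated values into a single row, and the standard deviation of the distinct values is not the standard deviation of all rows, so the workers return every matching row.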
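The final step can be sketched in Python, assuming each worker has returned its matching `expr` values row by row (the data below is made up). MySQL's STDDEV is the population standard deviation, so `statistics.pstdev` plays its role here.

```python
import statistics

# Hypothetical rows returned by the three workers (col2 = 1, 2, 3).
# Each worker returns its expr values as-is; no aggregation happens there.
worker_rows = [[2, 4, 4], [4, 5, 5], [7, 9]]

# Final step: a single STDDEV over the combined rows (the UNION ALL).
combined = [x for rows in worker_rows for x in rows]
result = statistics.pstdev(combined)
print(result)  # 2.0
```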
MySQL replication uses the terms 'master' and 'slave' to refer to the machine being replicated from and the machine being replicated to, respectively. These terms are highly offensive to some people, given the history of slavery. While the terms master/slave make sense for replication, given their offensive nature, perhaps MySQL should consider changing the terminology.
I suggest changing:
SHOW MASTER LOGS -> SHOW REPLICATOR LOGS;
SHOW MASTER STATUS -> SHOW REPLICATOR STATUS;
SHOW SLAVE STATUS -> SHOW REPLICATION STATUS;
START|STOP SLAVE [IO_THREAD|SQL_THREAD] -> START|STOP REPLICATION [IO_THREAD|APPLY_THREAD]
The old commands can still work for backwards compatibility, but they should be deprecated.
Flexviews 1.6.0-RC2 is available immediately.
This release is a bugfix release. This is the second Flexviews release candidate. If no major bugs are discovered the next release will be the GA release.
Flexviews is a stored procedure managed materialized view system for MySQL 5.1 or greater.
What is fixed in Flexviews 1.6.0-RC2?
- Numerous performance fixes.
- Flexviews uses fewer temporary tables and subqueries
- A full table scan of the view is no longer required (only changed GROUP BY keys are scanned)
- Dead code has been removed
- Bug fixes
- Removing tables and adding them to a view again could result in the WHERE clause being generated in the wrong order
- Fix a problem with applying deltas to views which use PERCENTILE
- Improved error messages
- FlexCDC Bug fixes
- Bulk insert mode did not work when a transaction changed rows in more than one table
Please note that I am not currently making separate FlexCDC releases. If you are only interested in FlexCDC, you will find it in the consumer/ subdirectory of the Flexviews tarball. I will package FlexCDC up as a standalone release along with the GA release of Flexviews.
What's new in Flexviews 1.6.0RC1
- This is the first release candidate before the final release. If no major bugs are uncovered, then the next release will be the first GA release.
- Flexviews now has a test suite for all major features. The creation of these tests uncovered a number of issues which have been resolved in this release.
- All MySQL aggregate functions except GROUP_CONCAT are now supported.
- A special aggregate function called PERCENTILE is now also supported. The calculation uses a modified version of the GROUP_CONCAT based solution suggested by Roland Bouman for percentiles. This function should be considered experimental. Please report bugs if you find any.
- You can add indexes to enabled materialized views using SQL_API/add_expr
- Adding PRIMARY KEY indexes is no longer supported. All views get an auto_incrementing primary key. You can add additional UNIQUE indexes instead.
- There is an upgrade process from 1.5.3b (see UPGRADE and upgrade.sql)
- Views with aggregate functions but no GROUP BY columns now work properly for all supported aggregate function types
- NULL values in GROUP BY columns are now properly supported
- NULL values now work properly with distributive aggregate functions
- There is a wrapper script around run_consumer.php which can restart the consumer if it stops running
As always, get it at:
http://sourceforge.net/projects/Flexviews
Improvements for Flexviews, materialized views for MySQL 5.1+
- There is up-to-date documentation available as manual.html, a RoboDOC-generated version of the manual for the SQL_API interface. The manual is generated from RoboDOC comments embedded in the source code, which makes it easy to keep the documentation current if the interface changes.
- Flexviews has a new website that is easier to read and, hopefully, easier to understand. You can find a copy of the manual on the website as well.
- There is a SQL converter on the website that will automatically generate Flexviews SQL_API calls from a SELECT statement. The next step will be to turn this into a MySQL Proxy script that can handle 'CREATE ALGORITHM=INCREMENTAL MATERIALIZED VIEW ...'
- Support for the FlexCDC auto_changelog mode, which automatically logs changes of all tables. This is great for small schemas.
Improvements for FlexCDC
- The main wrapper scripts around flexcdc.php accidentally included 'consumer.php' instead of 'flexcdc.php', which I had renamed shortly before the prior release. This problem has been resolved.
- I added an ini parameter to control the issuing of warnings by the script
- I added info to the README about when to not use auto_changelog
- flexcdc.php now looks for the ini file in the same directory in which the script resides, instead of the working directory
- The next release will have a command line option to specify the ini location. This will make it easy to run more than one consumer.
I am going to work on a HOWTO video which shows how to install and use Flexviews. I really hope that the automatic conversion of SQL by the conversion tool helps people understand the API. For those who still don't want to use it, well I guess you will have to wait for the MySQL proxy integration.
*edit* I want to point out that this test was done on a single database server which used MySQL partitioning. This is a demonstration of how Shard-Query can improve performance in non-sharded databases too. *edit*
Over the weekend I spent a lot of time improving my new Shard-Query tool (code.google.com/p/shard-query), and the improvements can yield big performance gains on partitioned data sets compared to executing the query directly on MySQL.
I'll explain this graph below, but lower is better (response time) and Shard-Query is the red line.
MySQL understands that queries which access data in only certain partitions don't have to read the rest of the table. This partition elimination works well, but MySQL left a big optimization out of partitioning: getting data in parallel.
In fact, since partition elimination is the only major optimization that partitioning provides, it helps when only small parts of the data set are examined, but it isn't great for scaling access to large data sets when the entire data set must be accessed.
Since Shard-Query exploits parallelism with Gearman (http://www.gearman.org) I decided to extend the Shard-Query "optimizer" to support running queries with IN lists in parallel. This makes a query scale much further than it would if there was no parallelism at work.
Consider the following partitioned fact table:
CREATE TABLE `fact` (
  `id` bigint(20) unsigned DEFAULT NULL,
  `a_id` bigint(20) unsigned DEFAULT NULL,
  `b_id` int(11) NOT NULL,
  `c_id` int(11) NOT NULL,
  `i1` tinyint(4) DEFAULT NULL,
  `qty` smallint(6) DEFAULT NULL,
  `score` decimal(10,10) DEFAULT NULL,
  `price` decimal(7,3) DEFAULT NULL,
  `i2` int(11) DEFAULT NULL,
  `i3` int(11) DEFAULT NULL,
  `wide_row` char(54) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1
/*!50100 PARTITION BY HASH (i1)
PARTITIONS 100 */
The table is partitioned into 100 partitions, and there are 100 distinct values for i1. This means that all the values for a particular i1 are housed in a single partition.
Consider the following query:
select price*qty from fact where i1 in (1,2,3);
This query is semantically equivalent to:
select price*qty from fact where i1 = 1
UNION ALL
select price*qty from fact where i1 = 2
UNION ALL
select price*qty from fact where i1 = 3
Unfortunately, MySQL does not have any intra-query parallelism, so rewriting the query that way is not an effective scaling strategy. However, if you execute all three queries at the same time, and use a temporary table as the UNION ALL, you can actually get parallelism. This is what Shard-Query does. It can take each IN list item and assign it to a worker, and stuff the results back together at the end.
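The rewrite step itself can be sketched with a deliberately naive regular expression. This is not how Shard-Query parses SQL. `split_in_list` is a hypothetical helper that assumes a single IN list of literal values with no nested parentheses. Each resulting query is what would be handed to one worker.

```python
import re

def split_in_list(sql):
    """Rewrite `... col IN (a,b,...)` into one single-value query per item.

    Naive sketch: assumes one IN list of literals and no nested parentheses.
    """
    m = re.search(r"(\w+)\s+in\s*\(([^()]*)\)", sql, re.IGNORECASE)
    if m is None:
        return [sql]  # nothing to split; run the query as-is
    column = m.group(1)
    items = [item.strip() for item in m.group(2).split(",")]
    return [sql[:m.start()] + "%s = %s" % (column, item) + sql[m.end():]
            for item in items]

for q in split_in_list("select price*qty from fact where i1 in (1,2,3)"):
    print(q)
# select price*qty from fact where i1 = 1
# select price*qty from fact where i1 = 2
# select price*qty from fact where i1 = 3
```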
The second thing that can be done to improve performance at the partition level (or the shard level) is to push down aggregation of distributable aggregate functions to the worker. If you've read my blog before, you might know that the distributable aggregate functions include SUM and COUNT (MIN and MAX are distributable too).
Consider a query very much like the previous query:
select shard_col, sum(price * qty) from t1 where shard_col in (1,2,3) group by shard_col;
This query features aggregation with distributable functions. This query is semantically equivalent to the following:
SELECT shard_col, SUM(`sum(price*qty)`) as `sum(price*qty)`
from ( select shard_col, sum(price*qty) from t1 where shard_col = 1 group by shard_col
UNION ALL
select shard_col, sum(price*qty) from t1 where shard_col = 2 group by shard_col
UNION ALL
select shard_col, sum(price*qty) from t1 where shard_col = 3 group by shard_col
) as the_union GROUP BY shard_col;
Shard-Query can push the aggregation down to the shards and it sends each query which is part of the "UNION ALL" operation in parallel.
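The pushdown can be checked with a small SQLite sketch (a stand-in for MySQL; table `t1` and its data are made up). Each per-value query plays the role of a worker and returns one partial row. The partial rows are collected in a table and re-aggregated with SUM, which must match the original query. With real shards a single group can produce several partial rows, and the outer SUM combines them the same way.

```python
import sqlite3

# Hypothetical t1 in SQLite; only the shape of the rewrite matters here.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t1 (shard_col INTEGER, price INTEGER, qty INTEGER)")
conn.executemany("INSERT INTO t1 VALUES (?, ?, ?)",
                 [(i % 5, i % 11, i % 3) for i in range(500)])

# Each "worker" aggregates its own IN-list item and returns partial rows.
conn.execute("CREATE TABLE partials (shard_col INTEGER, partial_sum INTEGER)")
for value in (1, 2, 3):
    rows = conn.execute("SELECT shard_col, SUM(price*qty) FROM t1 "
                        "WHERE shard_col = ? GROUP BY shard_col", (value,)).fetchall()
    conn.executemany("INSERT INTO partials VALUES (?, ?)", rows)

# The final step re-aggregates the partial sums.
pushed_down = conn.execute("SELECT shard_col, SUM(partial_sum) FROM partials "
                           "GROUP BY shard_col ORDER BY shard_col").fetchall()
direct = conn.execute("SELECT shard_col, SUM(price*qty) FROM t1 "
                      "WHERE shard_col IN (1,2,3) GROUP BY shard_col "
                      "ORDER BY shard_col").fetchall()
assert pushed_down == direct
```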
I ran a benchmark based on the above table with 40M rows in it. That is 400K rows per shard.
The benchmark queries follow the pattern:
select i1,sum(qty*price) from SAB_SF1.fact where i1 in(1) group by i1;
select i1,sum(qty*price) from SAB_SF1.fact where i1 in(1,2) group by i1;
select i1,sum(qty*price) from SAB_SF1.fact where i1 in(1,2,3) group by i1;
select i1,sum(qty*price) from SAB_SF1.fact where i1 in(1,2,3,4) group by i1;
...
The pattern continues all the way up to 100 values in the IN list. The table contains 400K rows for each i1 value.
Here is that graph again. For this test I used an EC2 "c1.large" instance. That is, 8 cores with 8 GB of memory. I used a 4GB data set, Percona Server 10.2 and a 1GB buffer pool size. Each partition is approximately 32MB in size.
Since the machine has eight cores, I started eight Gearman workers. The results are, I think, impressive. In the graph, "partitions scanned" is the number of values in the IN list.
This performs very well due to the pushdown of the aggregation. If a non-distributable aggregate function were to be used, then performance would probably be worse than MySQL because all 40M rows would be accessed and copied into a temporary table.
However, it can add a lot of parallelism to queries which scan multiple partitions but return fewer rows, that is, queries with a more restrictive WHERE clause.
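This is the simple MySQL benchmark script:
[root@localhost benchmarks]# cat inlist_test.php
<?php
// Run each query in inlist.sql and print its wall-clock time
$conn = mysql_connect() or die(mysql_error());
$fh = fopen('inlist.sql', 'r');
while(($line = fgets($fh)) !== false) {
  $line = rtrim(trim($line), ';'); // mysql_query() expects no trailing semicolon
  if($line === '') continue; // skip blank lines
  $start=microtime(true);
  $stmt = mysql_query($line, $conn) or die(mysql_error($conn));
  // fetch and discard every row so the time includes the full result transfer
  while($row = mysql_fetch_assoc($stmt)) {
  }
  mysql_free_result($stmt);
  echo "WALLTIME::" . (microtime(true) - $start) . "s\n";
}
fclose($fh);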
[root@localhost benchmarks]# head inlist.sql -n4
select i1,sum(qty*price) from SAB_SF1.fact where i1 in(1) group by i1;
select i1,sum(qty*price) from SAB_SF1.fact where i1 in(1,2) group by i1;
select i1,sum(qty*price) from SAB_SF1.fact where i1 in(1,2,3) group by i1;
select i1,sum(qty*price) from SAB_SF1.fact where i1 in(1,2,3,4) group by i1;
The Shard query times were captured with:
cat inlist.sql | ./shard_query.php --ini=parallel_query.ini --one --inlist=* --pushdown|grep WALL
I created a new tool this week:
http://code.google.com/p/shard-query
As the name Shard-Query suggests, the goal of the tool is to run a query over multiple shards and to return the combined results as a single result set. It uses Gearman to ask each server for a set of rows and then runs the query over the combined set. This isn't a new idea; however, Shard-Query is different from other Gearman examples I've seen, because it supports aggregation.
It does this by doing some basic query rewriting based on the input query.
Take this query for example:
select c2,
sum(s0.c1),
max(c1)
from t1 as s0
join t1 using (c1,c2)
where c2 = 98818
group by c2;
The tool will split this up into two queries.
This first query will be sent to each shard. Notice that any aggregation, grouping or sorting has been stripped away. Any joins or WHERE clauses are still present:
select c2 as `c2` ,
(s0.c1) as `(s0.c1)` ,
(c1) as `(c1)`
from t1 s0
join t1
using (c1,c2)
where c2 = 98818;
Shard-Query supports two different methods of coalescing the results from each shard. The first is called 'fetch', which, as it sounds, fetches the results into an array and inserts them into a temporary table. This is less desirable than the default method, which is called 'store'. When the store method is used, each worker uses bulk insert to insert rows directly into a destination table. The downside is that the table has to be a physical table, since it must be inserted into from multiple connections. The store method is also better than fetch because the rows are inserted into the table in parallel from each shard. With the fetch method the insertion is serialized, since the results are handled by a single-threaded PHP script.
Depending on the coalescing method, either a temporary table or a concrete table will be created to hold the rows from all the shards.
CREATE TEMPORARY TABLE -- not a temporary table when the method is store
IF NOT EXISTS `aggregation_tmp_#14861474`
( `c2` BINARY(255),
`(s0.c1)` BINARY(255),
`(c1)` BINARY(255)
) ENGINE=INNODB;
Finally, after all the rows are inserted into that table, the final rewritten query is executed. Notice that the SELECT clause has been rewritten to accommodate the column names of the temporary table. Any FROM clause has been removed and replaced with a scan of the single table. Also notice the WHERE clause is not present, because the rows were filtered at each shard:
select (`c2`),
sum(`(s0.c1)`),
max(`(c1)`)
from `aggregation_tmp_#14861474`
group by `c2`;
Finally, the result is output and then the table is dropped.
c2      sum(`(s0.c1)`)  max(`(c1)`)
-----   --------------  -----------
98818   16              8
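The two phases can be reproduced end to end with SQLite, using two in-memory databases as hypothetical shards and a third as the coordinator. The data is made up, and the sequential loop stands in for the Gearman workers. The shard query keeps the join and the WHERE clause, and the final query re-applies SUM, MAX and GROUP BY over the coalesced rows. This works here because every joined row pair lives on the same shard.

```python
import sqlite3

# Two in-memory SQLite databases play the part of shards (hypothetical data).
shards = [sqlite3.connect(":memory:") for _ in range(2)]
for n, shard in enumerate(shards):
    shard.execute("CREATE TABLE t1 (c1 INTEGER, c2 INTEGER)")
    shard.executemany("INSERT INTO t1 VALUES (?, ?)",
                      [(n * 3 + 1, 98818), (n * 3 + 2, 98818), (7, 12345)])

# Phase 1: the shard query keeps the join and WHERE clause, no aggregation.
shard_sql = ("SELECT c2, s0.c1, c1 FROM t1 AS s0 JOIN t1 USING (c1, c2) "
             "WHERE c2 = 98818")

# Coalesce every shard's rows into one table on the coordinator.
coordinator = sqlite3.connect(":memory:")
coordinator.execute('CREATE TABLE aggregation_tmp (c2, "(s0.c1)", "(c1)")')
for shard in shards:
    coordinator.executemany("INSERT INTO aggregation_tmp VALUES (?, ?, ?)",
                            shard.execute(shard_sql).fetchall())

# Phase 2: the final query re-applies the aggregation and GROUP BY.
final = coordinator.execute(
    'SELECT c2, SUM("(s0.c1)"), MAX("(c1)") FROM aggregation_tmp GROUP BY c2'
).fetchall()
print(final)  # [(98818, 12, 5)]
```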
I hope you find it useful. Right now it works with MySQL using the plain old mysql_ class of functions, but I can abstract the database access out if somebody really wants me to. This could potentially be used for just about any database which supports SQL.