
Optimizing TopK queries in DataFusion

28 Sep, 2022

Are you interested in your most popular products, the top contributions to your KPIs, or similar questions about your data? Probably so: insights like these are very common in analytics, reports and dashboards.

Answering questions like these with the Apache Arrow DataFusion query engine just got a lot faster and now uses less memory, thanks to optimizations to this open source project [1] [2] [3] [4]. The optimizations will be available in the next release of DataFusion (version 13).

An SQL query in that "TopK" form looks like this:

SELECT
  o_orderkey,
  o_totalprice
FROM 
  orders
ORDER BY 
  o_totalprice DESC
LIMIT 10

The query returns the 10 orders with the highest total price.
Run against the orders table from the TPC-H benchmark, it gives the following result:

+------------+--------------+
| o_orderkey | o_totalprice |
+------------+--------------+
| 39394405   | 558822.56    |
| 3578692    | 558702.81    |
| 42290181   | 558289.17    |
| 2745894    | 557664.53    |
| 36667107   | 550142.18    |
| 43906817   | 549431.65    |
| 21213895   | 549380.08    |
| 49667013   | 542546.72    |
| 2199712    | 542154.01    |
| 30671170   | 541620.62    |
+------------+--------------+

To get the results for this query you need to sort the data (ORDER BY) and then take the first 10 rows (LIMIT 10). And this is precisely what DataFusion used to do!

Let’s look at what the previous version of DataFusion does by examining a (simplified) execution plan for this query. We can get the plan by prefixing the query with EXPLAIN.

| GlobalLimitExec: skip=0, fetch=10
|   SortExec: [o_totalprice@4 DESC]
|     CoalescePartitionsExec
|       ParquetExec: limit=None, partitions=[..], projection=[..]

You can read the plan from bottom to top:

  • ParquetExec: scans the full (Parquet) table, reading all the columns it needs. limit=None means that no limit is applied: it needs to read all the data in order to produce the results.
  • CoalescePartitionsExec: combines all the partitions into one partition. Without this step, sorting the individual partitions would only give results that are sorted within each partition.
  • SortExec: sorts all the input data by the ORDER BY expression. This operator buffers all input data, sorting the individual batches and merging them. The output is fully sorted data. If the data doesn’t fit in memory, this operator spills it to disk during processing.
  • GlobalLimitExec: limits the output to the first fetch rows, in this case the first 10.
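To make the cost of the old plan concrete, it can be modelled in a few lines of Python. This is a minimal sketch on in-memory data, not DataFusion's (Rust) implementation: it assumes each partition is a plain list of (o_orderkey, o_totalprice) tuples, and the function name old_topk_plan is hypothetical.

```python
from itertools import chain

def old_topk_plan(partitions, key, fetch):
    """Hypothetical model of the old plan (not DataFusion code)."""
    # CoalescePartitionsExec: combine all partitions into a single one.
    coalesced = list(chain.from_iterable(partitions))
    # SortExec: fully sort all rows; every input row is buffered in memory.
    fully_sorted = sorted(coalesced, key=key, reverse=True)
    # GlobalLimitExec (skip=0): keep only the first `fetch` rows.
    return fully_sorted[:fetch]

partitions = [[(1, 10.0), (2, 50.0)], [(3, 30.0), (4, 40.0)], [(5, 20.0)]]
print(old_topk_plan(partitions, key=lambda row: row[1], fetch=3))
# [(2, 50.0), (4, 40.0), (3, 30.0)]
```

The whole input is sorted even though only three rows survive the limit; that wasted work is what the optimizations target.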

Looking at the plan, there are two important optimizations possible:

  • Parallel sorting. DataFusion can first sort individual partitions in parallel. After this, the sorted partitions should be merged to get a fully sorted output. Luckily there is already an operator for this in DataFusion called SortPreservingMergeExec. This takes in a number of partitions with (individually) sorted data and outputs one partition with sorted data.
  • Pushing down the limit to SortExec. If SortExec knows that only 10 items are needed, it can do less work to sort the input data and use less memory. It can use the more efficient partial sort and directly discard items beyond the top 10. This means that instead of buffering batches of 8192 sorted items each (the default batch size), it only keeps batches of 10 items (819.2 times smaller), saving most of the further processing and memory usage. The same information also speeds up merging the sorted batches, because the merge can stop after the first 10 items. The output of each SortExec now contains only 10 items, which limits the work left for SortPreservingMergeExec. For really large tables it also means that in most cases the data no longer has to be spilled to disk, saving even more time! In Ballista, the distributed query engine based on DataFusion, the savings are even larger because it avoids shuffling large datasets, which is often the slowest part of distributed query execution.
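The idea behind a SortExec with a fetch limit can be sketched in Python as well. This is an illustration under stated assumptions, not DataFusion's implementation: a partition is assumed to arrive as a list of batches of (o_orderkey, o_totalprice) tuples, heapq.nlargest stands in for the partial sort, and sort_with_fetch is a hypothetical name.

```python
import heapq
from itertools import islice

def sort_with_fetch(batches, key, fetch):
    """Hypothetical model of a SortExec with a fetch limit (not DataFusion code)."""
    # Per batch: keep only the `fetch` largest rows, in descending order.
    # heapq.nlargest stands in for the partial sort; the rest of the batch is discarded.
    truncated = [heapq.nlargest(fetch, batch, key=key) for batch in batches]
    # Merge the small sorted batches lazily and stop after `fetch` rows.
    merged = heapq.merge(*truncated, key=key, reverse=True)
    return list(islice(merged, fetch))

batches = [[(1, 5.0), (2, 9.0), (3, 1.0)], [(4, 7.0), (5, 8.0)], [(6, 3.0)]]
print(sort_with_fetch(batches, key=lambda row: row[1], fetch=2))
# [(2, 9.0), (5, 8.0)]
```

Each retained batch holds at most fetch rows, so memory grows with the number of batches times fetch rather than with the full input size.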

Ok, so what does the plan look like after those optimizations?

| GlobalLimitExec: skip=0, fetch=10
|   SortPreservingMergeExec: [o_totalprice@0 DESC]
|     SortExec: [o_totalprice@4 DESC] fetch=10
|       ParquetExec: limit=None, partitions=[..], projection=[..]

What’s different now? CoalescePartitionsExec is gone, so each partition can be sorted independently and fully in parallel. SortExec now has a fetch parameter, which saves most of the work and memory. A SortPreservingMergeExec has been added to merge the sorted partitions into a single, fully sorted output, from which GlobalLimitExec takes the first 10 rows.
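Putting the pieces together, the optimized plan can be modelled as "top rows per partition, then a sort-preserving merge, then the limit". This is a minimal Python sketch, not DataFusion code: partitions are assumed to be lists of (o_orderkey, o_totalprice) tuples, new_topk_plan is a hypothetical name, heapq.nlargest stands in for SortExec with fetch, and heapq.merge plays the role of SortPreservingMergeExec.

```python
import heapq
from concurrent.futures import ThreadPoolExecutor
from itertools import islice

def new_topk_plan(partitions, key, fetch):
    """Hypothetical model of the optimized plan (not DataFusion code)."""
    def sort_partition(rows):
        # SortExec with fetch: the top `fetch` rows of one partition, in descending order.
        return heapq.nlargest(fetch, rows, key=key)

    # Sort every partition concurrently (no CoalescePartitionsExec anymore).
    with ThreadPoolExecutor() as pool:
        sorted_partitions = list(pool.map(sort_partition, partitions))
    # SortPreservingMergeExec: lazy k-way merge of the sorted partitions.
    merged = heapq.merge(*sorted_partitions, key=key, reverse=True)
    # GlobalLimitExec (skip=0): take the first `fetch` rows of the merged stream.
    return list(islice(merged, fetch))

partitions = [[(1, 10.0), (2, 50.0)], [(3, 30.0), (4, 40.0)], [(5, 20.0)]]
print(new_topk_plan(partitions, key=lambda row: row[1], fetch=3))
# [(2, 50.0), (4, 40.0), (3, 30.0)]
```

The thread pool only shows the structure: because of Python's GIL it will not speed up this CPU-bound work, whereas DataFusion runs the partitions on multiple cores. The merge only ever sees fetch rows per partition, so its cost no longer depends on the table size.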

Are we doing any better in terms of execution time? Let’s find out!

First we run it in DataFusion 12:

SELECT o_orderkey, o_totalprice FROM orders ORDER BY o_totalprice DESC 
LIMIT 10;
+------------+--------------+
| o_orderkey | o_totalprice |
+------------+--------------+
| 39394405   | 558822.56    |
| 3578692    | 558702.81    |
| 42290181   | 558289.17    |
| 2745894    | 557664.53    |
| 36667107   | 550142.18    |
| 43906817   | 549431.65    |
| 21213895   | 549380.08    |
| 49667013   | 542546.72    |
| 2199712    | 542154.01    |
| 30671170   | 541620.62    |
+------------+--------------+
10 rows in set. Query took 1.460 seconds.

Now we run the same query on a version that includes the changes:

SELECT o_orderkey, o_totalprice FROM orders ORDER BY o_totalprice DESC 
LIMIT 10;
+------------+--------------+
| o_orderkey | o_totalprice |
+------------+--------------+
| 39394405   | 558822.56    |
| 3578692    | 558702.81    |
| 42290181   | 558289.17    |
| 2745894    | 557664.53    |
| 36667107   | 550142.18    |
| 43906817   | 549431.65    |
| 21213895   | 549380.08    |
| 49667013   | 542546.72    |
| 2199712    | 542154.01    |
| 30671170   | 541620.62    |
+------------+--------------+
10 rows in set. Query took 0.093 seconds.

The execution time dropped from 1.460 seconds to 0.093 seconds, so the query now finishes almost instantly: it runs more than 15 times as fast! The new version also uses only minimal memory, whereas the previous version kept around 3 GB of data in memory.
The exact speedup for other scenarios depends on the query and the input data, but you can expect big speedups for similar queries that combine ORDER BY with LIMIT.

DataFusion project

Are you interested in trying out DataFusion, or in the open source project itself? Feel free to check out the project and join the community.
