Parallel query without parallel query
One of the key improvements since Postgres 9.6 has been the ability to distribute a query to multiple worker processes. Yet, with a few tricks this has been feasible almost forever, at least for the really heavy stuff.
I prefer to keep the database specific stuff in the database and application related logic in the application. For me, that translates to having almost all database access implemented as database functions. That allows for later optimization without touching the application logic. It does not always work out, but more often than not it does. So, for me, it's worthwhile.
That said, let's assume you have a query that takes 3 seconds, and you know it can be split into 3 parts where each part takes about 1 second. Here is an example:
WITH num(i) AS (
SELECT generate_series(1,3000)
)
, div(i) AS (
SELECT generate_series(1,3000)
)
SELECT num.i
FROM num
CROSS JOIN div
GROUP BY 1
HAVING count(CASE WHEN num.i % div.i = 0 THEN 1 END) = 2
ORDER BY 1
This is a pretty inefficient way to find the prime numbers between 1 and 3000. We build the cross join of num and div. That means the result contains, for each element of num, all elements of div. Now, for each of these 9,000,000 rows, num.i is divided by div.i and the remainder is tested for zero. If it is zero, num.i is divisible by div.i. For a prime number, that happens only when div.i = 1 or div.i = num.i, that is, exactly 2 times.
Since the actual prime numbers do not matter, they are sent to /dev/null:
postgres=# \timing on
Timing is on.
postgres=# \o /dev/null
postgres=# WITH num(i) AS (
postgres(# SELECT generate_series(1,3000)
postgres(# )
postgres-# , div(i) AS (
postgres(# SELECT generate_series(1,3000)
postgres(# )
postgres-# SELECT num.i
postgres-# FROM num
postgres-# CROSS JOIN div
postgres-# GROUP BY 1
postgres-# HAVING count(CASE WHEN num.i % div.i = 0 THEN 1 END) = 2
postgres-# ORDER BY 1
postgres-# ;
Time: 3233.566 ms (00:03.234)
The query takes roughly 3 seconds.
The beauty of this query is that it can be divided into separate jobs testing the numbers from 1 to 1000, from 1001 to 2000 and from 2001 to 3000. Here is for instance the middle one:
postgres=# WITH num(i) AS (
postgres(# SELECT generate_series(1001,2000)
postgres(# )
postgres-# , div(i) AS (
postgres(# SELECT generate_series(1,3000)
postgres(# )
postgres-# SELECT num.i
postgres-# FROM num
postgres-# CROSS JOIN div
postgres-# GROUP BY 1
postgres-# HAVING count(CASE WHEN num.i % div.i = 0 THEN 1 END) = 2
postgres-# ORDER BY 1
postgres-# ;
Time: 1107.758 ms (00:01.108)
Now, the goal is to have Postgres execute the 3 jobs simultaneously.
DBLINK
The key ingredient to get there is the dblink extension:
CREATE EXTENSION IF NOT EXISTS dblink;
It provides a couple of functions that allow a backend to connect to another Postgres database. The most prominent of these functions is called just dblink. It is passed 2 parameters: a connection string specifying which database to connect to and the SQL query.
Example:
SELECT *
FROM dblink('dbname=postgres port=5434', $$
SELECT setting
FROM pg_settings
WHERE name='port'
$$) t(port int)
This could be the result:
port
------
5434
(1 row)
The connection string I am using in this example connects to the same database where the dblink function is called.
Note, dblink returns a set of RECORDs. That means you have to describe the result type each time it is called (the t(port int) part).
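If the same remote query is issued again and again, one way to avoid repeating the column list is to wrap the dblink call in a small SQL function with a declared return type. A minimal sketch, assuming a wrapper name remote_port of my own invention:
CREATE OR REPLACE FUNCTION remote_port(conninfo TEXT)
RETURNS TABLE (port INT) AS $$
    -- the result type is declared once, in the wrapper's signature
    SELECT *
    FROM dblink(conninfo, $q$
        SELECT setting
        FROM pg_settings
        WHERE name = 'port'
    $q$) AS t(port INT)
$$ LANGUAGE sql;

SELECT * FROM remote_port('dbname=postgres port=5434');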
Nowadays, dblink may be a little frowned upon for being old-fashioned. For a couple of years now, Postgres has been coming with foreign data wrappers that allow you to use tables and views residing in external databases in local queries just like local tables and views. That is true, and foreign data wrappers are an important feature. Yet, I think dblink still has its niche. It gives you better control over what exactly the query on the remote machine looks like and can give you much better performance in more complex situations, although the Postgres FDW has improved a lot in recent versions.
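For comparison, a foreign data wrapper setup against the same loopback database could look roughly like this; the server name loopback and the table name some_table are made up for illustration:
CREATE EXTENSION IF NOT EXISTS postgres_fdw;

CREATE SERVER loopback
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (dbname 'postgres', port '5434');

CREATE USER MAPPING FOR CURRENT_USER
    SERVER loopback;

-- expose a single remote table as a local foreign table
IMPORT FOREIGN SCHEMA public LIMIT TO (some_table)
    FROM SERVER loopback INTO public;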
Named connections
Apart from the dblink function, the extension provides more low-level functions. The following query, for instance, opens 3 connections to the same database, naming them cn1, cn2 and cn3:
SELECT dblink_connect(cn, 'dbname=postgres port=5434')
FROM (VALUES ('cn1'), ('cn2'), ('cn3')) conn(cn);
These 3 additional backends can now be seen in the pg_stat_activity view:
postgres=# select pid, state, query from pg_stat_activity order by backend_start;
pid | state | query
-------+--------+------------------------------------------------------------------------
32544 | active | select pid, state, query from pg_stat_activity order by backend_start;
1084 | idle |
1085 | idle |
1086 | idle |
(4 rows)
Or you can use dblink_get_connections() to return a list of the connections used in the current session:
postgres=# select dblink_get_connections();
dblink_get_connections
------------------------
{cn3,cn2,cn1}
(1 row)
Here is how these connections can be used:
SELECT *
FROM (VALUES ('cn1'), ('cn2'), ('cn3')) conn(cn)
CROSS JOIN dblink(conn.cn, $$
SELECT pg_backend_pid()
$$) t(pid int)
Here is the result:
cn | pid
-----+------
cn1 | 1084
cn2 | 1085
cn3 | 1086
(3 rows)
As expected, we get the same PIDs as from pg_stat_activity.
Note, a named connection like the 3 above is closed only when the current backend terminates or when it is explicitly closed by dblink_disconnect. It is not automatically closed at the end of a transaction or statement. Also, attempting to open a named connection whose name is already in use results in an error.
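For example, trying to connect again under a name that is already taken fails right away (the exact wording of the error message may vary by version):
SELECT dblink_connect('cn1', 'dbname=postgres port=5434');
-- ERROR:  duplicate connection name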
Sending asynchronous queries
The next functions that I want to introduce are dblink_send_query and dblink_get_result. The former sends a query over a named connection to the backend and returns immediately, without waiting for the backend to finish. The latter can then be used to wait for and read the result. Dblink has a few more functions in this context, but these two are enough to reach our goal.
SELECT *
FROM (VALUES ('cn1'), ('cn2'), ('cn3')) conn(cn)
CROSS JOIN dblink_send_query(conn.cn, $$
SELECT pg_backend_pid()
$$) t;
Well, that looks pretty similar to the simple dblink call above. Only, dblink_send_query returns 1 or 0 indicating whether the query has been sent:
cn | t
-----+---
cn1 | 1
cn2 | 1
cn3 | 1
(3 rows)
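As an aside, one of the other functions mentioned above, dblink_is_busy, can be used to check whether a backend is still working on its query without blocking. A quick sketch; with the fast pg_backend_pid() queries just sent, it will normally already report 0:
SELECT conn.cn,
       dblink_is_busy(conn.cn) AS busy  -- 1 = still running, 0 = result ready
FROM (VALUES ('cn1'), ('cn2'), ('cn3')) conn(cn);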
Fetching the result
Now, dblink_get_result can be used to fetch the result. Note, this function also returns a set of RECORDs. So, the expected set of columns must be described here as well.
SELECT *
FROM (VALUES ('cn1'), ('cn2'), ('cn3')) conn(cn)
CROSS JOIN dblink_get_result(conn.cn) t(pid int)
cn | pid
-----+------
cn1 | 1084
cn2 | 1085
cn3 | 1086
(3 rows)
Putting it all together
CREATE OR REPLACE FUNCTION primes (p_limit BIGINT, p_step BIGINT)
RETURNS TABLE (n BIGINT) AS $def$
DECLARE
v_q RECORD;
BEGIN
FOR v_q IN
WITH RECURSIVE intv AS (
SELECT 1::BIGINT AS s, least($2, $1) AS e
UNION ALL
SELECT e + 1, least(e + $2, $1)
FROM intv
WHERE e < $1
)
, jobs AS (
SELECT 'cn' || row_number() OVER () AS cn,
intv.s, intv.e
FROM intv
ORDER BY intv.s
)
, conn AS (
SELECT *,
1/(dblink_connect(cn, 'dbname=postgres port=5434')='OK')::INT AS connstatus
FROM jobs
)
SELECT conn.*, 1/q.status AS sendstatus
FROM conn
CROSS JOIN LATERAL dblink_send_query(conn.cn,
$$
WITH num(i) AS (
SELECT generate_series($$ || conn.s || $$,
$$ || conn.e || $$)
)
, div(i) AS (
SELECT generate_series(1,$$ || $1 || $$)
)
SELECT num.i
FROM num
CROSS JOIN div
GROUP BY 1
HAVING count(CASE WHEN num.i % div.i = 0 THEN 1 END) = 2
ORDER BY 1
$$) q(status)
LOOP
RETURN QUERY
SELECT tb.i
FROM dblink_get_result(v_q.cn) tb(
i BIGINT
);
PERFORM dblink_disconnect(v_q.cn);
END LOOP;
END
$def$ LANGUAGE plpgsql;
The 1st part of the recursive CTE, intv, computes a table like this:
s | e
------+------
1 | 1000
1001 | 2000
2001 | 3000
(3 rows)
This could have been done more easily with generate_series(). But in more complex situations, I have found the CTE approach more flexible. The jobs part then adds cn1, cn2 etc. to the table.
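For the record, here is a sketch of the generate_series() alternative for building the intervals; 3000 and 1000 stand in for p_limit and p_step:
SELECT g.s, least(g.s + 1000 - 1, 3000) AS e
FROM generate_series(1, 3000, 1000) AS g(s);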
Now, conn opens the connections. dblink_connect() returns the string OK on success. If the connection cannot be established, the whole query should fail. Hence, the resulting string is compared with OK and the resulting BOOLEAN is converted to INT. That gives either 0 for FALSE or 1 for TRUE. Now, a division by 0 throws an exception, making the entire query fail.
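The trick can be seen in isolation: a comparison that yields TRUE casts to 1 and the division is harmless, while FALSE casts to 0 and blows up:
SELECT 1/('OK' = 'OK')::INT;    -- returns 1
SELECT 1/('oops' = 'OK')::INT;  -- ERROR:  division by zero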
The main query then uses dblink_send_query to send the query to the backend. The same division by zero approach is used to make sure it has been sent successfully.
When this query is done, we have a result set with several named dblink connections, and each of the backends is processing part of the problem. A plpgsql for-loop is then used to iterate through this set and fetch the results from the backends. It also closes the connections along the way.
Does it work?
First, we have to check that the result matches that of the original query. To do that, I usually work with temp tables.
CREATE TEMP TABLE orig(i) AS
WITH num(i) AS (
SELECT generate_series(1,3000)
)
, div(i) AS (
SELECT generate_series(1,3000)
)
SELECT num.i
FROM num
CROSS JOIN div
GROUP BY 1
HAVING count(CASE WHEN num.i % div.i = 0 THEN 1 END) = 2
ORDER BY 1
and then do the same for the function results at various step sizes:
postgres=# CREATE TEMP TABLE a(i) AS select * from primes(3000,3000);
SELECT 430
Time: 3113.399 ms (00:03.113)
postgres=# CREATE TEMP TABLE b(i) AS select * from primes(3000,1500);
SELECT 430
Time: 1599.998 ms (00:01.600)
postgres=# CREATE TEMP TABLE d(i) AS select * from primes(3000,1000);
SELECT 430
Time: 1711.761 ms (00:01.712)
postgres=# CREATE TEMP TABLE c(i) AS select * from primes(3000,800);
SELECT 430
Time: 1695.348 ms (00:01.695)
postgres=# select * from orig except all select * from a;
i
---
(0 rows)
Each of the function invocations returns 430 rows. The original query did the same. Since all the temp tables have the same number of rows, it's enough to check that all rows from orig minus all rows from a (or b or c or d) gives the empty set.
Now, what about the timing?
If we want to find all primes up to 3000 and have only one backend (step=3000), the query takes roughly the same time as the original. If the work is distributed to 2 processes, the time taken is roughly cut in half. However, if more jobs are introduced, it does not get any better. The reason might be that the machine is a relatively cheap AWS instance. Although it shows 4 CPUs, this is probably a lie, kind of.
One step further
So far, so good. But isn't it possible to write a single SQL statement that does all of this and also collects the results and closes the connections? That way, the necessity of a function could be avoided.
A CTE is an optimization barrier. So, each part of a CTE can in principle be approached like a separate statement. At least so I thought.
WITH v_q AS (
WITH jobs(cn) AS (
VALUES ('c1'), ('c2')
)
, conn AS (
SELECT *, 1/(dblink_connect(cn, 'dbname=postgres port=5434')='OK')::INT AS connstatus
FROM jobs
)
SELECT conn.*, 1/q.status AS sendstatus
FROM conn
CROSS JOIN LATERAL dblink_send_query(conn.cn,
$$
select now(), pg_sleep(3), clock_timestamp()
$$) q(status)
)
SELECT tb.tx_time, tb.end_time
FROM v_q
CROSS JOIN dblink_get_result(v_q.cn) tb(
tx_time TIMESTAMP,
dummy TEXT,
end_time TIMESTAMP
);
Here v_q represents the set that, in the function, was iterated through in the for-loop. The main query then loops through that set, collecting the results.
Unfortunately, the 2nd dblink_send_query is started only after the result from the first one has arrived:
tx_time | end_time
----------------------------+----------------------------
2017-12-07 23:06:19.687594 | 2017-12-07 23:06:22.688422
2017-12-07 23:06:22.690816 | 2017-12-07 23:06:25.692413
(2 rows)
Since that does not work, I didn't try to add connection closing as well.
Note: This post has been ported from https://tech.binary.com/ (our old tech blog)