I'm reading data in PySpark from Postgres over a JDBC connection. The table being read is large, about 240 million rows, and I'm attempting to read it into 16 partitions. The read is performed like this:
query = f"""
(select receiptid, itemindex, barcode, productnumberraw, itemdescription, itemdescriptionraw, itemextendedprice, itemdiscountedextendedprice, itemquantity, barcodemanufacturer, barcodebrand, barcodecategory1, barcodecategory2, barcodecategory3, isfetchpromo, ispartnerbrand, subscribeandsave, soldby, yyyymm, retailerid,
MOD(ABS(CAST('x' || md5(receiptid) AS bit(32))::int), {num_partitions}) AS partition_id from {table}_bck) as subquery
"""
# Optimize JDBC read options
df = spark.read \
    .format("jdbc") \
    .option("url", pg_url) \
    .option("dbtable", query) \
    .option("user", pg_properties["user"]) \
    .option("password", pg_properties["password"]) \
    .option("driver", pg_properties["driver"]) \
    .option("numPartitions", num_partitions) \
    .option("partitionColumn", "partition_id") \
    .option("lowerBound", 0) \
    .option("upperBound", num_partitions - 1) \
    .load()
df = df.withColumn(
    "productnumberint",
    regexp_replace(col("productnumberraw"), "[#-]", "").cast(LongType())
).withColumn(
    "barcodeint",
    regexp_replace(col("barcode"), "[#-]", "").cast(LongType())
)
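For reference, the cleanup those two withColumn calls perform is equivalent to this plain-Python transformation (a sketch using re.sub; the sample values are made up):

```python
import re

def to_int(raw):
    """Strip '#' and '-' and cast to int, mirroring the Spark
    regexp_replace(...).cast(LongType()) expression above."""
    cleaned = re.sub(r"[#-]", "", raw)
    # Spark's cast yields NULL on non-numeric input; mimic that with None.
    return int(cleaned) if cleaned.isdigit() else None

print(to_int("#123-456"))  # 123456
print(to_int("ABC-1"))     # None, like Spark's NULL-on-failure cast
```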
I then want to write the data back to Postgres:
df.rdd.foreachPartition(write_partition)
where write_partition just iterates the rows and does batch inserts using psycopg2.
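For context, a minimal sketch of what such a write_partition could look like, assuming psycopg2's execute_values for batching; the target table, column list, and connection string below are all placeholders, not the real ones:

```python
def build_insert_sql(table, columns):
    """Build an INSERT ... VALUES %s template for psycopg2's execute_values."""
    return f"INSERT INTO {table} ({', '.join(columns)}) VALUES %s"

def write_partition(rows):
    # Imported inside the function so each Spark executor resolves it locally.
    import psycopg2
    from psycopg2.extras import execute_values

    conn = psycopg2.connect("dbname=mydb user=myuser")  # placeholder DSN
    try:
        with conn, conn.cursor() as cur:
            sql = build_insert_sql(
                "mytable_out",  # illustrative target table
                ["receiptid", "barcode", "productnumberint", "barcodeint"],
            )
            # execute_values sends many rows per round trip
            # instead of one INSERT per row.
            execute_values(cur, sql, (tuple(row) for row in rows), page_size=1000)
    finally:
        conn.close()
```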
My issue is that I am seeing the partition queries doubled on my database.
SELECT "receiptid","itemindex","barcode","productnumberraw","itemdescription","itemdescriptionraw","itemextendedprice","itemdiscountedextendedprice","itemquantity","barcodemanufacturer","barcodebrand","barcodecategory1","barcodecategory2","barcodecategory3","isfetchpromo","ispartnerbrand","subscribeandsave","soldby","yyyymm","retailerid","partition_id" FROM (select receiptid, itemindex, barcode, productnumberraw, itemdescription, itemdescriptionraw, itemextendedprice, itemdiscountedextendedprice, itemquantity, barcodemanufacturer, barcodebrand, barcodecategory1, barcodecategory2, barcodecategory3, isfetchpromo, ispartnerbrand, subscribeandsave, soldby, yyyymm, retailerid,
MOD(ABS(CAST('x' || md5(receiptid) AS bit(32))::int), 16) AS partition_id from mytable) as subquery WHERE "partition_id" >= 10 AND "partition_id" < 11
What is causing the double read of the data?
If you're seeing them duplicated in pg_stat_activity, some of them might show a leader_pid pointing at the pids of others, which means the query is being handled by multiple worker processes.
Seeing your queries spread across multiple workers is especially likely on partitioned tables. The fact that you specifically narrowed this down to a single partition:
WHERE "partition_id" >= 10 AND "partition_id" < 11
doesn't prevent additional workers from being used, and it doesn't disqualify the default partition as a scan target either. You can tweak the asynchronous-behaviour settings to control that:
select name, setting, short_desc
from pg_settings
where name in ( 'max_worker_processes'
,'max_parallel_workers_per_gather'
,'max_parallel_maintenance_workers'
,'max_parallel_workers'
,'parallel_leader_participation');
name | setting | short_desc |
---|---|---|
max_parallel_maintenance_workers | 2 | Sets the maximum number of parallel processes per maintenance operation. |
max_parallel_workers | 8 | Sets the maximum number of parallel workers that can be active at one time. |
max_parallel_workers_per_gather | 4 | Sets the maximum number of parallel processes per executor node. |
max_worker_processes | 8 | Maximum number of concurrent worker processes. |
parallel_leader_participation | on | Controls whether Gather and Gather Merge also run subplans. |
create table test (partition_id int, payload text)
partition by list(partition_id);
create table test1 partition of test for values in (1);
create table test2 partition of test for values in (2);
create table test_default partition of test default;
select setseed(.42);
insert into test
select 1, md5(random()::text)
from generate_series(1,5e5);
If I now tell a parallel dblink client to query that table and observe pg_stat_activity, I'm also getting the query twice, as a pid/leader_pid pair:
create extension if not exists dblink;
select dblink_connect('parallel_client','');
select dblink_send_query('parallel_client',
$q$ select*from test where partition_id>=1 and partition_id<2;
$q$);
select pid,leader_pid,query from pg_stat_activity;
pid | leader_pid | query |
---|---|---|
786 | null | create extension if not exists dblink; select dblink_connect('parallel_client',''); select dblink_send_query('parallel_client', $q$ select*from test where partition_id>=1 and partition_id<2; $q$); select pid,leader_pid,query from pg_stat_activity; |
787 | null | select*from test where partition_id>=1 and partition_id<2; |
788 | 787 | select*from test where partition_id>=1 and partition_id<2; |
Explain also shows that the query runs on 2 workers:
explain(analyze,verbose,settings)
select*from test where partition_id>=1 and partition_id<2;
QUERY PLAN |
---|
Gather (cost=1000.00..8766.49 rows=2652 width=36) (actual time=0.450..206.506 rows=500000 loops=1) |
Output: test.partition_id, test.payload |
Workers Planned: 2 |
Workers Launched: 2 |
-> Parallel Append (cost=0.00..7501.29 rows=1105 width=36) (actual time=0.013..48.702 rows=166667 loops=3) |
Worker 0: actual time=0.015..3.538 rows=15360 loops=1 |
Worker 1: actual time=0.015..3.262 rows=15360 loops=1 |
-> Parallel Seq Scan on public.test1 test_1 (cost=0.00..7474.56 rows=1102 width=36) (actual time=0.012..35.920 rows=166667 loops=3) |
Output: test_1.partition_id, test_1.payload |
Filter: ((test_1.partition_id >= 1) AND (test_1.partition_id < 2)) |
Worker 0: actual time=0.014..2.525 rows=15360 loops=1 |
Worker 1: actual time=0.014..2.207 rows=15360 loops=1 |
-> Parallel Seq Scan on public.test_default test_2 (cost=0.00..21.21 rows=4 width=36) (actual time=0.001..0.001 rows=0 loops=1) |
Output: test_2.partition_id, test_2.payload |
Filter: ((test_2.partition_id >= 1) AND (test_2.partition_id < 2)) |
Settings: max_parallel_workers_per_gather = '4', search_path = 'public' |
Planning Time: 0.232 ms |
Execution Time: 224.640 ms |
Even if you change the condition to point at the specific partition and disqualify the default, you can still get more than one worker:
explain(analyze,verbose,settings)
select*from test where partition_id=1;
QUERY PLAN |
---|
Gather (cost=1000.00..8187.90 rows=2646 width=36) (actual time=0.206..240.995 rows=500000 loops=1) |
Output: test.partition_id, test.payload |
Workers Planned: 2 |
Workers Launched: 2 |
-> Parallel Seq Scan on public.test1 test (cost=0.00..6923.30 rows=1102 width=36) (actual time=0.017..31.386 rows=166667 loops=3) |
Output: test.partition_id, test.payload |
Filter: (test.partition_id = 1) |
Worker 0: actual time=0.022..2.331 rows=13440 loops=1 |
Worker 1: actual time=0.018..2.389 rows=13440 loops=1 |
Settings: max_parallel_workers_per_gather = '4', search_path = 'public' |
Planning Time: 0.227 ms |
Execution Time: 268.901 ms |
This can be changed at the session or even the transaction level:
set max_parallel_workers=0;
set max_parallel_workers_per_gather=0;
explain(analyze,verbose,settings)
select*from test where partition_id>=1 and partition_id<2;
QUERY PLAN |
---|
Append (cost=0.00..12147.44 rows=2652 width=36) (actual time=0.011..94.449 rows=500000 loops=1) |
-> Seq Scan on public.test1 test_1 (cost=0.00..12105.13 rows=2646 width=36) (actual time=0.010..59.975 rows=500000 loops=1) |
Output: test_1.partition_id, test_1.payload |
Filter: ((test_1.partition_id >= 1) AND (test_1.partition_id < 2)) |
-> Seq Scan on public.test_default test_2 (cost=0.00..29.05 rows=6 width=36) (actual time=0.012..0.012 rows=0 loops=1) |
Output: test_2.partition_id, test_2.payload |
Filter: ((test_2.partition_id >= 1) AND (test_2.partition_id < 2)) |
Settings: max_parallel_workers = '0', max_parallel_workers_per_gather = '0', search_path = 'public' |
Planning Time: 0.170 ms |
Execution Time: 109.770 ms |
explain(analyze,verbose,settings)
select*from test where partition_id=1;
QUERY PLAN |
---|
Seq Scan on public.test1 test (cost=0.00..10782.11 rows=2646 width=36) (actual time=0.025..52.193 rows=500000 loops=1) |
Output: test.partition_id, test.payload |
Filter: (test.partition_id = 1) |
Settings: max_parallel_workers = '0', max_parallel_workers_per_gather = '0', search_path = 'public' |
Planning Time: 0.073 ms |
Execution Time: 67.570 ms |
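Coming back to the Spark side of the question: the JDBC source's sessionInitStatement option runs a custom SQL statement on each connection before the partition query is issued, so you can switch parallel workers off for exactly these 16 reads without touching the server configuration. A sketch, where the URL, subquery, and credentials are placeholders for the ones in the question:

```python
# Placeholders standing in for the variables defined in the question.
pg_url = "jdbc:postgresql://localhost:5432/mydb"
num_partitions = 16
query = "(select /* partitioned subquery from above */) as subquery"

jdbc_options = {
    "url": pg_url,
    "dbtable": query,
    "numPartitions": str(num_partitions),
    "partitionColumn": "partition_id",
    "lowerBound": "0",
    "upperBound": str(num_partitions - 1),
    # Runs once per JDBC connection, before the partition query, so each
    # of the 16 reads stays on a single backend process.
    "sessionInitStatement": "SET max_parallel_workers_per_gather = 0",
}

# df = spark.read.format("jdbc").options(**jdbc_options) \
#     .option("user", pg_properties["user"]) \
#     .option("password", pg_properties["password"]) \
#     .option("driver", pg_properties["driver"]) \
#     .load()
```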