Tags: oracle-database, apache-spark, rdbms

Partitioning the query results with ORA_HASH


I have a few million rows in an RDBMS (Oracle 12+) and I'm using Spark (3+) to read the data into multiple partitions. However, the query execution is aborted by the Oracle server because the TEMP tablespace gets consumed completely (a different issue than this post). For simplicity, assume that I have 8 partitions defined in Spark's config and each Spark executor works with its own partition. Our SQL query is of the form:

select t.a, t.b, ORA_HASH(CAST((t.c) AS NUMBER(38)), #{8 - 1}) as hashedID from Table_1 t

I am not concerned about which hash algorithm is used under the hood, whether it's deterministic, or whether the hash values are evenly distributed. My questions are:

  1. How does this work internally? Does the Oracle server first apply the hash function to all values of column c and then pull values of columns a and b for only those rows whose hash value is 0, OR does the Oracle server pull all columns first (i.e. columns a, b and c) and then, based on c's hash, return only the corresponding values of columns a and b?
  2. When Spark parallelizes the query such that every partition gets its own set of data, I presume it is done something like this: every executor picks up the base query and wraps it with its executor number, e.g. executor #3 will execute a query somewhat like:
select a, b from (select t.a, t.b, ORA_HASH(CAST((t.c) AS NUMBER(38)), #{8 - 1}) as hashedID from Table_1 t) where hashedID = 3

Is this a correct understanding? If so, then every JDBC connection initiated by an executor calculates the hash independently of the other executors. This would lead to redundant computations. Is this correct?
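To make the presumed wrapping concrete, here is a small Python sketch of how such per-partition queries could be generated. The function name and structure are purely illustrative (this is not Spark's actual internal code), but the generated SQL matches the shape described above:

```python
# Illustrative sketch of per-partition query generation; not Spark internals.
NUM_PARTITIONS = 8

# Base query with ORA_HASH bucketing into NUM_PARTITIONS buckets (0..7).
BASE_QUERY = (
    "select t.a, t.b, "
    f"ORA_HASH(CAST((t.c) AS NUMBER(38)), {NUM_PARTITIONS - 1}) as hashedID "
    "from Table_1 t"
)

def partition_query(partition_id: int) -> str:
    """Wrap the base query so one executor only sees its own bucket."""
    return f"select a, b from ({BASE_QUERY}) where hashedID = {partition_id}"

# One wrapped query per executor; each hits the database independently.
for pid in range(NUM_PARTITIONS):
    print(partition_query(pid))
```

Note that every one of these queries contains the full ORA_HASH expression, which is exactly why each connection recomputes the hash independently.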

I did try to understand how ORA_HASH works, but didn't find an explanation of how the projection on a query that computes ORA_HASH works internally. Hence question #1.


Solution

  • Oracle first retrieves the rows identified by your WHERE (and/or JOIN) clauses and then processes the columns specified by your SELECT clause. Therefore, this:

    select t.a, t.b, ORA_HASH(CAST((t.c) AS NUMBER(38)), #{8 - 1}) as hashedID from Table_1 t
    

    will pull 100% of the rows in Table_1 (including columns a and b) and apply the CAST, then ORA_HASH, to column c. So, in answer to your first question:

    "Does the Oracle server first apply the hash function to all values of column c and then pull values of columns a and b for only those rows whose hash value is 0, OR does the Oracle server pull all columns first (i.e. columns a, b and c) and then, based on c's hash, return only the corresponding values of columns a and b?"

    The latter is correct.

    When you add another wrapper around this:

    select a, b from (select t.a, t.b, ORA_HASH(CAST((t.c) AS NUMBER(38)), #{8 - 1}) as hashedID from Table_1 t) where hashedID = 3
    

    Oracle will push the hashedID = 3 predicate into the inline view and rewrite it as:

    select t.a, t.b from Table_1 t where ORA_HASH(CAST((t.c) AS NUMBER(38)), #{8 - 1}) = 3
    

    This will scan 100% of the table's rows, applying CAST then ORA_HASH to column c, filter out any rows where the result doesn't equal 3, and then return columns a and b from the filtered set.
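    You can confirm this pushdown on your own system by inspecting the optimizer's plan for the wrapped query. The exact output varies by version and statistics, but you should see a full scan of TABLE_1 with the hash expression applied as a filter predicate:

    ```sql
    EXPLAIN PLAN FOR
      select a, b
      from (select t.a, t.b,
                   ORA_HASH(CAST((t.c) AS NUMBER(38)), 7) as hashedID
            from Table_1 t)
      where hashedID = 3;

    SELECT * FROM TABLE(DBMS_XPLAN.DISPLAY);
    ```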

    So each of your threads incurs the I/O penalty of a full table scan plus the CPU penalty of the CAST/ORA_HASH work on 100% of the rows, even though it is only asking for a slice of them. This is very inefficient, and applications that do brute-force extracts like this cause a lot of problems for database health (DBAs do not like them). You are absolutely right that this does redundant computational work. Add to that the fact that the scans themselves are typically parallelized full table scans (using direct path reads), and the aggregate I/O pressure on your storage arrays from 10, 20, 30+ application threads doing this concurrently can be overwhelming and degrade I/O response times across the system. This whole approach is wrong-headed, and we (the DBAs in my org) are quick to shut it down whenever we find developers using it with enough threads to compromise database health.

    The best way to parallelize an extract is to hash partition the table (using Oracle's partitioning, not Spark's) and then fire up a SELECT against each hash partition using extended partition naming. But rarely will apps have this available. Other options are LIST partitioning with only one value per partition, so your app can issue a thread per value, or RANGE partitioning, where you round-robin through the partitions until you're done. However you do it, the advantage of aligning Oracle's partitioning with your extracts is that each thread then reads only the rows that belong to it, rather than redundantly reading every other thread's rows as well.
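    A sketch of what this alignment could look like, with illustrative table, column, and partition names (adapt to your own schema):

    ```sql
    -- Hash-partition the table on the same key the extract threads use.
    CREATE TABLE table_1_part (
      a NUMBER,
      b VARCHAR2(100),
      c NUMBER(38)
    )
    PARTITION BY HASH (c) (
      PARTITION p0, PARTITION p1, PARTITION p2, PARTITION p3,
      PARTITION p4, PARTITION p5, PARTITION p6, PARTITION p7
    );

    -- Each extract thread reads exactly one partition via extended
    -- partition naming, so no thread scans another thread's rows.
    SELECT a, b FROM table_1_part PARTITION (p3);
    ```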

    Another approach is to have a single session do a parallelized CTAS (CREATE TABLE AS SELECT) with an untyped column list and a partitioning clause to create a partitioned work table aligned with the distribution method you want; then you can unleash the concurrent threads, using their WHERE clause predicates to prune to their respective partitions of this work table. The downside, of course, is the extra space this requires on the database and the cost of writing the table.
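    A sketch of the CTAS variant, again with illustrative names (column types are inherited from the SELECT, hence the untyped column list):

    ```sql
    -- One parallel session writes a hash-partitioned work table.
    CREATE TABLE table_1_work
    PARTITION BY HASH (c) (
      PARTITION p0, PARTITION p1, PARTITION p2, PARTITION p3,
      PARTITION p4, PARTITION p5, PARTITION p6, PARTITION p7
    )
    PARALLEL 8
    AS SELECT a, b, c FROM Table_1;

    -- Each extract thread then prunes to its own partition of the
    -- work table instead of rescanning the source table.
    SELECT a, b FROM table_1_work PARTITION (p3);
    ```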

    Lastly, some ingenious folks might consider a function-based index. While that would resolve the redundancy problem and remove the load concern for the database, it will perform very, very poorly for your application: 100% of the index entries will require a table block read to fetch the other columns, and that is hundreds of times slower than scanning a table (or table partition). So aligning extract threads with (Oracle's) partitioning is really the best solution.
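    For completeness, the function-based index warned against above would look something like this (illustrative index name; the indexed expression must match the query's predicate expression exactly for the optimizer to use it):

    ```sql
    -- Index on the hash expression: each thread can now probe the index
    -- instead of full-scanning the table, but every matching entry still
    -- costs a table block read to fetch columns a and b.
    CREATE INDEX ix_table_1_orahash
      ON Table_1 (ORA_HASH(CAST(c AS NUMBER(38)), 7));
    ```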