Tags: multithreading, plsql, parallel-processing, oracle11g, dbms-job

PL/SQL multi-threading


I have a task to bulk-process accounts (more than 20,000 of them) using PL/SQL. Each account goes through a lot of validations, and the process currently runs in single-threaded mode; the existing job now takes more than 1 hour for these accounts. I'd like to implement a multi-threaded mode for this project. After reading a lot about multi-threading in PL/SQL, I found that I can use the "DBMS_PARALLEL_EXECUTE" package. I developed new logic with "DBMS_PARALLEL_EXECUTE" and it works, but during execution all chunks wait for each other. For example, I create 5 chunks. After the job completes, I see that the 1st chunk ran for 10 minutes, the 2nd for 11 minutes, the 3rd for 9 minutes, the 4th for 5 minutes and the 5th for 13 minutes. If all chunks ran in parallel at the same time, the job should complete in about 13 minutes. Instead it takes 10 + 11 + 9 + 5 + 13 = 48 minutes, which means each chunk runs only after the previous one has finished. I want all chunks to run in parallel at the same time. How can I achieve that?

Here is an example of my code that uses "DBMS_PARALLEL_EXECUTE" (just draft code for testing):

DECLARE
    l_sql_stmt         VARCHAR2(1000);
    l_try              NUMBER;
    l_status           NUMBER;
    v_task             VARCHAR2(20) := 'T89';
    v_plsql_block      VARCHAR2(1000);
    v_status           VARCHAR2(50);
    v_status_desc      VARCHAR2(100);
    v_additional_sql   VARCHAR2(1000);
    v_original_sql     VARCHAR2(1000);
    v_count_of_chunks  NUMBER := 5;
    v_start_id         NUMBER := 1;
    v_per_thread_count NUMBER := 0;
    v_count_all_acc    NUMBER;
BEGIN

    v_original_sql := 'SELECT DISTINCT
                                        oac.cust_acc
                                    FROM
                                        xxtb_ovrdft_acc oac
                                    WHERE
                                        NOT EXISTS (
                                            SELECT
                                                1
                                            FROM
                                                xxtb_failed_acc acc
                                            WHERE
                                                    acc.curr_acc = oac.cust_acc
                                                AND acc.err_status = :status
                                        )';
    SELECT
        COUNT(DISTINCT oac.cust_acc)
    INTO v_count_all_acc
    FROM
        xxtb_ovrdft_acc oac;

    v_per_thread_count := round(v_count_all_acc / v_count_of_chunks);
    FOR i IN 1..v_count_of_chunks LOOP
        BEGIN
            v_task := 'T89' || i;  -- one task per iteration: T891 .. T895 (no accumulation)
            dbms_output.put_line('task_NAME =' || v_task);
            dbms_parallel_execute.create_task(v_task);
            dbms_parallel_execute.create_chunks_by_sql(
                task_name => v_task,
                sql_stmt  => 'SELECT level start_id, level end_id FROM dual connect by level <=1',
                by_rowid  => false);

            v_additional_sql := v_original_sql
                                || ' and ACC_ID between '
                                || v_start_id
                                || ' and '
                                || CASE
                WHEN i = v_count_of_chunks THEN
                    v_count_all_acc
                ELSE v_per_thread_count * i
            END;

            v_plsql_block := 'begin fcjlive.xxpks_overdraft_payments.exec_do_payments('
                             || ''''
                             || v_additional_sql
                             || ''''
                             || ', :start_id, :end_id); end;';
            dbms_output.put_line('full sql = ' || v_plsql_block);
            v_start_id := v_start_id + v_per_thread_count;
            dbms_parallel_execute.run_task(
                task_name      => v_task,
                sql_stmt       => v_plsql_block,
                language_flag  => dbms_sql.native,
                parallel_level => 2);

            v_status := dbms_parallel_execute.task_status(v_task);
            -- If there is an error, RESUME it for at most 2 times.
            -- l_try must start at 0 for every task; left NULL, the loop below never runs.
            l_try := 0;
            WHILE (
                l_try < 2
                AND v_status != dbms_parallel_execute.finished
            ) LOOP
                l_try := l_try + 1;
                dbms_parallel_execute.resume_task(v_task);
                v_status := dbms_parallel_execute.task_status(v_task);
            END LOOP;

            SELECT
                decode(v_status, 6, '6-FINISHED', 5, '5-PROCESSING',
                       7, '7-FINISHED_WITH_ERROR', 8, '8-CRASHED', v_status)
            INTO v_status_desc
            FROM
                dual;

            dbms_output.put_line('task_status = ' || v_status_desc);
      --DBMS_PARALLEL_EXECUTE.resume_task (task_name => v_task);

            IF v_status = 5 THEN
                dbms_output.put_line('Wait 17 seconds');
                dbms_lock.sleep(17);
                BEGIN
                    dbms_output.put_line('Resume task');
                    dbms_parallel_execute.resume_task(task_name => v_task);
                EXCEPTION
                    WHEN OTHERS THEN
                        dbms_output.put_line('Resume ended with error: '
                                             || sqlcode
                                             || ' '
                                             || sqlerrm
                                             || ' '
                                             || dbms_utility.format_error_backtrace);
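                END;

                -- refresh the status after the resume before decoding it again
                v_status := dbms_parallel_execute.task_status(v_task);
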
                SELECT
                    decode(v_status, 6, '6-FINISHED', 5, '5-PROCESSING',
                           7, '7-FINISHED_WITH_ERROR', 8, '8-CRASHED', v_status)
                INTO v_status_desc
                FROM
                    dual;

                dbms_output.put_line('task_status=' || v_status_desc);
            ELSIF v_status != dbms_parallel_execute.finished THEN
                BEGIN
                    dbms_parallel_execute.drop_chunks(v_task);
                    dbms_parallel_execute.drop_task(v_task);
                EXCEPTION
                    WHEN OTHERS THEN
                        NULL;
                END;
            END IF;

        EXCEPTION
            WHEN OTHERS THEN
                dbms_output.put_line(sqlerrm);
                dbms_output.put_line(dbms_utility.format_error_backtrace());
                dbms_parallel_execute.drop_chunks(v_task);
                dbms_parallel_execute.drop_task(v_task);
        END;
    END LOOP;

    FOR i IN 1..v_count_of_chunks LOOP
        BEGIN
            dbms_parallel_execute.drop_chunks('T89' || i);
            dbms_parallel_execute.drop_task('T89' || i);
        EXCEPTION
            WHEN OTHERS THEN
                NULL;
        END;
    END LOOP;

EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line(sqlerrm);
        dbms_output.put_line(dbms_utility.format_error_backtrace());
        dbms_parallel_execute.drop_chunks(v_task);
        dbms_parallel_execute.drop_task(v_task);
END;

I am trying to develop a new multi-threaded process for an existing single-threaded project, using the "DBMS_PARALLEL_EXECUTE" package. The problem is that when I create, for example, 5 chunks, the chunks wait for each other during execution: while one chunk runs, the others wait. I concluded this after several tests. Please help me fix my code so that all chunks execute at the same time in parallel, without waiting for each other. My screenshots showed that each chunk runs separately, but the main job's total time (1696.2) equals the sum of all the chunks' run times, which is why I think the chunks wait for each other.



Solution

  • dbms_parallel_execute doesn't sound like the right tool.

    Instead, you should consider simply passing in two parameters, thread_no and thread_count, and using them in a MOD predicate on your base (outermost) cursor. This is much easier to understand and manage. Concept (not using your objects, just conveying the notion):

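    CREATE OR REPLACE PROCEDURE p_dummy (thread_no    IN PLS_INTEGER,
                                         thread_count IN PLS_INTEGER)
    AS
    BEGIN
      -- Thread n of m only picks the rows whose MOD bucket is n - 1.
      -- Assumes cust_acc is numeric; for a VARCHAR2 key use
      -- ORA_HASH(cust_acc, thread_count - 1) = thread_no - 1 instead.
      FOR rec_account IN (SELECT *
                            FROM accounts
                           WHERE MOD(cust_acc, thread_count) = thread_no - 1)
      LOOP
        NULL; -- process rec_account here
      END LOOP;
    END;
    /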
    

    Then kick off the procedure in an async loop from your client. Or if you have to start with a single PL/SQL point of entry, use dbms_scheduler to create one-off jobs for each of the procedure calls, changing the thread_no parameter with each one. Add a status/logging table with a thread_no column that the procedure populates with its end-state and have your originating loop sleep and poll until the jobs are done and check the table to get status.
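
    A minimal sketch of that single-entry-point launcher follows. It assumes p_dummy has been extended to INSERT one row into a hypothetical log table thread_log (status 'DONE' or 'FAILED') and COMMIT just before it returns. The table, the ACC_THREAD_n job names and the 10-second poll interval are illustrative assumptions; the caller needs the CREATE JOB privilege and EXECUTE on DBMS_LOCK (used for sleeping on 11g).

    ```sql
    -- Hypothetical status table: p_dummy is assumed to INSERT one row for its
    -- thread_no (status 'DONE' or 'FAILED') and COMMIT just before it returns.
    CREATE TABLE thread_log (
      thread_no   NUMBER       NOT NULL,
      status      VARCHAR2(20) NOT NULL,
      finished_at DATE         DEFAULT SYSDATE
    );

    DECLARE
      c_thread_count CONSTANT PLS_INTEGER := 5;
      l_jobs_left    PLS_INTEGER;
      l_done         PLS_INTEGER;
    BEGIN
      DELETE FROM thread_log;
      COMMIT;

      -- One run-once job per thread. Each job gets its own session, so the jobs
      -- run concurrently (as far as JOB_QUEUE_PROCESSES allows).
      FOR i IN 1 .. c_thread_count LOOP
        dbms_scheduler.create_job(
          job_name   => 'ACC_THREAD_' || i,
          job_type   => 'PLSQL_BLOCK',
          job_action => 'BEGIN p_dummy(' || i || ', ' || c_thread_count || '); END;',
          enabled    => TRUE,   -- start as soon as a job slave is free
          auto_drop  => TRUE);  -- remove the job definition after its single run
      END LOOP;

      -- Sleep and poll until all of our jobs have run (auto_drop removes them).
      LOOP
        SELECT COUNT(*)
          INTO l_jobs_left
          FROM user_scheduler_jobs
         WHERE job_name LIKE 'ACC\_THREAD\_%' ESCAPE '\';
        EXIT WHEN l_jobs_left = 0;
        dbms_lock.sleep(10);
      END LOOP;

      -- A thread that crashed before logging has no row at all in thread_log.
      SELECT COUNT(*) INTO l_done FROM thread_log WHERE status = 'DONE';
      dbms_output.put_line(l_done || ' of ' || c_thread_count || ' threads finished OK');
    END;
    /
    ```

    Polling the scheduler view rather than only the log table means the loop still ends if a thread dies before it can write its status row; the missing rows then show up in the final count.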

    Lastly, make sure you've done the appropriate SQL tuning to ensure that your slow job isn't due to some SQL that needs some help. Rarely does PL/SQL CPU work constitute the majority of runtime. If it turns out you have a bad SQL statement bogging you down, fixing that could remove the need to multithread this entirely.
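
    If you would rather keep DBMS_PARALLEL_EXECUTE: RUN_TASK does not return until every chunk of its task has been processed. The question's code creates five tasks with one chunk each and calls RUN_TASK for each of them inside the loop, so the tasks necessarily run one after another, which matches the total time equalling the sum of the chunk times. Parallelism happens between the chunks of a single task. A minimal sketch, assuming ACC_ID is a numeric column of xxtb_ovrdft_acc and a hypothetical procedure process_accounts(p_start_id, p_end_id) that processes the accounts whose ACC_ID falls in that range (the question's exec_do_payments would need to be adapted to that shape):

    ```sql
    DECLARE
      c_task   CONSTANT VARCHAR2(30) := 'ACC_PAYMENTS';  -- illustrative task name
      c_chunks CONSTANT PLS_INTEGER  := 5;
      l_try    PLS_INTEGER := 0;
      l_status NUMBER;
    BEGIN
      dbms_parallel_execute.create_task(c_task);

      -- Five chunks in ONE task: NTILE splits the distinct ACC_ID values into five
      -- groups of roughly equal size; each group becomes a disjoint (start_id, end_id).
      dbms_parallel_execute.create_chunks_by_sql(
        task_name => c_task,
        sql_stmt  => 'SELECT MIN(acc_id) start_id, MAX(acc_id) end_id
                        FROM (SELECT acc_id,
                                     NTILE(' || c_chunks || ') OVER (ORDER BY acc_id) grp
                                FROM (SELECT DISTINCT acc_id FROM xxtb_ovrdft_acc))
                       GROUP BY grp',
        by_rowid  => FALSE);

      -- A single RUN_TASK call: up to c_chunks job sessions process the chunks
      -- concurrently, and the call returns once all chunks are done.
      dbms_parallel_execute.run_task(
        task_name      => c_task,
        sql_stmt       => 'BEGIN process_accounts(:start_id, :end_id); END;',
        language_flag  => dbms_sql.native,
        parallel_level => c_chunks);

      -- Retry failed chunks at most twice.
      l_status := dbms_parallel_execute.task_status(c_task);
      WHILE l_try < 2 AND l_status != dbms_parallel_execute.finished LOOP
        l_try := l_try + 1;
        dbms_parallel_execute.resume_task(c_task);
        l_status := dbms_parallel_execute.task_status(c_task);
      END LOOP;

      dbms_output.put_line('final task status = ' || l_status);  -- 6 = FINISHED
      dbms_parallel_execute.drop_task(c_task);                    -- also drops its chunks
    END;
    /
    ```

    Chunking over DISTINCT ACC_ID values keeps the ranges disjoint, so no account is picked up by two chunks even if it appears in several rows.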