Search code examples
Tags: database, postgresql, multithreading, perl, clone

How to create multiple parallel database connections in multi-threaded Perl with DBD::Pg and Parallel::ForkManager?


EDITED 2021-07-21 BELOW

I have a script in which I connect to a PostgreSQL DB and also spawn multiple parallel workers using Parallel::ForkManager (it forks child processes; I refer to them as "threads" below).

I create a database handle and then prepare both a SELECT SQL statement and an INSERT SQL statement. I want to be able to run the SQL statements from each thread spawned by Parallel::ForkManager, but it fails because I can't share the DB handle between threads.

I need to take the script below (which fails IF I spawn more than one thread, but works if I only spawn a single thread), and change it such that each thread can read/write from/to the DB respectively.

I know I can clone a database handle, but I also know there is more to it.

How can I have parallel DB handles/SQL statements?

I apologize for the length of this, but I need to give as complete an example as possible.

Example:

use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;

#my @codes = ("A");                 # testing single thread
my @codes = ("A","B","F","M","S"); # testing multi-thread
my %varset;                         # column name => value scratch hash filled by the sub_x workers

################################################################
# connect to db
################################################################
# NOTE(review): this handle, and the statement handles prepared from it
# below, are created before any fork, so every child started further down
# works on inherited copies of them - the situation this example shows.
my $dsn    = 'DBI:Pg:dbname=DB_NAME';
my $userid = "DB_USERNAME";
my $sesame = "DB_PASSWORD";
my $dbh    = DBI->connect($dsn, $userid, $sesame, {
    AutoCommit => 1,
    RaiseError => 1,
    PrintError => 1
}) or die "Connection failed!\n" . $DBI::errstr;

################################################################
# test db connection
################################################################
my $me = $dbh->{Driver}{Name};
my $sversion = $dbh->{pg_server_version};
print "DBI is version $DBI::VERSION, "
    . "I am $me, "
    . "version of DBD::Pg is $DBD::Pg::VERSION, "
    . "server is $sversion\n";
print "Name: $dbh->{Name}\n";

my %columns;   # hash for persistent mapping of column-values
my @columns;   # deterministic - $values[$columns{$column}];
set_columns(); # define column<->value mapping for db table

# one '?' per column, in @columns order
my $placeholders = join(", ", map { '?' } @columns);

################################################################
# Build the SELECT SQL statement
################################################################
# NOTE(review): the filter column is "id", while write_to_db() binds the
# code that is stored in column "code" - confirm which column is intended.
my $sql_select_statement =
    qq(SELECT count(*) FROM mytable WHERE id = ?;);
# prepare the SELECT statement handle
my $sth_select_statement = $dbh->prepare_cached($sql_select_statement);

################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement =
      "INSERT INTO mytable ("
    . join(", ", @columns)       # column names
    . ") VALUES ($placeholders)";
# prepare the INSERT statement handle
my $sth_insert_statement = $dbh->prepare_cached($sql_insert_statement);

################################################################
# create Parallel::ForkManager object for @codes
################################################################
# allow as many simultaneous children as there are codes
my $optimization = Parallel::ForkManager->new(scalar @codes);

$optimization->run_on_start(sub{
    my ($pid,$ident) = @_;
    print "Starting $ident under process id $pid\n";
});

# the callback only unpacks its arguments; nothing is done with them yet
$optimization->run_on_finish(sub{
    my ($pid,
        $exit_code,
        $ident,
        $exit_signal,
        $core_dump,
        $data_structure_reference) = @_;
});

my $thread_count = 0;

OPTIMIZATION:
for my $code (@codes) {
    $thread_count++;
    print "Thread $thread_count running for $code\n";

    # fork optimization threads - per code
    # (start() is true in the parent, which moves on to the next code;
    # only the child continues below)
    $optimization->start($code) and next OPTIMIZATION;

    if ($code =~ m/A/i) {
        sub_a("A");
    } elsif ($code =~ m/B/i) {
        sub_b("B");
    } elsif ($code =~ m/F/i) {
        sub_f("F");
    } elsif ($code =~ m/M/i) {
        sub_m("M");
    } elsif ($code =~ m/S/i) {
        sub_s("S");
    }
    print "\$optimization->finish on child $code\n";
    $optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();

################################################################
sub sub_a {
################################################################
    # Build the row for code "A" and pass it to write_to_db().
    # The code is the only argument and always ends up in column 0.
    my ($code) = @_;

    # illustrative values specific to A
    @varset{qw(field1 field2 field3 field4)} = qw(a_f1 a_f2 a_f3 a_f4);

    # put each value at the index %columns assigns to its column name
    my @values;
    @values[ @columns{ keys %varset } ] = values %varset;
    $values[0] = $code;

    return write_to_db($code, @values);
}

################################################################
sub sub_b {
################################################################
    # Build the row for code "B" and pass it to write_to_db().
    # The code is the only argument and always ends up in column 0.
    my ($code) = @_;

    # illustrative values specific to B
    @varset{qw(field1 field2 field3 field4)} = qw(b_f1 b_f2 b_f3 b_f4);

    # put each value at the index %columns assigns to its column name
    my @values;
    @values[ @columns{ keys %varset } ] = values %varset;
    $values[0] = $code;

    return write_to_db($code, @values);
}

################################################################
sub sub_f {
################################################################
    # Build the row for code "F" and pass it to write_to_db().
    # The code is the only argument and always ends up in column 0.
    my ($code) = @_;

    # illustrative values specific to F
    @varset{qw(field1 field2 field3 field4)} = qw(f_f1 f_f2 f_f3 f_f4);

    # put each value at the index %columns assigns to its column name
    my @values;
    @values[ @columns{ keys %varset } ] = values %varset;
    $values[0] = $code;

    return write_to_db($code, @values);
}

################################################################
sub sub_m {
################################################################
    # Build the row for code "M" and pass it to write_to_db().
    # The code is the only argument and always ends up in column 0.
    my ($code) = @_;

    # illustrative values specific to M
    @varset{qw(field1 field2 field3 field4)} = qw(m_f1 m_f2 m_f3 m_f4);

    # put each value at the index %columns assigns to its column name
    my @values;
    @values[ @columns{ keys %varset } ] = values %varset;
    $values[0] = $code;

    return write_to_db($code, @values);
}

################################################################
sub sub_s {
################################################################
    # Build the row for code "S" and pass it to write_to_db().
    # The code is the only argument and always ends up in column 0.
    my ($code) = @_;

    # illustrative values specific to S
    @varset{qw(field1 field2 field3 field4)} = qw(s_f1 s_f2 s_f3 s_f4);

    # put each value at the index %columns assigns to its column name
    my @values;
    @values[ @columns{ keys %varset } ] = values %varset;
    $values[0] = $code;

    return write_to_db($code, @values);
}

################################################################
sub write_to_db {
################################################################
    # Insert one row unless the SELECT already finds a row for the code.
    #   $code_name - value bound to the SELECT's single placeholder
    #   @values    - bind values for the INSERT, in @columns order
    # Returns 1 when a row was inserted, 0 otherwise.
    my ($code_name, @values) = @_;

    # DBI's execute() signals failure with undef, not with a negative
    # number, so test definedness. With RaiseError set on the handle a
    # failure dies before this check is reached.
    my $rv_code = $sth_select_statement->execute($code_name);
    unless (defined $rv_code) {
        warn "SELECT for $code_name failed: "
            . ($DBI::errstr // 'unknown error') . "\n";
        return 0;
    }

    # count(*) yields a single row; finish the handle once it is read
    my ($existing) = $sth_select_statement->fetchrow_array();
    $sth_select_statement->finish();

    # a record for this strategy already exists - nothing to do
    return 0 if ($existing // 0) > 0;

    # INSERT settings into 'mytable'
    return $sth_insert_statement->execute(@values) ? 1 : 0;
}

################################################################
sub set_columns {
################################################################
    # Fill @columns (index -> name) and %columns (name -> index) with the
    # column order used for building bind-value lists.
    my @names = qw(code field1 field2 field3 field4);

    for my $idx (0 .. $#names) {
        $columns[$idx]          = $names[$idx];
        $columns{ $names[$idx] } = $idx;
    }

    # callers ignore the result; the last column name is handed back
    return $columns[$#names];
}

EDIT 2021-07-21:

I have added clone_dbh and create_dbh sub-routines to try different methods. Neither works. I am still getting the same error - can't share DBH between threads. I don't know what else to do other than give up on sub-routines and write duplicate code for each thread spawned by Parallel::ForkManager, which I really do not want to do.

Example 2:

use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;

#my @codes = ("A");                 # testing single thread
my @codes = ("A","B","F","M","S"); # testing multi-thread
my %varset;

################################################################
# connect to db
################################################################
# double quotes so that $ENV{DB_NAME} is actually interpolated
my $dsn    = "DBI:Pg:dbname=$ENV{DB_NAME}";
my $userid = $ENV{DBI_USER};
my $sesame = $ENV{DBI_PASS};

my %dbh;   # hash for storing dbh handles

# This handle is opened before the fork, so every child inherits a copy.
# AutoInactiveDestroy keeps a child's copy from shutting the parent's
# connection down when that child exits.
my $dbh    = DBI->connect($dsn, $userid, $sesame, {
    AutoCommit          => 1,
    RaiseError          => 1,
    PrintError          => 1,
    AutoInactiveDestroy => 1
}) or die "Connection failed!\n" . $DBI::errstr;

################################################################
# test db connection
################################################################
my $me = $dbh->{Driver}{Name};
my $sversion = $dbh->{pg_server_version};
print "DBI is version $DBI::VERSION, "
    . "I am $me, "
    . "version of DBD::Pg is $DBD::Pg::VERSION, "
    . "server is $sversion\n";
print "Name: $dbh->{Name}\n";

################################################################
# prepare array and hash for matching db columns
################################################################
my %columns;   # hash for persistent mapping of column-values
my @columns;   # deterministic - $values[$columns{$column}];
set_columns(); # define column<->value mapping for db table

# one '?' per column, in @columns order
my $placeholders = join(", ", map { '?' } @columns);

################################################################
# Build the SELECT SQL statement
################################################################
# NOTE(review): the filter column is "id", while write_to_db() binds the
# code that is stored in the first column - confirm which is intended.
my $sql_select_statement =
    qq(SELECT count(*) FROM mytable WHERE id = ?;);

################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement =
      "INSERT INTO mytable ("
    . join(", ", @columns)       # column names
    . ") VALUES ($placeholders)";

################################################################################################
# create clones of database handle and SQL statements for threads
################################################################################################
my %sth_select_code;
my %sth_insert_code;

#for my $code (@codes) {
#    $dbh{$code} = $dbh->clone();
#    # prepare the SELECT statement handle
#    $sth_select_code{$code} = $dbh{$code}->prepare_cached($sql_select_statement);
#    # prepare the INSERT statement handle
#    $sth_insert_code{$code} = $dbh{$code}->prepare_cached($sql_insert_statement);
#}

################################################################
# create Parallel::ForkManager object for @codes
################################################################
# allow as many simultaneous children as there are codes
my $optimization = Parallel::ForkManager->new(scalar @codes);

$optimization->run_on_start(sub{
    my ($pid,$ident) = @_;
    print "Starting $ident under process id $pid\n";
});

# the callback only unpacks its arguments; nothing is done with them yet
$optimization->run_on_finish(sub{
    my ($pid,
        $exit_code,
        $ident,
        $exit_signal,
        $core_dump,
        $data_structure_reference) = @_;
});

my $thread_count = 0;

OPTIMIZATION:
for my $code (@codes) {
    $thread_count++;
    print "Thread $thread_count running for $code\n";

    # fork optimization threads - per code
    # (start() is true in the parent, which moves on to the next code;
    # only the child continues below)
    $optimization->start($code) and next OPTIMIZATION;

    if ($code =~ m/A/i) {
        sub_a("A");
    } elsif ($code =~ m/B/i) {
        sub_b("B");
    } elsif ($code =~ m/F/i) {
        sub_f("F");
    } elsif ($code =~ m/M/i) {
        sub_m("M");
    } elsif ($code =~ m/S/i) {
        sub_s("S");
    }
    print "\$optimization->finish on child $code\n";
    $optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();

################################################################################################
# disconnect from database
################################################################################################
# Handles opened inside a forked child exist only in that child, so the
# parent's slots are filled only when handles were created before the
# fork (the commented-out clone loop above). Skip whatever is not there
# instead of calling a method on undef.
for my $code (@codes) {
    $sth_select_code{$code}->finish() if $sth_select_code{$code};
    $sth_insert_code{$code}->finish() if $sth_insert_code{$code};
    $dbh{$code}->disconnect           if $dbh{$code};
}
$dbh->disconnect;

################################################################################################
# end
################################################################################################
exit;

################################################################
sub sub_a {
################################################################
    # Worker for code "A": set up this code's handles, build the row and
    # pass it to write_to_db(). The code always ends up in column 0.
    my ($code) = @_;

    # per-code connection; clone_dbh($code) is the alternative that was tried
    create_dbh($code);

    # illustrative values specific to A
    @varset{qw(field2 field3 field4 field5)} = qw(a_f1 a_f2 a_f3 a_f4);

    # put each value at the index %columns assigns to its column name
    my @values;
    @values[ @columns{ keys %varset } ] = values %varset;
    $values[0] = $code;

    return write_to_db($code, @values);
}

################################################################
sub sub_b {
################################################################
    # Worker for code "B": set up this code's handles, build the row and
    # pass it to write_to_db(). The code always ends up in column 0.
    my ($code) = @_;

    # per-code connection; clone_dbh($code) is the alternative that was tried
    create_dbh($code);

    # illustrative values specific to B
    @varset{qw(field2 field3 field4 field5)} = qw(b_f1 b_f2 b_f3 b_f4);

    # put each value at the index %columns assigns to its column name
    my @values;
    @values[ @columns{ keys %varset } ] = values %varset;
    $values[0] = $code;

    return write_to_db($code, @values);
}

################################################################
sub sub_f {
################################################################
    # Worker for code "F": set up this code's handles, build the row and
    # pass it to write_to_db(). The code always ends up in column 0.
    my ($code) = @_;

    # per-code connection; clone_dbh($code) is the alternative that was tried
    create_dbh($code);

    # illustrative values specific to F
    @varset{qw(field2 field3 field4 field5)} = qw(f_f1 f_f2 f_f3 f_f4);

    # put each value at the index %columns assigns to its column name
    my @values;
    @values[ @columns{ keys %varset } ] = values %varset;
    $values[0] = $code;

    return write_to_db($code, @values);
}

################################################################
sub sub_m {
################################################################
    # Worker for code "M": set up this code's handles, build the row and
    # pass it to write_to_db(). The code always ends up in column 0.
    my ($code) = @_;

    # per-code connection; clone_dbh($code) is the alternative that was tried
    create_dbh($code);

    # illustrative values specific to M
    @varset{qw(field2 field3 field4 field5)} = qw(m_f1 m_f2 m_f3 m_f4);

    # put each value at the index %columns assigns to its column name
    my @values;
    @values[ @columns{ keys %varset } ] = values %varset;
    $values[0] = $code;

    return write_to_db($code, @values);
}

################################################################
sub sub_s {
################################################################
    # Worker for code "S": set up this code's handles, build the row and
    # pass it to write_to_db(). The code always ends up in column 0.
    my ($code) = @_;

    # per-code connection; clone_dbh($code) is the alternative that was tried
    create_dbh($code);

    # illustrative values specific to S
    @varset{qw(field2 field3 field4 field5)} = qw(s_f1 s_f2 s_f3 s_f4);

    # put each value at the index %columns assigns to its column name
    my @values;
    @values[ @columns{ keys %varset } ] = values %varset;
    $values[0] = $code;

    return write_to_db($code, @values);
}

################################################################################
sub create_dbh {
################################################################################
    # Open a dedicated connection for $code and prepare both statements on it.
    # The handles are stored under $code in %dbh, %sth_select_code and
    # %sth_insert_code, and are also returned as
    # (database handle, SELECT handle, INSERT handle).
    my $code = shift;

    my $handle = DBI->connect($dsn, $userid, $sesame, {
        AutoCommit => 1,
        RaiseError => 1,
        PrintError => 1
    }) or die "Connection failed!\n" . $DBI::errstr;
    $dbh{$code} = $handle;

    # did it work? are we there yet?
    my $me = $handle->{Driver}{Name};
    my $sversion = $handle->{pg_server_version};
    print "DBI is version $DBI::VERSION, "
        . "I am $me, "
        . "version of DBD::Pg is $DBD::Pg::VERSION, "
        . "server is $sversion\n";
    # report the connection just opened, not the file-level $dbh
    print "Name: $handle->{Name}\n";
    # prepare the SELECT statement handle
    $sth_select_code{$code} = $handle->prepare_cached($sql_select_statement);
    # prepare the INSERT statement handle
    $sth_insert_code{$code} = $handle->prepare_cached($sql_insert_statement);

    return ($handle, $sth_select_code{$code}, $sth_insert_code{$code});
}

################################################################
sub clone_dbh {
################################################################
    # Clone the file-level $dbh for $code and prepare both statements on
    # the clone; the three handles are stored under $code in %dbh,
    # %sth_select_code and %sth_insert_code.
    my ($code) = @_;

    my $clone = ( $dbh{$code} = $dbh->clone() );

    # SELECT first, then INSERT - each prepared on the clone
    $sth_select_code{$code} = $clone->prepare_cached($sql_select_statement);
    $sth_insert_code{$code} = $clone->prepare_cached($sql_insert_statement);
}

################################################################
sub write_to_db {
################################################################
    # Insert one row for $code unless the SELECT already finds one.
    #   $code   - key into %sth_select_code / %sth_insert_code and the
    #             value bound to the SELECT's single placeholder
    #   @values - bind values for the INSERT, in @columns order
    # Returns 1 when a row was inserted, 0 otherwise.
    #
    # Unpack all of @_: assigning from "shift @_" would hand over only the
    # first argument and leave @values empty.
    my ($code, @values) = @_;

    # DBI's execute() signals failure with undef, not with a negative
    # number, so test definedness. With RaiseError set on the handle a
    # failure dies before this check is reached.
    my $rv_code = $sth_select_code{$code}->execute($code);
    unless (defined $rv_code) {
        warn "SELECT for $code failed: "
            . ($DBI::errstr // 'unknown error') . "\n";
        return 0;
    }

    # count(*) yields a single row; finish the handle once it is read
    my ($existing) = $sth_select_code{$code}->fetchrow_array();
    $sth_select_code{$code}->finish();

    # a record for this strategy already exists - nothing to do
    return 0 if ($existing // 0) > 0;

    # INSERT settings into 'mytable'
    return $sth_insert_code{$code}->execute(@values) ? 1 : 0;
}

################################################################
sub set_columns {
################################################################
    # Fill @columns (index -> name) and %columns (name -> index) with the
    # column order used for building bind-value lists.
    my @names = qw(field1 field2 field3 field4 field5);

    for my $idx (0 .. $#names) {
        $columns{ $names[$idx] } = $idx;
        $columns[$idx]          = $names[$idx];
    }

    # callers ignore the result; the last column name is handed back
    return $columns[$#names];
}

Do I have to pass dbh and statement handles to the create_dbh sub explicitly from within the calling sub_x thread?

Do I have to return the handles or handle_refs from create_dbh back to calling sub_x thread?

I don't know how to get around this, but it seems like a lexical scope or object/memory access issue.

Any more ideas?


Solution

  • I got it working using the code below:

    use 5.24.0;
    use strict;
    use warnings;
    use Parallel::ForkManager;
    use SQL::Abstract;
    use DBI ':sql_types';
    use DBD::Pg qw/:pg_types/;
    
    #my @codes = ("A");                 # testing single thread
    my @codes = ("A","B","F","M","S"); # testing multi-thread
    my %varset;

    ################################################################
    # get db connection info
    ################################################################
    # No connection is opened here; create_dbh() connects with these
    # settings each time a worker calls it.
    my $dsn    = 'DBI:Pg:dbname=mt4_test';
    my $userid = $ENV{DBI_USER};
    my $sesame = $ENV{DBI_PASS};
    my %dbh; # hash for storing dbh handles

    ################################################################
    # prepare array and hash for matching db columns
    ################################################################
    my %columns;   # hash for persistent mapping of column-values
    my @columns;   # deterministic - $values[$columns{$column}];
    set_columns(); # define column<->value mapping for db table

    # one '?' per column, in @columns order
    my $placeholders = join(", ", map { '?' } @columns);

    ################################################################
    # Build the SELECT SQL statement
    ################################################################
    my $sql_select_statement =
        qq(SELECT count(*) FROM mytable WHERE field1 = ?;);

    ################################################################
    # Build the INSERT SQL statement
    ################################################################
    my $sql_insert_statement =
          "INSERT INTO mytable ("
        . join(", ", @columns)        # column names
        . ") VALUES ($placeholders)";

    ################################################################
    # hash for storing SQL statement handles for threads
    ################################################################
    my %sth_select_code;
    my %sth_insert_code;

    ################################################################
    # create Parallel::ForkManager object for @codes
    ################################################################
    # allow as many simultaneous children as there are codes
    my $optimization = Parallel::ForkManager->new(scalar @codes);

    $optimization->run_on_start(sub{
        my ($pid,$ident) = @_;
        print "Starting $ident under process id $pid\n";
    });

    # the callback only unpacks its arguments; nothing is done with them yet
    $optimization->run_on_finish(sub{
        my ($pid,
            $exit_code,
            $ident,
            $exit_signal,
            $core_dump,
            $data_structure_reference) = @_;
    });

    my $thread_count = 0;

    OPTIMIZATION:
    for my $code (@codes) {
        $thread_count++;
        print "Thread $thread_count running for $code\n";

        # fork optimization threads - per code
        # start() returns the child's pid in the parent and 0 in the child,
        # so the parent must always skip ahead - also when there is only
        # one code - or it would run the worker itself as well.
        $optimization->start($code) and next OPTIMIZATION;

        launch_sub($code);

        print "\$optimization->finish on child $code\n";
        $optimization->finish(0);
    }
    print "\$optimization->wait_all_children() is waiting...\n";
    $optimization->wait_all_children();

    ################################################################
    # THE END
    ################################################################
    exit;
    ################################################################
    
    
    
    ################################################################
    sub launch_sub {
    ################################################################
        # Run the worker that belongs to $code. The patterns are tried in
        # the order A, B, F, M, S (case-insensitive, matching anywhere in
        # the string) and the first hit wins.
        my ($code) = @_;

        return sub_a("A") if $code =~ m/A/i;
        return sub_b("B") if $code =~ m/B/i;
        return sub_f("F") if $code =~ m/F/i;
        return sub_m("M") if $code =~ m/M/i;
        return sub_s("S") if $code =~ m/S/i;
    }
    ################################################################
    sub sub_a {
    ################################################################
        # Worker for code "A": open this code's own connection, build the
        # row, store it via write_to_db() and disconnect again.
        # The code is the only argument and always ends up in column 0.
        my ($code) = @_;

        # create_dbh() fills the three hash slots through the references,
        # so nothing needs to be captured from the call.
        create_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});

        # illustrative values specific to A
        @varset{qw(field2 field3 field4 field5)} = qw(a_f1 a_f2 a_f3 a_f4);

        # put each value at the index %columns assigns to its column name
        my @values;
        @values[ @columns{ keys %varset } ] = values %varset;
        $values[0] = $code;

        write_to_db($code, @values);
        disconnect_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
    }
    
    ################################################################
    sub sub_b {
    ################################################################
        # Worker for code "B": open this code's own connection, build the
        # row, store it via write_to_db() and disconnect again.
        # The code is the only argument and always ends up in column 0.
        my ($code) = @_;

        # create_dbh() fills the three hash slots through the references,
        # so nothing needs to be captured from the call.
        create_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});

        # illustrative values specific to B
        @varset{qw(field2 field3 field4 field5)} = qw(b_f1 b_f2 b_f3 b_f4);

        # put each value at the index %columns assigns to its column name
        my @values;
        @values[ @columns{ keys %varset } ] = values %varset;
        $values[0] = $code;

        write_to_db($code, @values);
        disconnect_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
    }
    
    ################################################################
    sub sub_f {
    ################################################################
        # Worker for code "F": open this code's own connection, build the
        # row, store it via write_to_db() and disconnect again.
        # The code is the only argument and always ends up in column 0.
        my ($code) = @_;

        # create_dbh() fills the three hash slots through the references,
        # so nothing needs to be captured from the call.
        create_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});

        # illustrative values specific to F
        @varset{qw(field2 field3 field4 field5)} = qw(f_f1 f_f2 f_f3 f_f4);

        # put each value at the index %columns assigns to its column name
        my @values;
        @values[ @columns{ keys %varset } ] = values %varset;
        $values[0] = $code;

        write_to_db($code, @values);
        disconnect_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
    }
    
    ################################################################
    sub sub_m {
    ################################################################
        # Worker for code "M": open this code's own connection, build the
        # row, store it via write_to_db() and disconnect again.
        # The code is the only argument and always ends up in column 0.
        my ($code) = @_;

        # create_dbh() fills the three hash slots through the references,
        # so nothing needs to be captured from the call.
        create_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});

        # illustrative values specific to M
        @varset{qw(field2 field3 field4 field5)} = qw(m_f1 m_f2 m_f3 m_f4);

        # put each value at the index %columns assigns to its column name
        my @values;
        @values[ @columns{ keys %varset } ] = values %varset;
        $values[0] = $code;

        write_to_db($code, @values);
        disconnect_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
    }
    
    ################################################################
    sub sub_s {
    ################################################################
        # Worker for code "S": open this code's own connection, build the
        # row, store it via write_to_db() and disconnect again.
        # The code is the only argument and always ends up in column 0.
        my ($code) = @_;

        # create_dbh() fills the three hash slots through the references,
        # so nothing needs to be captured from the call.
        create_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});

        # illustrative values specific to S
        @varset{qw(field2 field3 field4 field5)} = qw(s_f1 s_f2 s_f3 s_f4);

        # put each value at the index %columns assigns to its column name
        my @values;
        @values[ @columns{ keys %varset } ] = values %varset;
        $values[0] = $code;

        write_to_db($code, @values);
        disconnect_dbh(\$dbh{$code}, \$sth_select_code{$code}, \$sth_insert_code{$code});
    }
    
    ################################################################
    sub create_dbh {
    ################################################################
        # Open a new connection and prepare both statements on it.
        #   $dbh_ref - reference to the scalar that receives the database handle
        #   $sel_ref - reference to the scalar that receives the SELECT handle
        #   $ins_ref - reference to the scalar that receives the INSERT handle
        # Returns (database handle, SELECT handle, INSERT handle), which is
        # the list the callers unpack.
        # Dies when the connection cannot be established.
        my ($dbh_ref, $sel_ref, $ins_ref) = @_;

        my $handle = DBI->connect($dsn, $userid, $sesame, {
            AutoCommit => 1,
            RaiseError => 1,
            PrintError => 1
        }) or die "Connection failed!\n" . $DBI::errstr;
        ${$dbh_ref} = $handle;

        # did it work? are we there yet?
        my $me = $handle->{Driver}{Name};
        my $sversion = $handle->{pg_server_version};
        print "DBI is version $DBI::VERSION, "
            . "I am $me, "
            . "version of DBD::Pg is $DBD::Pg::VERSION, "
            . "server is $sversion\n";
        print "Name: $handle->{Name}\n";
        # prepare the SELECT statement handle
        ${$sel_ref} = $handle->prepare_cached($sql_select_statement);
        # prepare the INSERT statement handle
        ${$ins_ref} = $handle->prepare_cached($sql_insert_statement);

        return ($handle, ${$sel_ref}, ${$ins_ref});
    }
    
    ################################################################
    sub write_to_db {
    ################################################################
        # Insert one row for $code unless the SELECT already finds one.
        #   $code   - key into %sth_select_code / %sth_insert_code and the
        #             value bound to the SELECT's single placeholder
        #   @values - bind values for the INSERT, in @columns order
        # Returns 1 when a row was inserted, 0 otherwise.
        my ($code, @values) = @_;

        # DBI's execute() signals failure with undef, not with a negative
        # number, so test definedness. With RaiseError set on the handle a
        # failure dies before this check is reached.
        my $rv_code = $sth_select_code{$code}->execute($code);
        unless (defined $rv_code) {
            warn "SELECT for $code failed: "
                . ($DBI::errstr // 'unknown error') . "\n";
            return 0;
        }
        say "SQL SELECT for $code: rv_code = $rv_code";

        # count(*) yields a single row; finish the handle once it is read
        my ($existing) = $sth_select_code{$code}->fetchrow_array();
        $sth_select_code{$code}->finish();

        # a record for this strategy already exists - nothing to do
        return 0 if ($existing // 0) > 0;

        # INSERT settings into 'mytable'
        my $inserted = $sth_insert_code{$code}->execute(@values);
        say "SQL INSERT for $code";
        return $inserted ? 1 : 0;
    }
    
    ################################################################
    sub disconnect_dbh {
    ################################################################
        # Finish both statement handles, then close the connection.
        # All three arguments are references to the scalars holding the
        # handles: database handle, SELECT handle, INSERT handle.
        my ($dbh_ref, $sel_ref, $ins_ref) = @_;

        # statements first, connection last
        for my $sth_ref ($sel_ref, $ins_ref) {
            $$sth_ref->finish();
        }
        $$dbh_ref->disconnect;

        say "disconnected dbh_ref: $dbh_ref";
    }
    
    ################################################################
    sub set_columns {
    ################################################################
        # Fill @columns (index -> name) and %columns (name -> index) with
        # the column order used for building bind-value lists.
        my @names = qw(field1 field2 field3 field4 field5);

        for my $idx (0 .. $#names) {
            $columns{ $names[$idx] } = $idx;
            $columns[$idx]          = $names[$idx];
        }

        # callers ignore the result; the last column name is handed back
        return $columns[$#names];
    }