EDITED 2021-07-21 BELOW
I have a script in which I connect to a PostgreSQL DB and also spawn multiple threads using Parallel::ForkManager
.
I create a database handle and then prepare both a SELECT
SQL statement and an INSERT
SQL statement.
I want to be able to run the SQL statements from each thread spawned by Parallel::ForkManager
, but it fails because I can't share the DB handle between threads.
I need to take the script below (which fails IF I spawn more than one thread, but works if I only spawn a single thread), and change it such that each thread can read/write from/to the DB respectively.
I know I can clone
a database handle, but I also know there is more to it.
How can I have parallel DB handles/SQL statements?
I apologize for the length of this, but I need to give as complete an example as possible.
Example:
use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;
#@codes = ("A"); # testing single thread
@codes = ("A","B","F","M","S"); # testing multi-thread
################################################################
# connect to db
################################################################
my $dsn = 'DBI:Pg:dbname=DB_NAME';
my $userid = "DB_USERNAME";
my $sesame = "DB_PASSWORD";
my $dbh = DBI->connect($dsn, $userid, $sesame, {
AutoCommit => 1,
RaiseError => 1,
PrintError => 1
}) or die "Connection failed!\n" . $DBI::errstr;
################################################################
# test db connection
################################################################
my $me = $dbh->{Driver}{Name};
my $sversion = $dbh->{pg_server_version};
print "DBI is version $DBI::VERSION, "
. "I am $me, "
. "version of DBD::Pg is $DBD::Pg::VERSION, "
. "server is $sversion\n";
print "Name: $dbh->{Name}\n";
my %columns; # hash for persistent mapping of column-values
my @columns; # deterministic - $values[$columns{$column}];
set_columns(); # define column<->value mapping for db table
my $placeholders = join(", ", map { '?' } @columns);
################################################################
# Build the SELECT SQL statement
################################################################
my $sql_select_statement =
qq(SELECT count(*) FROM mytable WHERE id = ?;);
# prepare the SELECT statement handle
my $sth_select_statement = $dbh->prepare_cached($sql_select_statement);
################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement =
"INSERT INTO mytable ("
. join(", ", @columns) # column names
. ") VALUES ($placeholders)";
# prepare the INSERT statement handle
my $sth_insert_statement = $dbh->prepare_cached($sql_insert_statement);
################################################################
# create Parallel::ForkManager object for @codes
################################################################
my $optimization = Parallel::ForkManager->new(scalar @codes);
$optimization->run_on_start(sub{
my ($pid,$ident) = @_;
print "Starting $ident under process id $pid\n";
});
$optimization->run_on_finish(sub{
my ($pid,
$exit_code,
$ident,
$exit_signal,
$core_dump,
$data_structure_reference) = @_;
});
my $thread_count = 0;
OPTIMIZATION:
for my $code (@codes) {
$thread_count++;
print "Thread $thread_count running for $code\n";
# fork optimization threads - per code
$optimization->start($code) and next OPTIMIZATION;
if ($code =~ m/A/i) {
sub_a("A");
} elsif ($code =~ m/B/i) {
sub_b("B");
} elsif ($code =~ m/F/i) {
sub_f("F");
} elsif ($code =~ m/M/i) {
sub_m("M");
} elsif ($code =~ m/S/i) {
sub_s("S");
}
print "\$optimization->finish on child $code\n";
$optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();
################################################################
sub sub_a {
################################################################
my $code = shift; # Code
my @values;
# generate values specific to A
$varset{field1} = 'a_f1'; # for illustrative purposes
$varset{field2} = 'a_f2';
$varset{field3} = 'a_f3';
$varset{field4} = 'a_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable)
$values[$columns{$column}] = $value; # add to list of column values
}
$values[0] = $code;
write_to_db($code, @values);
}
################################################################
sub sub_b {
################################################################
my $code = shift; # Code
my @values;
# generate values specific to B
$varset{field1} = 'b_f1'; # for illustrative purposes
$varset{field2} = 'b_f2';
$varset{field3} = 'b_f3';
$varset{field4} = 'b_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable)
$values[$columns{$column}] = $value; # add to list of column values
}
$values[0] = $code;
write_to_db($code, @values);
}
################################################################
sub sub_f {
################################################################
my $code = shift; # Code
my @values;
# generate values specific to F
$varset{field1} = 'f_f1'; # for illustrative purposes
$varset{field2} = 'f_f2';
$varset{field3} = 'f_f3';
$varset{field4} = 'f_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable)
$values[$columns{$column}] = $value; # add to list of column values
}
$values[0] = $code;
write_to_db($code, @values);
}
################################################################
sub sub_m {
################################################################
my $code = shift; # Code
my @values;
# generate values specific to M
$varset{field1} = 'm_f1'; # for illustrative purposes
$varset{field2} = 'm_f2';
$varset{field3} = 'm_f3';
$varset{field4} = 'm_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable)
$values[$columns{$column}] = $value; # add to list of column values
}
$values[0] = $code;
write_to_db($code, @values);
}
################################################################
sub sub_s {
################################################################
my $code = shift; # Code
my @values;
# generate values specific to S
$varset{field1} = 's_f1'; # for illustrative purposes
$varset{field2} = 's_f2';
$varset{field3} = 's_f3';
$varset{field4} = 's_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable)
$values[$columns{$column}] = $value; # add to list of column values
}
$values[0] = $code;
write_to_db($code, @values);
}
################################################################
sub write_to_db {
################################################################
my ($code_name, @values) = @_;
my $rv_code = $sth_select_statement->execute($code_name);
if($rv_code < 0) {
print $DBI::errstr;
}
my @row = $sth_select_statement->fetchrow_array();
# if the SELECT found no existing records for this strategy, then INSERT it
unless ($row[0] > 0) {
# INSERT settings into 'mytable'
$sth_insert_statement->execute(@values);
}
}
################################################################
sub set_columns {
################################################################
$columns{code} = 0;
$columns{field1} = 1;
$columns{field2} = 2;
$columns{field3} = 3;
$columns{field4} = 4;
$columns[0] = 'code';
$columns[1] = 'field1';
$columns[2] = 'field2';
$columns[3] = 'field3';
$columns[4] = 'field4';
}
EDIT 2021-07-21:
I have added clone_dbh
and create_dbh
sub-routines to try different methods. Neither work. I am still getting the same error - can't share DBH between threads
. I don't know what else to do other than give up on sub-routines and write duplicate code for each thread spawned by Parallel::ForkManager
, which I really do not want to do.
Example 2:
use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;
#@codes = ("A"); # testing single thread
my @codes = ("A","B","F","M","S"); # testing multi-thread
my %varset;
################################################################
# connect to db
################################################################
my $dsn = 'DBI:Pg:dbname=$ENV{DB_NAME}';
my $userid = $ENV{DBI_USER};
my $sesame = $ENV{DBI_PASS};
my %dbh; # hash for storing dbh handles
my $dbh = DBI->connect($dsn, $userid, $sesame, {
AutoCommit => 1,
RaiseError => 1,
PrintError => 1
}) or die "Connection failed!\n" . $DBI::errstr;
################################################################
# test db connection
################################################################
my $me = $dbh->{Driver}{Name};
my $sversion = $dbh->{pg_server_version};
print "DBI is version $DBI::VERSION, "
. "I am $me, "
. "version of DBD::Pg is $DBD::Pg::VERSION, "
. "server is $sversion\n";
print "Name: $dbh->{Name}\n";
################################################################
# prepare array and hash for matching db columns
################################################################
my %columns; # hash for persistent mapping of column-values
my @columns; # deterministic - $values[$columns{$column}];
set_columns(); # define column<->value mapping for db table
my $placeholders = join(", ", map { '?' } @columns);
################################################################
# Build the SELECT SQL statement
################################################################
my $sql_select_statement =
qq(SELECT count(*) FROM mytable WHERE id = ?;);
################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement =
"INSERT INTO mytable ("
. join(", ", @columns) # column names
. ") VALUES ($placeholders)";
################################################################################################
# create clones of database handle and SQL statements for threads
################################################################################################
my %sth_select_code;
my %sth_insert_code;
#for my $code (@codes) {
# $dbh{$code} = $dbh->clone();
# # prepare the SELECT statement handle
# $sth_select_code{$code} = $dbh{$code}->prepare_cached($sql_select_statement);
# # prepare the INSERT statement handle
# $sth_insert_code{$code} = $dbh{$code}->prepare_cached($sql_insert_statement);
#}
################################################################
# create Parallel::ForkManager object for @codes
################################################################
my $optimization = Parallel::ForkManager->new(scalar @codes);
$optimization->run_on_start(sub{
my ($pid,$ident) = @_;
print "Starting $ident under process id $pid\n";
});
$optimization->run_on_finish(sub{
my ($pid,
$exit_code,
$ident,
$exit_signal,
$core_dump,
$data_structure_reference) = @_;
});
my $thread_count = 0;
OPTIMIZATION:
for my $code (@codes) {
$thread_count++;
print "Thread $thread_count running for $code\n";
# fork optimization threads - per code
$optimization->start($code) and next OPTIMIZATION;
if ($code =~ m/A/i) {
sub_a("A");
} elsif ($code =~ m/B/i) {
sub_b("B");
} elsif ($code =~ m/F/i) {
sub_f("F");
} elsif ($code =~ m/M/i) {
sub_m("M");
} elsif ($code =~ m/S/i) {
sub_s("S");
}
print "\$optimization->finish on child $code\n";
$optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();
################################################################################################
# disconnect from database
################################################################################################
for my $code (@codes) {
$sth_select_code{$code}->finish();
$sth_insert_code{$code}->finish();
$dbh{$code}->disconnect;
}
$dbh->disconnect;
################################################################################################
# end
################################################################################################
exit;
################################################################
sub sub_a {
################################################################
my $code = shift; # Code
my @values;
# clone_dbh($code);
create_dbh($code);
# my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
# generate values specific to A
$varset{field2} = 'a_f1'; # for illustrative purposes
$varset{field3} = 'a_f2';
$varset{field4} = 'a_f3';
$varset{field5} = 'a_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable)
$values[$columns{$column}] = $value; # add to list of column values
}
$values[0] = $code;
write_to_db($code, @values);
}
################################################################
sub sub_b {
################################################################
my $code = shift; # Code
my @values;
# clone_dbh($code);
create_dbh($code);
# my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
# generate values specific to B
$varset{field2} = 'b_f1'; # for illustrative purposes
$varset{field3} = 'b_f2';
$varset{field4} = 'b_f3';
$varset{field5} = 'b_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable)
$values[$columns{$column}] = $value; # add to list of column values
}
$values[0] = $code;
write_to_db($code, @values);
}
################################################################
sub sub_f {
################################################################
my $code = shift; # Code
my @values;
# clone_dbh($code);
create_dbh($code);
# my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
# generate values specific to F
$varset{field2} = 'f_f1'; # for illustrative purposes
$varset{field3} = 'f_f2';
$varset{field4} = 'f_f3';
$varset{field5} = 'f_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable)
$values[$columns{$column}] = $value; # add to list of column values
}
$values[0] = $code;
write_to_db($code, @values);
}
################################################################
sub sub_m {
################################################################
my $code = shift; # Code
my @values;
# clone_dbh($code);
create_dbh($code);
# my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
# generate values specific to M
$varset{field2} = 'm_f1'; # for illustrative purposes
$varset{field3} = 'm_f2';
$varset{field4} = 'm_f3';
$varset{field5} = 'm_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable)
$values[$columns{$column}] = $value; # add to list of column values
}
$values[0] = $code;
write_to_db($code, @values);
}
################################################################
sub sub_s {
################################################################
my $code = shift; # Code
my @values;
# clone_dbh($code);
create_dbh($code);
# my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
# generate values specific to S
$varset{field2} = 's_f1'; # for illustrative purposes
$varset{field3} = 's_f2';
$varset{field4} = 's_f3';
$varset{field5} = 's_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable)
$values[$columns{$column}] = $value; # add to list of column values
}
$values[0] = $code;
write_to_db($code, @values);
}
################################################################################
sub create_dbh {
################################################################################
my $code = shift;
$dbh{$code} = DBI->connect($dsn, $userid, $sesame, {
AutoCommit => 1,
RaiseError => 1,
PrintError => 1
}) or die "Connection failed!\n" . $DBI::errstr;
# did it work? are we there yet?
my $me = $dbh{$code}->{Driver}{Name};
my $sversion = $dbh{$code}->{pg_server_version};
print "DBI is version $DBI::VERSION, "
. "I am $me, "
. "version of DBD::Pg is $DBD::Pg::VERSION, "
. "server is $sversion\n";
print "Name: $dbh->{Name}\n";
# prepare the SELECT statement handle
$sth_select_code{$code} = $dbh{$code}->prepare_cached($sql_select_statement);
# prepare the INSERT statement handle
$sth_insert_code{$code} = $dbh{$code}->prepare_cached($sql_insert_statement);
}
################################################################
sub clone_dbh {
################################################################
my $code = shift;
$dbh{$code} = $dbh->clone();
# prepare the SELECT statement handle
$sth_select_code{$code} = $dbh{$code}->prepare_cached($sql_select_statement);
# prepare the INSERT statement handle
$sth_insert_code{$code} = $dbh{$code}->prepare_cached($sql_insert_statement);
}
################################################################
sub write_to_db {
################################################################
my ($code, @values) = shift @_;
my $rv_code = $sth_select_code{$code}->execute($code);
if($rv_code < 0) {
print $DBI::errstr;
}
my @row = $sth_select_code{$code}->fetchrow_array();
# if the SELECT found no existing records for this strategy, then INSERT it
unless ($row[0] > 0) {
# INSERT settings into 'mytable'
$sth_insert_code{$code}->execute(@values);
}
}
################################################################
sub set_columns {
################################################################
$columns{field1} = 0;
$columns{field2} = 1;
$columns{field3} = 2;
$columns{field4} = 3;
$columns{field5} = 4;
$columns[0] = 'field1';
$columns[1] = 'field2';
$columns[2] = 'field3';
$columns[3] = 'field4';
$columns[4] = 'field5';
}
Do I have to pass dbh
and statement
handles to the create_dbh
sub explicitly from within the calling sub_x
thread?
Do I have to return the handles
or handle_refs
from create_dbh
back to calling sub_x
thread?
I don't know how to get around this, but it seems like a lexical scope or object/memory access issue.
Any more ideas?
I got it working using the code below:
use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;
#my @codes = ("A"); # testing single thread
my @codes = ("A","B","F","M","S"); # testing multi-thread
my %varset;
################################################################
# get db connection info
################################################################
my $dsn = 'DBI:Pg:dbname=mt4_test';
my $userid = $ENV{DBI_USER};
my $sesame = $ENV{DBI_PASS};
my %dbh; # hash for storing dbh handles
################################################################
# prepare array and hash for matching db columns
################################################################
my %columns; # hash for persistent mapping of column-values
my @columns; # deterministic - $values[$columns{$column}];
set_columns(); # define column<->value mapping for db table
my $placeholders = join(", ", map { '?' } @columns);
################################################################
# Build the SELECT SQL statement
################################################################
my $sql_select_statement =
qq(SELECT count(*) FROM mytable WHERE field1 = ?;);
################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement =
"INSERT INTO mytable ("
. join(", ", @columns) # column names
. ") VALUES ($placeholders)";
################################################################
# hash for storing SQL statement handles for threads
################################################################
my %sth_select_code;
my %sth_insert_code;
################################################################
# create Parallel::ForkManager object for @codes
################################################################
my $optimization = Parallel::ForkManager->new(scalar @codes);
$optimization->run_on_start(sub{
my ($pid,$ident) = @_;
print "Starting $ident under process id $pid\n";
});
$optimization->run_on_finish(sub{
my ($pid,
$exit_code,
$ident,
$exit_signal,
$core_dump,
$data_structure_reference) = @_;
});
my $thread_count = 0;
OPTIMIZATION:
for my $code (@codes) {
$thread_count++;
print "Thread $thread_count running for $code\n";
# fork optimization threads - per code
if (scalar @codes > 1) {
$optimization->start($code) and next OPTIMIZATION;
} else {
$optimization->start($code);
}
launch_sub($code);
print "\$optimization->finish on child $code\n";
$optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();
################################################################
# THE END
################################################################
exit;
################################################################
################################################################
sub launch_sub {
################################################################
my $code = shift;
if ($code =~ m/A/i) {
sub_a("A");
} elsif ($code =~ m/B/i) {
sub_b("B");
} elsif ($code =~ m/F/i) {
sub_f("F");
} elsif ($code =~ m/M/i) {
sub_m("M");
} elsif ($code =~ m/S/i) {
sub_s("S");
}
}
################################################################
sub sub_a {
################################################################
my $code = shift; # Code
my @values;
my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
# generate values specific to A
$varset{field2} = 'a_f1'; # for illustrative purposes
$varset{field3} = 'a_f2';
$varset{field4} = 'a_f3';
$varset{field5} = 'a_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable)
$values[$columns{$column}] = $value; # add to list of column values
}
$values[0] = $code;
write_to_db($code, @values);
disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
}
################################################################
sub sub_b {
################################################################
my $code = shift; # Code
my @values;
my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
# generate values specific to B
$varset{field2} = 'b_f1'; # for illustrative purposes
$varset{field3} = 'b_f2';
$varset{field4} = 'b_f3';
$varset{field5} = 'b_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable)
$values[$columns{$column}] = $value; # add to list of column values
}
$values[0] = $code;
write_to_db($code, @values);
disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
}
################################################################
sub sub_f {
################################################################
my $code = shift; # Code
my @values;
my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
# generate values specific to F
$varset{field2} = 'f_f1'; # for illustrative purposes
$varset{field3} = 'f_f2';
$varset{field4} = 'f_f3';
$varset{field5} = 'f_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable)
$values[$columns{$column}] = $value; # add to list of column values
}
$values[0] = $code;
write_to_db($code, @values);
disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
}
################################################################
sub sub_m {
################################################################
my $code = shift; # Code
my @values;
my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
# generate values specific to M
$varset{field2} = 'm_f1'; # for illustrative purposes
$varset{field3} = 'm_f2';
$varset{field4} = 'm_f3';
$varset{field5} = 'm_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable)
$values[$columns{$column}] = $value; # add to list of column values
}
$values[0] = $code;
write_to_db($code, @values);
disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
}
################################################################
sub sub_s {
################################################################
my $code = shift; # Code
my @values;
my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
# generate values specific to S
$varset{field2} = 's_f1'; # for illustrative purposes
$varset{field3} = 's_f2';
$varset{field4} = 's_f3';
$varset{field5} = 's_f4';
foreach my $key (keys %varset) {
my $column = $key; # the column name as key
my $value = $varset{$key}; # the column value (field variable)
$values[$columns{$column}] = $value; # add to list of column values
}
$values[0] = $code;
write_to_db($code, @values);
disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
}
################################################################
sub create_dbh {
################################################################
my $dbh_ref = shift;
my $sel_ref = shift;
my $ins_ref = shift;
${$dbh_ref} = DBI->connect($dsn, $userid, $sesame, {
AutoCommit => 1,
RaiseError => 1,
PrintError => 1
}) or die "Connection failed!\n" . $DBI::errstr;
# did it work? are we there yet?
my $me = ${$dbh_ref}->{Driver}{Name};
my $sversion = ${$dbh_ref}->{pg_server_version};
print "DBI is version $DBI::VERSION, "
. "I am $me, "
. "version of DBD::Pg is $DBD::Pg::VERSION, "
. "server is $sversion\n";
print "Name: ${$dbh_ref}->{Name}\n";
# prepare the SELECT statement handle
${$sel_ref} = ${$dbh_ref}->prepare_cached($sql_select_statement);
# prepare the INSERT statement handle
${$ins_ref} = ${$dbh_ref}->prepare_cached($sql_insert_statement);
}
################################################################
sub write_to_db {
################################################################
my ($code, @values) = @_;
my $rv_code = $sth_select_code{$code}->execute($code);
say "SQL SELECT for $code: rv_code = $rv_code";
if($rv_code < 0) {
print $DBI::errstr;
}
my @row = $sth_select_code{$code}->fetchrow_array();
# if the SELECT found no existing records for this strategy, then INSERT it
unless ($row[0] > 0) {
# INSERT settings into 'mytable'
$sth_insert_code{$code}->execute(@values);
say "SQL INSERT for $code";
}
}
################################################################
sub disconnect_dbh {
################################################################
my $dbh_ref = shift;
my $sel_ref = shift;
my $ins_ref = shift;
${$sel_ref}->finish();
${$ins_ref}->finish();
${$dbh_ref}->disconnect;
say "disconnected dbh_ref: $dbh_ref";
}
################################################################
sub set_columns {
################################################################
$columns{field1} = 0;
$columns{field2} = 1;
$columns{field3} = 2;
$columns{field4} = 3;
$columns{field5} = 4;
$columns[0] = 'field1';
$columns[1] = 'field2';
$columns[2] = 'field3';
$columns[3] = 'field4';
$columns[4] = 'field5';
}