#!/usr/bin/perl
#
# mysql-table-copy.pl - move data from a table in one database to the
# table of the same name in another database. Handles one
# table at a time.
#
# Developed by Lubomir Host <rajo AT platon.sk>
# Copyright (c) 2003-2008 Platon SDG, http://platon.sk/
# Licensed under terms of GNU General Public License.
# All rights reserved.
#
# $Platon: scripts/perl/mysql/mysql-table-copy.pl,v 1.17 2008-02-03 23:57:44 rajo Exp $
# TODO: - reconnect on closed connection
# - management for tables with multiple KEY columns
use strict;
use DBI;
use Time::HiRes qw(gettimeofday tv_interval);
use POSIX qw(strftime);
use Getopt::Long;
use vars qw (
$opt_exit $opt_pk $opt_nd $opt_where $opt_verbose $opt_update_sum
$dbh1 $db_name1 $user1 $pass1
$dbh2 $db_name2 $user2 $pass2
$table $timeout
$default_timeout
$primary_key @columns
@unique_keys
@update_columns
$sql_max_id $sql_read_data $sql_write_data $sql_delete_data $sql_update_data
);
# Forward declarations of the helper subs defined at the end of the file,
# so they can be called with their empty prototype before their bodies
# have been compiled.
sub cur_date();
sub read_input();
sub usage();

# Polling interval used when -timeout is missing or not a plain
# non-negative integer.
$default_timeout = 10; # 10 seconds

# Parse the command-line switches. GetOptions() strips every option it
# recognizes from @ARGV and returns false when it meets an unknown or
# malformed one.
my $res = GetOptions(
    'pk|p'        => \$opt_pk,
    'nd|n'        => \$opt_nd,
    'where|w=s'   => \$opt_where,
    'exit|e'      => \$opt_exit,
    'verbose|v'   => \$opt_verbose,
    'timeout|t=i' => \$timeout,
    'update-sum'  => \$opt_update_sum,
);
# Do not run with a half-understood command line: a mistyped switch
# (e.g. a misspelled -nd) would otherwise silently fall back to the
# default, destructive "copy and delete" mode.
usage() unless ($res);
#use Data::Dumper;
#die Dumper({
# 'pk' => $opt_pk,
# 'nd' => $opt_nd,
# 'where|w=s' => $opt_where,
# 'exit' => $opt_exit,
# 'verbose|v' => $opt_verbose,
# 'timeout|t=i' => $timeout,
# 'update-sum' => $opt_update_sum,
#});
#die Dumper(\@ARGV);

# Positional arguments: source DSN, destination DSN, table and the
# optional credentials (prompted for later when missing).
usage() unless (scalar(@ARGV));
($db_name1, $db_name2, $table, $user1, $pass1, $user2, $pass2) = @ARGV;
# The table name is interpolated into the SQL built later on, so it is
# just as mandatory as the two database specifications.
usage() if (! defined $db_name1 or ! defined $db_name2 or ! defined $table);

# DSN attributes are given on the command line separated by ',' because
# ';' (the separator DBI expects) is a command separator in the shell.
$db_name1 =~ s/,/;/g;
$db_name2 =~ s/,/;/g;
# A specification without an explicit "DBI:" prefix is taken as a MySQL DSN.
$db_name1 =~ m/^DBI:/ or $db_name1 = "DBI:mysql:$db_name1";
$db_name2 =~ m/^DBI:/ or $db_name2 = "DBI:mysql:$db_name2";

# Fall back to the default interval when no usable timeout was given
# (the '=i' option type also accepts negative numbers, which are
# rejected here).
if (!defined $timeout or $timeout !~ m/^[0-9]+$/) {
    print STDERR "Using default timeout $default_timeout seconds\n" if ($opt_verbose);
    $timeout = $default_timeout;
}
# user, password and connect {{{
# Ask on STDIN for every credential that was not supplied as a
# positional argument. The answer is read only when the prompt itself
# could be printed.
unless (defined $user1) {
    $user1 = read_input() if (print "Enter username for database $db_name1:\n");
}
unless (defined $pass1) {
    $pass1 = read_input() if (print "Enter password for user $user1:\n");
}
unless (defined $user2) {
    $user2 = read_input() if (print "Enter username for database $db_name2:\n");
}
unless (defined $pass2) {
    $pass2 = read_input() if (print "Enter password for user $user2:\n");
}

# Open the source ($dbh1) and destination ($dbh2) connections; a failure
# on either one is fatal.
$dbh1 = DBI->connect($db_name1, $user1, $pass1) or die $DBI::errstr;
$dbh2 = DBI->connect($db_name2, $user2, $pass2) or die $DBI::errstr;

# Both handles get the same settings: automatic reconnect (a DBD::mysql
# attribute) and one implicit commit per statement.
foreach my $handle ($dbh1, $dbh2) {
    $handle->{mysql_auto_reconnect} = 1;
    $handle->{AutoCommit} = 1;
}

# For Oracle DSNs switch the session date format to the
# 'YYYY-MM-DD HH24:MI:SS' layout.
foreach my $conn ([ $db_name1, $dbh1 ], [ $db_name2, $dbh2 ]) {
    my ($dsn, $handle) = @$conn;
    next unless ($dsn =~ m/^DBI:Oracle:/);
    $handle->do(q{ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI:SS'})
        or die "DBI::do(): $DBI::errstr";
}
# }}}
# look for PRIMARY KEY and other columns
# DESCRIBE returns one row per column of the source table; the rows are
# indexed by column name ('Field'). Failing to read the table layout is
# fatal because every statement below is built from it.
my $sql_info = $dbh1->prepare("DESCRIBE " . $table) or die $dbh1->errstr;
$sql_info->execute() or die $dbh1->errstr;
my $info= $sql_info->fetchall_hashref('Field');

# Sort the columns into:
#   $primary_key    - the 'PRI' column (copied only with -pk)
#   @unique_keys    - 'MUL' columns, used in the WHERE part of the UPDATE
#   @update_columns - remaining columns, summed up by -update-sum
#   @columns        - everything that is read and inserted
# NOTE(review): only 'MUL' is treated as a "unique key" here; columns
# that DESCRIBE flags as 'UNI' fall through to the plain-column branch.
# Confirm that this is intended.
foreach my $key (sort keys %$info) {
    my $key_type = $$info{$key}->{Key};
    $key_type = '' unless (defined $key_type);
    if ($key_type eq 'PRI') {
        print "# Primary key is '$key'\n" if ($opt_verbose);
        $primary_key = $key;
        push @columns, $key if ($opt_pk);
    }
    elsif ($key_type eq 'MUL') {
        print "# Unique key is '$key'\n" if ($opt_verbose);
        push @unique_keys, $key;
        push @columns, $key;
    }
    else {
        print "# Found column '$key'\n" if ($opt_verbose);
        push @columns, $key;
        push @update_columns, $key if ($opt_update_sum);
    }
}
#$sql_info->dump_results();

# Every statement below addresses rows through the primary key; without
# one the generated SQL would be malformed, so stop with a clear message.
die "No PRIMARY KEY column found in table '$table'\n" unless (defined $primary_key);

print "# Preparing SQL commands...\n" if ($opt_verbose);
#print "SELECT $primary_key, " . join(', ', @columns) . " FROM $table\n";
#print "INSERT INTO $table (" . join(', ', @columns) . ") VALUES (?" . ", ?" x $#columns . ")\n";
#print "DELETE FROM $table WHERE $primary_key = ? LIMIT 1\n";

my $column_list = join(', ', @columns);
if ($opt_nd) {
    # -nd: nothing is deleted from the source. Each cycle copies only
    # rows whose primary key is above the highest one already present
    # in the destination.
    $sql_max_id = $dbh2->prepare(
        "SELECT MAX($primary_key) FROM $table" . (defined($opt_where) ? " WHERE $opt_where" : ""))
        or die $dbh2->errstr; # prepared on $dbh2, so report $dbh2's error
    $sql_read_data = $dbh1->prepare(
        "SELECT $primary_key, $column_list FROM $table WHERE $primary_key > ?" . (defined($opt_where) ? " AND $opt_where" : ""))
        or die $dbh1->errstr;
}
else {
    # Default mode: read every (matching) row and delete it from the
    # source once it has been stored in the destination.
    $sql_delete_data = $dbh1->prepare(
        "DELETE FROM $table WHERE $primary_key = ? LIMIT 1" ) or die $dbh1->errstr;
    $sql_read_data = $dbh1->prepare(
        "SELECT $primary_key, $column_list FROM $table" . (defined($opt_where) ? " WHERE $opt_where" : ""))
        or die $dbh1->errstr;
}

# One placeholder per inserted column.
$sql_write_data = $dbh2->prepare(
    "INSERT INTO $table ($column_list) VALUES (" . join(', ', ('?') x @columns) . ")" )
    or die $dbh2->errstr;

# The "add to existing row" statement is only used with -update-sum.
# Without that option @update_columns is empty and the generated SQL
# would always be malformed, so it is not prepared at all.
if ($opt_update_sum) {
    warn "Warning: -update-sum needs at least one key column and one plain column in '$table'\n"
        unless (@update_columns and @unique_keys);
    $sql_update_data = $dbh2->prepare(
        "UPDATE $table SET " . join( ' + ?, ', map { "$_ = $_"; } @update_columns ) . " + ?
        WHERE " . join( ' = ? AND ', @unique_keys ) . " = ?")
        or die $dbh2->errstr;
}
# Main polling loop: every cycle checks both connections, reads the
# pending rows from the source and writes them to the destination. Runs
# once with -exit, otherwise forever with $timeout seconds between cycles.
while (1) {
    # MySQL auto reconnect {{{
    # Block until both handles answer a ping. Each handle is pinged
    # twice per attempt and the auto-reconnect flag is re-asserted after
    # each ping.
    # NOTE(review): presumably the first ping triggers the reconnect and
    # the second one confirms it - confirm against DBD::mysql.
    my $num = 0;
    foreach my $dbh ($dbh1, $dbh2) {
        $num++;
        my $rv = 0;
        while (!$rv) {
            $rv = $dbh->ping();
            $dbh->{mysql_auto_reconnect} = 1;
            $rv += $dbh->ping();
            $dbh->{mysql_auto_reconnect} = 1;
            unless ($rv) {
                warn cur_date(), " ### Warning: DB #$num/2 ping error !!\n";
                sleep 1;
            }
        }
    } # }}}

    # Start this cycle's read on the source. $read_ok stays false when
    # any query failed, in which case the cycle is skipped.
    my $read_ok;
    if ($opt_nd) {
        if ($sql_max_id->execute()) {
            # MAX() is NULL for an empty destination table -> start at 0.
            my $max_id = ($sql_max_id->fetchrow_array())[0] || 0;
            $read_ok = $sql_read_data->execute($max_id) or warn $dbh1->errstr;
        }
        else {
            # Do not fall back to 0 here: that would copy the whole
            # source table into the destination once more.
            warn $dbh2->errstr;
        }
    }
    else {
        $read_ok = $sql_read_data->execute() or warn $dbh1->errstr;
    }

    while ($read_ok and my $row = $sql_read_data->fetchrow_hashref()) {
        my $prim_key = $row->{$primary_key}; # PRIMARY KEY column, which is not inserted
        if ($sql_write_data->execute(map { $row->{$_} } @columns)) {
            print cur_date(), "\tData with key '$prim_key' succesfully inserted with key ",
                $dbh2->{"mysql_insertid"}, "\n";
            if (! $opt_nd) {
                $sql_delete_data->execute($prim_key)
                    or warn cur_date(), "\tError removing data with key '$prim_key': ",
                    $dbh1->errstr, "\n";
            }
        }
        elsif ($opt_update_sum) {
            # The insert failed; add the values of the plain columns to
            # the destination row selected by the key columns instead.
            my $iv = $sql_update_data->execute(map { $row->{$_} } @update_columns, @unique_keys);
            if ($iv) {
                print cur_date(), "\tData succesfully updated, primary key '$prim_key', unique key ", join('-', map { $row->{$_} } @unique_keys), "\n";
                # The delete statement exists only when rows are moved;
                # with -nd it is never prepared and the source is left alone.
                if (! $opt_nd) {
                    $sql_delete_data->execute($prim_key)
                        or warn cur_date(), "\tError removing data with key '$prim_key': ",
                        $dbh1->errstr, "\n";
                }
            }
            else {
                warn cur_date(), "\tError updating data with key '$prim_key': ",
                    $dbh2->errstr, "\n";
            }
        }
        else {
            warn cur_date(), "\tError writing data with key '$prim_key': ",
                $dbh2->errstr, "\n";
        }
    }
    last if ($opt_exit);
    sleep $timeout;
}
# cur_date() - timestamp used as prefix of the log lines.
#
# Returns the current local time as "YYYY-MM-DD <%X time>.<microseconds>".
# The fractional part is always six digits wide, and the date/time part
# is formatted from the same instant the microseconds were sampled at.
sub cur_date()
{ # {{{
    my ($sec, $usec) = Time::HiRes::gettimeofday();
    # Zero-pad the microseconds: without padding 5 us would be printed
    # as ".5" and read as half a second.
    return strftime("%Y-%m-%d %X", localtime($sec)) . sprintf(".%06d", $usec);
} # }}}
# read_input() - read a single answer from the user.
#
# Takes the next line from STDIN and returns it with the trailing line
# ending removed. Returns undef when STDIN has no more input.
sub read_input()
{ # {{{
    chomp(my $answer = <STDIN>);
    return $answer;
} # }}}
# usage() - print the command-line synopsis to STDERR and terminate the
# script with exit status 1. Never returns to the caller.
sub usage()
{ # {{{
    print STDERR
        "Usage:\n",
        " $0 [options] database=DB1,host=localhost database=DB2,host=localhost Table\n",
        " [user1] [pass1] [user2] [pass2]\n",
        "Options:\n",
        " -pk copy/replicate primary key as well\n",
        " -nd skip removal of source items, insert new only\n",
        " -where copy/replicate only records where condition is true\n",
        " -timeout how often to poll database (default 10 sec)\n",
        " -exit exit immediately after execution\n",
        " -verbose add some notes on the output\n",
        " -update-sum data in destination table can be updated according to UNIQUE KEY\n";
    exit 1;
} # }}}
# vim: ts=4
# vim600: fdm=marker fdl=0 fdc=3
# Platon Group <platon@platon.sk> http://platon.sk/