#!/usr/bin/perl # # mysql-table-copy.pl - move data from one table in database to # table in another database. Can be used for one # table at once. # # Developed by Lubomir Host # Copyright (c) 2003-2008 Platon SDG, http://platon.sk/ # Licensed under terms of GNU General Public License. # All rights reserved. # # $Platon: scripts/perl/mysql/mysql-table-copy.pl,v 1.17 2008-02-03 23:57:44 rajo Exp $ # TODO: - reconnect on closed connection # - managment for tables with multiple KEY columns use strict; use DBI; use Time::HiRes qw(gettimeofday tv_interval); use POSIX qw(strftime); use Getopt::Long; use vars qw ( $opt_exit $opt_pk $opt_nd $opt_where $opt_verbose $opt_update_sum $dbh1 $db_name1 $user1 $pass1 $dbh2 $db_name2 $user2 $pass2 $table $timeout $default_timeout $primary_key @columns @unique_keys @update_columns $sql_max_id $sql_read_data $sql_write_data $sql_delete_data $sql_update_data ); sub cur_date(); sub read_input(); sub usage(); $default_timeout = 10; # 10 seconds my $res = GetOptions( 'pk|p' => \$opt_pk, 'nd|n' => \$opt_nd, 'where|w=s' => \$opt_where, 'exit|e' => \$opt_exit, 'verbose|v' => \$opt_verbose, 'timeout|t=i' => \$timeout, 'update-sum' => \$opt_update_sum, ); #use Data::Dumper; #die Dumper({ # 'pk' => $opt_pk, # 'nd' => $opt_nd, # 'where|w=s' => $opt_where, # 'exit' => $opt_exit, # 'verbose|v' => $opt_verbose, # 'timeout|t=i' => $timeout, # 'update-sum' => $opt_update_sum, #}); #die Dumper(\@ARGV); usage() unless (scalar(@ARGV)); ($db_name1, $db_name2, $table, $user1, $pass1, $user2, $pass2) = @ARGV; usage() if (! defined $db_name1 or ! defined $db_name2); # we are using ',' as separators, because ';' is separator in shell $db_name1 =~ s/,/;/g; $db_name2 =~ s/,/;/g; $db_name1 =~ m/^DBI:/ or $db_name1 = "DBI:mysql:$db_name1"; $db_name2 =~ m/^DBI:/ or $db_name2 = "DBI:mysql:$db_name2"; if (!defined $timeout or $timeout !~ m/^[0-9]+$/) { print STDERR "Using default timeout $default_timeout seconds\n" if ($opt_verbose); $timeout = $default_timeout; } # user, password and connect {{{ print "Enter username for database $db_name1:\n" and $user1 = read_input() unless defined $user1; print "Enter password for user $user1:\n" and $pass1 = read_input() unless defined $pass1; print "Enter username for database $db_name2:\n" and $user2 = read_input() unless defined $user2; print "Enter password for user $user2:\n" and $pass2 = read_input() unless defined $pass2; $dbh1 = DBI->connect($db_name1, $user1, $pass1) or die $DBI::errstr; $dbh2 = DBI->connect($db_name2, $user2, $pass2) or die $DBI::errstr; $dbh1->{mysql_auto_reconnect} = 1; $dbh2->{mysql_auto_reconnect} = 1; $dbh1->{AutoCommit} = 1; $dbh2->{AutoCommit} = 1; # # Please, optimize me! # if ($db_name1 =~ m/^DBI:Oracle:/) { my $ignored = $dbh1->do('ALTER SESSION SET NLS_DATE_FORMAT = \'YYYY-MM-DD HH24:MI:SS\'') or die "DBI::do(): $DBI::errstr"; } if ($db_name2 =~ m/^DBI:Oracle:/) { my $ignored = $dbh2->do('ALTER SESSION SET NLS_DATE_FORMAT = \'YYYY-MM-DD HH24:MI:SS\'') or die "DBI::do(): $DBI::errstr"; } # }}} # look for PRIMARY KEY and other columns my $sql_info = $dbh1->prepare("DESCRIBE " . $table); $sql_info->execute(); my $info= $sql_info->fetchall_hashref('Field'); foreach my $key (sort keys %$info) { if ($$info{$key}->{Key} eq 'PRI') { print "# Primary key is '$key'\n" if ($opt_verbose); $primary_key = $key; push @columns, $key if ($opt_pk); } elsif ($$info{$key}->{Key} eq 'MUL') { print "# Unique key is '$key'\n" if ($opt_verbose); push @unique_keys, $key; push @columns, $key; } else { print "# Found column '$key'\n" if ($opt_verbose); push @columns, $key; push @update_columns, $key if ($opt_update_sum); } } #$sql_info->dump_results(); print "# Preparing SQL commands...\n" if ($opt_verbose); #print "SELECT $primary_key, " . join(', ', @columns) . " FROM $table\n"; #print "INSERT INTO $table (" . join(', ', @columns) . ") VALUES (?" . ", ?" x $#columns . ")\n"; #print "DELETE FROM $table WHERE $primary_key = ? LIMIT 1\n"; if ($opt_nd) { $sql_max_id = $dbh2->prepare( "SELECT MAX($primary_key) FROM $table" . (defined($opt_where) ? " WHERE $opt_where" : "")) or die $dbh1->errstr; $sql_read_data = $dbh1->prepare( "SELECT $primary_key, " . join( ', ', @columns ) . " FROM $table WHERE $primary_key > ?" . (defined($opt_where) ? " AND $opt_where" : "")) or die $dbh1->errstr; } else { $sql_delete_data = $dbh1->prepare( "DELETE FROM $table WHERE $primary_key = ? LIMIT 1" ) or die $dbh1->errstr; $sql_read_data = $dbh1->prepare( "SELECT $primary_key, " . join( ', ', @columns ) . " FROM $table" . (defined($opt_where) ? " WHERE $opt_where" : "")) or die $dbh1->errstr; } $sql_write_data = $dbh2->prepare( "INSERT INTO $table (" . join( ', ', @columns ) . ") VALUES (?" . ", ?" x $#columns . ")" ) or die $dbh2->errstr; $sql_update_data = $dbh2->prepare( "UPDATE $table SET " . join( ' + ?, ', map { "$_ = $_"; } @update_columns ) . " + ? WHERE " . join( ' = ? AND ', @unique_keys ) . " = ?") or die $dbh2->errstr; while (1) { # MySQL auto reconnect {{{ my $num = 0; foreach my $dbh ($dbh1, $dbh2) { $num++; my $rv = 0; while (!$rv) { $rv = $dbh->ping(); $dbh->{mysql_auto_reconnect} = 1; $rv += $dbh->ping(); $dbh->{mysql_auto_reconnect} = 1; unless ($rv) { warn cur_date(), " ### Warning: DB #$num/2 ping error !!\n"; sleep 1; } } } # }}} if ($opt_nd) { $sql_max_id->execute() or warn $dbh2->errstr; my $max_id = ($sql_max_id->fetchrow_array())[0] || 0; $sql_read_data->execute($max_id) or warn $dbh1->errstr; } else { $sql_read_data->execute() or warn $dbh1->errstr; } while (my $row = $sql_read_data->fetchrow_hashref()) { my $prim_key = $row->{$primary_key}; # PRIMARY KEY column, which is not inserted if ($sql_write_data->execute(map { $row->{$_} } @columns)) { print cur_date(), "\tData with key '$prim_key' succesfully inserted with key ", $dbh2->{"mysql_insertid"}, "\n"; if (! $opt_nd) { $sql_delete_data->execute($prim_key) or warn cur_date(), "\tError removing data with key '$prim_key': ", $dbh1->errstr, "\n"; } } elsif ($opt_update_sum) { my $iv = $sql_update_data->execute(map { $row->{$_} } @update_columns, @unique_keys); if ($iv) { print cur_date(), "\tData succesfully updated, primary key '$prim_key', unique key ", join('-', map { $row->{$_} } @unique_keys), "\n"; $sql_delete_data->execute($prim_key) or warn cur_date(), "\tError removing data with key '$prim_key': ", $dbh1->errstr, "\n"; } else { warn cur_date(), "\tError updating data with key '$prim_key': ", $dbh2->errstr, "\n"; } } else { warn cur_date(), "\tError writing data with key '$prim_key': ", $dbh2->errstr, "\n"; } } last if ($opt_exit); sleep $timeout; } sub cur_date() { # {{{ my ($sec, $msec) = Time::HiRes::gettimeofday; my $time_string = strftime "%Y-%m-%d %X.$msec", localtime; return $time_string; } # }}} sub read_input() { # {{{ my $str = ; chomp $str; return $str; } # }}} sub usage() { # {{{ print STDERR "Usage:\n"; print STDERR " $0 [options] database=DB1,host=localhost database=DB2,host=localhost Table\n"; print STDERR " [user1] [pass1] [user2] [pass2]\n"; print STDERR "Options:\n"; print STDERR " -pk copy/replicate primary key as well\n"; print STDERR " -nd skip removal of source items, insert new only\n"; print STDERR " -where copy/replicate only records where condition is true\n"; print STDERR " -timeout how often pool database (default 10 sec)\n"; print STDERR " -exit exit immediately after execution\n"; print STDERR " -verbose add some notes on the output\n"; print STDERR " -update-sum data in destination table can be updated according UNIQUE KEY\n"; exit 1; } # }}} # vim: ts=4 # vim600: fdm=marker fdl=0 fdc=3