Platon Technologies
neprihlásený Prihlásiť Registrácia
SlovakEnglish
open source software development oslavujeme 10 rokov vývoja otvoreného softvéru! Utorok, 16. apríl 2024

Súbor: [Platon] / scripts / perl / mysql / mysql-table-copy.pl (stiahnutie)

Revízia 1.18, Wed Feb 6 00:07:29 2008 UTC (16 years, 2 months ago) by rajo


Zmeny od 1.17: +45 -15 [lines]

Data in destination table can be updated according UNIQUE KEY

#!/usr/bin/perl

#
# mysql-table-copy.pl - move data from one table in database to
#                       table in another database. Can be used for one
#                       table at once.
#
# Developed by Lubomir Host <rajo AT platon.sk>
# Copyright (c) 2003-2008 Platon SDG, http://platon.sk/
# Licensed under terms of GNU General Public License.
# All rights reserved.
#
# $Platon: scripts/perl/mysql/mysql-table-copy.pl,v 1.17 2008-02-03 23:57:44 rajo Exp $

# TODO: - reconnect on closed connection
#       - management for tables with multiple KEY columns

use strict;

use DBI;
use Time::HiRes qw(gettimeofday tv_interval); 
use POSIX qw(strftime);
use Getopt::Long;

use vars qw (
    $opt_exit $opt_pk $opt_nd $opt_where $opt_verbose $opt_update_sum
    $dbh1 $db_name1 $user1 $pass1
    $dbh2 $db_name2 $user2 $pass2
    $table $timeout
    $default_timeout
    $primary_key @columns
    @unique_keys
    @update_columns

    $sql_max_id $sql_read_data $sql_write_data $sql_delete_data $sql_update_data
);

sub cur_date();
sub read_input();
sub usage();

$default_timeout = 10; # 10 seconds

# Parse command-line switches. GetOptions() strips the options it recognises
# from @ARGV and returns false when it met an unknown or malformed option.
my $res = GetOptions(
    'pk|p'        => \$opt_pk,
    'nd|n'        => \$opt_nd,
    'where|w=s'   => \$opt_where,
    'exit|e'      => \$opt_exit,
    'verbose|v'   => \$opt_verbose,
    'timeout|t=i' => \$timeout,
    'update-sum'  => \$opt_update_sum,
);

# Do not continue with half-understood options: a mistyped switch could
# otherwise silently change what gets copied or deleted.
usage() unless ($res);

#use Data::Dumper;
#die Dumper({
#    'pk'            => $opt_pk,
#    'nd'            => $opt_nd,
#    'where|w=s'        => $opt_where,
#    'exit'            => $opt_exit,
#    'verbose|v'        => $opt_verbose,
#    'timeout|t=i'    => $timeout,
#    'update-sum'    => $opt_update_sum,
#});
#die Dumper(\@ARGV);

# Source DSN, destination DSN and table name are mandatory; the four
# credential arguments are optional and are asked for interactively later.
usage() if (scalar(@ARGV) < 3);

($db_name1, $db_name2, $table, $user1, $pass1, $user2, $pass2) = @ARGV;

# we are using ',' as separators, because ';' is separator in shell
$db_name1 =~ s/,/;/g;
$db_name2 =~ s/,/;/g;

# A DSN without an explicit driver prefix is treated as a MySQL DSN.
$db_name1 =~ m/^DBI:/ or $db_name1 = "DBI:mysql:$db_name1";
$db_name2 =~ m/^DBI:/ or $db_name2 = "DBI:mysql:$db_name2";

# Fall back to the default polling interval when none (or a negative one)
# was given.
if (!defined $timeout or $timeout !~ m/^[0-9]+$/) {
    print STDERR "Using default timeout $default_timeout seconds\n" if ($opt_verbose);
    $timeout = $default_timeout;
}

# user, password and connect {{{
# Every credential missing from the command line is requested on STDIN.
# The answer is only read when the prompt itself could be printed.
unless (defined $user1) {
    print "Enter username for database $db_name1:\n" and $user1 = read_input();
}
unless (defined $pass1) {
    print "Enter password for user $user1:\n" and $pass1 = read_input();
}
unless (defined $user2) {
    print "Enter username for database $db_name2:\n" and $user2 = read_input();
}
unless (defined $pass2) {
    print "Enter password for user $user2:\n" and $pass2 = read_input();
}

# $dbh1 is the source connection, $dbh2 the destination connection.
$dbh1 = DBI->connect($db_name1, $user1, $pass1) or die $DBI::errstr;
$dbh2 = DBI->connect($db_name2, $user2, $pass2) or die $DBI::errstr;

# Both handles get the same attributes: reconnect automatically and commit
# every statement immediately.
foreach my $handle ($dbh1, $dbh2) {
    $handle->{mysql_auto_reconnect} = 1;
}
foreach my $handle ($dbh1, $dbh2) {
    $handle->{AutoCommit} = 1;
}

# For Oracle DSNs switch the session date format to 'YYYY-MM-DD HH24:MI:SS'.
foreach my $connection ([ $db_name1, $dbh1 ], [ $db_name2, $dbh2 ]) {
    my ($dsn, $handle) = @$connection;
    next unless ($dsn =~ m/^DBI:Oracle:/);
    $handle->do('ALTER SESSION SET NLS_DATE_FORMAT = \'YYYY-MM-DD HH24:MI:SS\'')
      or die "DBI::do(): $DBI::errstr";
}

# }}}

# look for PRIMARY KEY and other columns
#
# DESCRIBE is run against the source database; its rows are indexed by the
# 'Field' (column name) value and classified by the 'Key' value:
#   PRI  -> $primary_key (copied only when -pk was given)
#   MUL  -> @unique_keys, used in the WHERE part of the --update-sum UPDATE
#   else -> ordinary column; with --update-sum it is also summed on update
# NOTE(review): columns reported as 'UNI' land in the last group; confirm
# whether they should be treated as unique keys as well.
my $sql_info = $dbh1->prepare("DESCRIBE " . $table)
    or die "Cannot prepare DESCRIBE $table: " . $dbh1->errstr . "\n";
$sql_info->execute()
    or die "Cannot DESCRIBE $table: " . $sql_info->errstr . "\n";
my $info= $sql_info->fetchall_hashref('Field');
foreach my $key (sort keys %$info) {
    my $key_type = $info->{$key}->{Key};
    if ($key_type eq 'PRI') {
        # With a composite primary key the last column (in sorted order)
        # wins; see the TODO at the top of the file.
        print "# Primary key is '$key'\n" if ($opt_verbose);
        $primary_key = $key;
        push @columns, $key if ($opt_pk);
    }
    elsif ($key_type eq 'MUL') {
        print "# Unique key is '$key'\n" if ($opt_verbose);
        push @unique_keys, $key;
        push @columns, $key;
    }
    else {
        print "# Found column '$key'\n" if ($opt_verbose);
        push @columns, $key;
        push @update_columns, $key if ($opt_update_sum);
    }
}
#$sql_info->dump_results();

# Every later statement addresses rows by primary key; without one the
# generated SQL would be syntactically invalid, so stop with a clear message.
defined $primary_key
    or die "Table '$table' has no PRIMARY KEY column, cannot copy its data\n";

print "# Preparing SQL commands...\n" if ($opt_verbose);
#print "SELECT $primary_key, " . join(', ', @columns) . " FROM $table\n";
#print "INSERT INTO $table  (" . join(', ', @columns) . ") VALUES (?" . ", ?" x $#columns . ")\n";
#print "DELETE FROM $table WHERE $primary_key = ? LIMIT 1\n";

# Statement handles prepared here and used by the main loop:
#   $sql_max_id      (-nd only)  highest primary key present in the destination
#   $sql_read_data               rows to copy from the source
#   $sql_delete_data (not -nd)   removes a copied row from the source
#   $sql_write_data              INSERT into the destination
#   $sql_update_data (--update-sum only) adds values to an existing row
if ($opt_nd) {
    # This statement belongs to the destination handle, so errors must be
    # read from $dbh2.
    $sql_max_id = $dbh2->prepare(
        "SELECT MAX($primary_key) FROM $table" . (defined($opt_where) ? " WHERE $opt_where" : ""))
      or die $dbh2->errstr;
    # The user condition is parenthesised: otherwise an OR inside it would
    # bind weaker than the AND and bypass the "newer than max id" filter.
    $sql_read_data = $dbh1->prepare(
        "SELECT $primary_key, " . join( ', ', @columns ) . " FROM $table WHERE $primary_key > ?"  . (defined($opt_where) ? " AND ($opt_where)" : ""))
      or die $dbh1->errstr;
}
else {
    $sql_delete_data = $dbh1->prepare(
        "DELETE FROM $table WHERE $primary_key = ? LIMIT 1" ) or die $dbh1->errstr;
    $sql_read_data = $dbh1->prepare(
        "SELECT $primary_key, " . join( ', ', @columns ) . " FROM $table" . (defined($opt_where) ? " WHERE $opt_where" : ""))
      or die $dbh1->errstr;
}
$sql_write_data = $dbh2->prepare(
    "INSERT INTO $table  (" . join( ', ', @columns ) . ") VALUES (?" . ", ?" x $#columns . ")" )
    or die $dbh2->errstr;

# The UPDATE is only meaningful (and only syntactically valid) when
# --update-sum was requested and both column lists are non-empty.
if ($opt_update_sum) {
    die "$0: --update-sum needs at least one ordinary column and one 'MUL' key column in table '$table'\n"
        unless (@update_columns and @unique_keys);

    # UPDATE t SET a = a + ?, b = b + ? WHERE k1 = ? AND k2 = ?
    $sql_update_data = $dbh2->prepare(
        "UPDATE $table SET " . join( ', ', map { "$_ = $_ + ?" } @update_columns )
        . " WHERE " . join( ' AND ', map { "$_ = ?" } @unique_keys ))
        or die $dbh2->errstr;
}

# Main loop: copy all pending rows, then either exit (-exit) or sleep for
# $timeout seconds and poll again.
while (1) {

    # MySQL auto reconnect {{{
    # Each handle is pinged twice per attempt and the reconnect flag is set
    # again after every ping; the loop only leaves a handle once at least
    # one of the two pings succeeded.
    my $num = 0;
    foreach my $dbh ($dbh1, $dbh2) {
        $num++;
        my $rv = 0;
        while (!$rv) {
            $rv = $dbh->ping();
            $dbh->{mysql_auto_reconnect} = 1;

            $rv += $dbh->ping();
            $dbh->{mysql_auto_reconnect} = 1;

            unless ($rv) {
                warn cur_date(), " ### Warning: DB #$num/2 ping error !!\n";
                sleep 1;
            }
        }
    } # }}}

    # Start reading source rows. $reading stays false when a query failed,
    # in which case this round is skipped instead of fetching from a
    # statement that was never executed.
    my $reading = 0;
    if ($opt_nd) {
        if ($sql_max_id->execute()) {
            # Empty destination table: MAX() is NULL, start from 0.
            my $max_id = ($sql_max_id->fetchrow_array())[0] || 0;
            $reading = $sql_read_data->execute($max_id) or warn $dbh1->errstr;
        }
        else {
            # Without a trustworthy max id we must not fall back to 0:
            # that would re-insert the whole source table.
            warn $dbh2->errstr;
        }
    }
    else {
        $reading = $sql_read_data->execute() or warn $dbh1->errstr;
    }

    while ($reading and (my $row = $sql_read_data->fetchrow_hashref())) {
        my $prim_key = $row->{$primary_key}; # PRIMARY KEY column, which is not inserted
        if ($sql_write_data->execute(map { $row->{$_} } @columns)) {
            print cur_date(), "\tData with key '$prim_key' succesfully inserted with key ",
            $dbh2->{"mysql_insertid"}, "\n";
            if (! $opt_nd) {
                $sql_delete_data->execute($prim_key)
                    or warn cur_date(), "\tError removing data with key '$prim_key': ",
                $dbh1->errstr, "\n";
            }
        }
        elsif ($opt_update_sum) {
            # INSERT failed (typically a duplicate key): add the values to
            # the existing destination row instead.
            my $iv = $sql_update_data->execute(map { $row->{$_} } @update_columns, @unique_keys);
            if ($iv and $iv != 0) {
                print cur_date(), "\tData succesfully updated, primary key '$prim_key', unique key ", join('-', map { $row->{$_} } @unique_keys), "\n";
                # $sql_delete_data only exists when source rows are removed.
                if (! $opt_nd) {
                    $sql_delete_data->execute($prim_key)
                        or warn cur_date(), "\tError removing data with key '$prim_key': ",
                    $dbh1->errstr, "\n";
                }
            }
            elsif ($iv) {
                # execute() returned "0E0": the UPDATE ran but touched no
                # row, so the data is stored nowhere. Keep the source row.
                warn cur_date(), "\tError updating data with key '$prim_key': ",
                "no destination row matched the unique key\n";
            }
            else {
                warn cur_date(), "\tError updating data with key '$prim_key': ",
                $dbh2->errstr, "\n";
            }
        }
        else {
            warn cur_date(), "\tError writing data with key '$prim_key': ",
            $dbh2->errstr, "\n";
        }
    }

    last if ($opt_exit);
    sleep $timeout;
}


sub cur_date()
{ # {{{
    # Return the current local time as "YYYY-MM-DD <time>.<microseconds>",
    # where <time> is strftime's %X representation.
    #
    # The microsecond part is zero-padded to six digits so that it reads as
    # a proper decimal fraction (5 microseconds is ".000005", not ".5"), and
    # the date/time part is derived from the same clock reading as the
    # fraction so the two can never belong to different seconds.
    my ($sec, $usec) = Time::HiRes::gettimeofday;

    return strftime("%Y-%m-%d %X", localtime($sec)) . sprintf(".%06d", $usec);
} # }}}

sub read_input()
{ # {{{
    # Fetch a single line from standard input and hand it back without its
    # trailing newline. At end of input the result is undef.
    chomp(my $line = <STDIN>);

    return $line;
} # }}}

sub usage()
{ # {{{
    # Write the command-line synopsis and option list to STDERR, then
    # terminate the program with exit status 1. Never returns.
    my @help_lines = (
        "Usage:\n",
        "  $0 [options] database=DB1,host=localhost database=DB2,host=localhost Table\n",
        "    [user1] [pass1] [user2] [pass2]\n",
        "Options:\n",
        "  -pk         copy/replicate primary key as well\n",
        "  -nd         skip removal of source items, insert new only\n",
        "  -where      copy/replicate only records where condition is true\n",
        "  -timeout    how often pool database (default 10 sec)\n",
        "  -exit       exit immediately after execution\n",
        "  -verbose    add some notes on the output\n",
        "  -update-sum data in destination table can be updated according UNIQUE KEY\n",
    );

    foreach my $help_line (@help_lines) {
        print STDERR $help_line;
    }
    exit 1;
} # }}}

# vim: ts=4
# vim600: fdm=marker fdl=0 fdc=3

Platon Group <platon@platon.sk> http://platon.sk/
Copyright © 2002-2006 Platon Group
Stránka používa redakčný systém Metafox
Na začiatok