# Perl DB synchronization toolkit

#created for postgres 7.0.2 +
use strict;

# Declare the $VERSION package global and give it a value at compile time.
# NOTE(review): this block comes before the first `package` statement, so
# $VERSION is declared in whichever package is current here (presumably
# main) rather than in the packages defined below -- confirm that is intended.
BEGIN {
        use vars       qw($VERSION);
        # set the version for version checking
        $VERSION     = 1.00;
}


package Synchronize;

use DBI;

use Date::Parse;

# Constructor.
#	Arguments: DBI connection string, then the username and password used
#	to connect to that database.
#	Two handles are opened with the same credentials: DBH carries the
#	synchronization work and DBLOG is the handle dblog() writes through.
#	Dies when either connection cannot be made.
sub new {
	my ($proto, $dbi, $user, $pass) = @_;
	my $class = ref($proto) || $proto;

	my $dbh = DBI->connect($dbi,$user,$pass) || die "Failed to connect to database: ".DBI->errstr();

	my $self = {
		DBH           => $dbh,
		user          => undef,
		node          => undef,
		status        => undef,    # status of the table update portion of the session
		pubs          => {},       # pubs available to the session; value 1 means ok to request sync
		orderpubs     => undef,    # array ref of subscribed pubs ordered by sync_order
		this_post_ver => undef,    # version number under which this session posts changes
		max_ver       => undef,    # maximum safe version for getting updates
		current       => {},       # info on the publication changes are being applied to
		queue         => 'server', # tells collide() what to do with collisions (default: hold on server)
	};

	$self->{DBLOG} = DBI->connect($dbi,$user,$pass) || die "cannot log to DB: ".DBI->errstr();

	return bless($self, $class);
}

# Inserts one row into ____sync_log____ through the DBLOG handle, carrying
# the session's user and node, the database's now() and the given message.
#	Argument: the message text.
#	Returns whatever the DBLOG handle's do() returns.
sub dblog {
	my ($self, $text) = @_;
	my $qmsg = $self->{DBLOG}->quote($text);
	my ($quser, $qnode) = map { $self->{DBH}->quote($self->{$_}) } qw(user node);
	my $insert = 'insert into ____sync_log____ (username, nodename,stamp, message) '
		. "values($quser, $qnode, now(), $qmsg)";
	return $self->{DBLOG}->do($insert);
}


#start_session establishes session wide information and other housekeeping chores
#	Accepts username, nodename and queue ('server' or 'client') as arguments.
#	Dies if username or nodename is missing or the queue value is not one
#	of the two accepted strings.
#	Fills in {pubs} (1 = ok to sync, 0 = refresh only), {orderpubs},
#	{this_post_ver} and {max_ver}.
#	Returns undef when the session was started, else an error string.
sub start_session {
	my $self = shift;
	$self->{user} = shift || die 'Username is required';
	$self->{node} = shift || die 'Nodename is required';
	$self->{queue} = shift;

	if (!defined($self->{queue}) || ($self->{queue} ne 'server' && $self->{queue} ne 'client')) {
		die "You must provide a queue argument of either 'server' or 'client'";
	}

	my $dbh = $self->{DBH};
	my $quser = $dbh->quote($self->{user});
	my $qnode = $dbh->quote($self->{node});

	my @pubs = $self->GetColList("select pubname from ____subscribed____ where username = $quser and nodename = $qnode");

	# defined(@array) is rejected by current perls; an empty list is what
	# "no subscriptions" looks like here.
	return 'User/Node has no subscriptions!' if !@pubs;

	# go though the list and check permissions and rules for each
	foreach my $pub (@pubs) {
		my $qpub = $dbh->quote($pub);
		my $sql = "select disabled, pubname, fullrefreshonly, refreshonce,post_ver from ____subscribed____ where username = $quser and pubname = $qpub and nodename = $qnode";
		my $sth = $dbh->prepare($sql) || die $dbh->errstr;
		$sth->execute || die $dbh->errstr;
		while (my @row = $sth->fetchrow_array) {
			next if $row[0]; #publication is disabled
			next if !defined($row[1]); #publication does not exist (should never occur)
			if ($row[2] || $row[3]) { #refresh or refresh once flag is set
				$self->{pubs}->{$pub} = 0; #refresh only
				next;
			}
			if (!defined($row[4])) { #no previous session exists, must refresh
				$self->{pubs}->{$pub} = 0; #refresh only
				next;
			}
			$self->{pubs}->{$pub} = 1; #OK for sync
		}
		$sth->finish;
	}

	#keep only the subscribed publications, in sync_order
	my @orderpubs = grep { defined($self->{pubs}->{$_}) }
		$self->GetColList("select pubname from ____publications____ order by sync_order");
	$self->{orderpubs} = \@orderpubs;

	# Now we obtain a session version number, etc.

	$dbh->{AutoCommit} = 0; #allows "transactions"
	$dbh->{RaiseError} = 1; #script [or eval] will automatically die on errors

	eval { #start DB transaction

		#lock the version sequence until we determine that we have gotten
		#a good value.  Lock will be released on commit.
		$dbh->do('lock ____version_seq____ in access exclusive mode');

		# remove stale locks if they exist
		$dbh->do("delete from ____last_stable____ where username = $quser and nodename = $qnode");

		# increment version sequence & grab the next val as post_ver
		my $sth = $dbh->prepare("select nextval('____version_seq____')");
		$sth->execute;
		($self->{this_post_ver}) = $sth->fetchrow_array();
		$sth->finish;

		# grab max_ver from last_stable
		$sth = $dbh->prepare("select min(version) from ____last_stable____");
		$sth->execute;
		($self->{max_ver}) = $sth->fetchrow_array();
		$sth->finish;

		# if there was no version in lock table, then take the ID that was in use
		# when we started the session (this_post_ver - 1)
		$self->{max_ver} = $self->{this_post_ver} - 1 if (!defined($self->{max_ver}));

		# lock post_ver by placing it in last_stable
		$dbh->do("insert into ____last_stable____ (version, username, nodename) values ($self->{this_post_ver}, $quser,$qnode)");

		# increment version sequence again (discard result)
		$sth = $dbh->prepare("select nextval('____version_seq____')");
		$sth->execute;
		$sth->fetchrow_array();
		$sth->finish;

	}; #end eval/transaction
	my $failed = $@; #copy at once, $@ is clobbered by the next eval

	if ($failed) { # part of transaction failed
		# RaiseError is still on, so keep a failing rollback from escaping
		eval { $dbh->rollback };
	} else { # all's well commit block
		$dbh->commit;
	}

	# always put the handle back the way we found it, on both paths
	$dbh->{AutoCommit} = 1;
	$dbh->{RaiseError} = 0;

	return 'Start session failed' if $failed;
	return undef;
}

#start_changes should be called once before applying individual change requests
#	Requires the publication name and a reference to the array of columns
#	that will be updated as arguments; dies if either is missing.
#	Stores the publication details in {current}, resets the change
#	counters, switches AutoCommit off and sets status to 'ready'.
#	Returns undef when ready to accept changes, else an error string.
sub start_changes {
	my $self = shift;
	my $pub = shift || die 'Publication is required';
	my $colref = shift || die 'Reference to column array is required';

	$self->{status} = 'starting';

	my $dbh = $self->{DBH};
	my $qpub = $dbh->quote($pub);
	my $quser = $dbh->quote($self->{user});
	my $qnode = $dbh->quote($self->{node});

	my @cols = @{$colref};
	my %subcols = map { $_ => 1 }
		$self->GetColList("select col_name from ____subscribed_cols____ where username = $quser and nodename = $qnode and pubname = $qpub");
	foreach my $col (@cols) {
		return "User/node is not subscribed to column '$col'" if !$subcols{$col};
	}

	my ($junk, $readonly, $last_session, $post_ver, $last_ver, $whereclause, $sanity_limit,
		$sanity_delete, $sanity_update, $sanity_insert) = $self->GetOneRow(
		"select pubname, readonly, last_session, post_ver, last_ver, whereclause, sanity_limit, "
		. "sanity_delete, sanity_update, sanity_insert from ____subscribed____ where username = $quser and pubname = $qpub and nodename = $qnode");

	return 'Publication is read only' if $readonly;

	my ($wc) = $self->GetOneRow("select whereclause from ____publications____ where pubname = $qpub");

	# Combine the subscription and publication restrictions.  Joining a
	# list keeps the result free of a dangling leading " and " when only
	# the publication carries a where clause.
	my @conditions;
	push @conditions, '('.$whereclause.')' if $whereclause;
	push @conditions, '('.$wc.')' if $wc;
	$whereclause = join(' and ', @conditions);

	my ($table) = $self->GetOneRow("select tablename from ____publications____ where pubname = $qpub");

	return 'Publication is not registered correctly' if !defined($table);

	if (defined($last_session)) {
		$last_session =~ s/([+|-]\d\d?)$/ $1/;	#put a space before timezone
		$last_session = str2time($last_session); #convert to perltime (seconds since 1970)
	}

	my %info = (
		pub          => $pub,
		whereclause  => $whereclause,
		post_ver     => $post_ver,
		last_session => $last_session,
		last_ver     => $last_ver,
		table        => $table,
		cols         => \@cols,
	);

	# the stored clause is a bare condition, so it needs its own "where"
	my $countsql = "select count(oid) from $table";
	$countsql = $countsql.' where '.$whereclause if $whereclause;
	my ($rowcount) = $self->GetOneRow($countsql);

	#calculate sanity levels (convert from % to number of rows)
	# limits defined as less than 1 mean no limit
	$info{sanitylimit} = $rowcount * ($sanity_limit / 100) if defined($sanity_limit) && $sanity_limit > 0;
	$info{insertlimit} = $rowcount * ($sanity_insert / 100) if defined($sanity_insert) && $sanity_insert > 0;
	$info{updatelimit} = $rowcount * ($sanity_update / 100) if defined($sanity_update) && $sanity_update > 0;
	$info{deletelimit} = $rowcount * ($sanity_delete / 100) if defined($sanity_delete) && $sanity_delete > 0;

	$self->{sanitycount} = 0;
	$self->{updatecount} = 0;
	$self->{insertcount} = 0;
	$self->{deletecount} = 0;

	$self->{current} = \%info;

	$dbh->{AutoCommit} = 0; #turn on transaction behavior so we can roll back on sanity limits, etc.

	$self->{status} = 'ready';

	return undef;
}

#call this once all changes are submitted to commit them
#	Does nothing unless the session status is 'ready'; otherwise commits
#	on DBH, switches AutoCommit back on and sets the status to 'success'.
#	Always returns undef.
sub end_changes {
	my ($self) = @_;
	if ($self->{status} eq 'ready') {
		my $dbh = $self->{DBH};
		$dbh->commit;
		$dbh->{AutoCommit} = 1;
		$self->{status} = 'success';
	}
	return undef;
}

#call apply_change once for each row level client update
#	Accepts 4 params: rowid, action, timestamp and reference to data array
#	Note: timestamp can be undef, data can be undef
#	Action is one of insert, update or delete (case is ignored).
#	Returns undef when the change was applied, else a string naming the
#	reason it was not (missing argument, status failure, a sanity() or
#	collide() result, or a database failure).
#
#this routine checks basic timestamp info and sanity limits, then performs
#the insert, update or delete on the table of the current publication
#
# NOTE(review): the original header says the timestamp must be perl time
# (secs since 1970), yet the value is passed through str2time() below --
# confirm which format callers really send.
# NOTE(review): the 'time' collision fires when the stored stamp is OLDER than
# the client's, and the 'version' one compares against this_post_ver; both
# tests are kept exactly as they were -- confirm they express the intent.
sub apply_change {
	my $self = shift;
	my $rowid = shift || return 'Row ID is required'; #don't die just for one bad row
	my $action = shift || return 'Action is required'; #don't die just for one bad row
	my $timestamp = shift;
	my $dataref = shift;
	$action = lc($action);

	$timestamp = str2time($timestamp) if $timestamp;

	return 'Status failure, cannot accept changes: '.$self->{status} if $self->{status} ne 'ready';

	my $dbh = $self->{DBH};
	my %info = %{$self->{current}};

	# the rowid comes from the client, so never interpolate it raw
	my $qrowid = $dbh->quote($rowid);

	$self->{sanitycount}++;
	if ($info{sanitylimit} && $self->{sanitycount} > $info{sanitylimit}) {
		# too many changes from client
		my $ret = $self->sanity('limit');
		return $ret if $ret;
	}

	if ($timestamp && $timestamp > time() + 3600) { # current time + one hour
		#client's clock is way off, cannot submit changes in future
		my $ret = $self->collide('future', $info{table}, $rowid, $action, undef, $timestamp, $dataref, $self->{queue});
		return $ret if $ret;
	}

	if ($timestamp && $timestamp < $info{last_session} - 3600) { # last session time less one hour
		#client's clock is way off, cannot submit changes that occured before last sync date
		my $ret = $self->collide('past', $info{table}, $rowid, $action, undef, $timestamp, $dataref , $self->{queue});
		return $ret if $ret;
	}

	my ($crow, $cver, $ctime); #current row,ver,time
	if ($action ne 'insert') {
		my $sql = "select ____rowid____, ____rowver____, ____stamp____ from $info{table} where ____rowid____ = $qrowid";
		($crow, $cver, $ctime) = $self->GetOneRow($sql);
		if (!defined($crow)) {
			my $ret = $self->collide('norow', $info{table}, $rowid, $action, undef, $timestamp, $dataref , $self->{queue});
			return $ret if $ret;
		}

		if ($ctime) {
			$ctime =~ s/([+|-]\d\d?)$/ $1/; #put space between timezone
			$ctime = str2time($ctime); #convert to perl time
		}

		if ($timestamp) {
			if ($ctime < $timestamp) {
				my $ret = $self->collide('time', $info{table}, $rowid, $action, undef, $timestamp, $dataref, $self->{queue} );
				return $ret if $ret;
			}
		} else {
			if ($cver > $self->{this_post_ver}) {
				my $ret = $self->collide('version', $info{table}, $rowid, $action, undef, $timestamp, $dataref, $self->{queue} );
				return $ret if $ret;
			}
		}
	}

	if ($action eq 'insert') {
		$self->{insertcount}++;
		if ($info{insertlimit} && $self->{insertcount} > $info{insertlimit}) {
			# too many changes from client
			my $ret = $self->sanity('insert');
			return $ret if $ret;
		}

		# GetOneRow returns a list; it must be called in list context or
		# the element count comes back instead of the table_id.
		my $qtable = $dbh->quote($info{table});
		my ($table_id) = $self->GetOneRow("select table_id from ____tables____ where tablename = $qtable");
		return 'Table incorrectly registered, cannot get rowid sequence name: '.$dbh->errstr() if not defined $table_id;
		my $rowidsequence = '_'.$table_id.'__rowid_seq';

		my @data = map { $dbh->quote($_) } @{ $dataref || [] };

		my $sql = "insert into $info{table} (";
		if ($timestamp) {
			$sql = $sql . join(',',@{$info{cols}}) . ',____rowver____, ____stamp____) values (';
			$sql = $sql . join (',',@data) .','.$self->{this_post_ver}.',\''.localtime($timestamp).'\')';
		} else {
			$sql = $sql . join(',',@{$info{cols}}) . ',____rowver____) values (';
			$sql = $sql . join (',',@data) .','.$self->{this_post_ver}.')';
		}
		my $ret = $dbh->do($sql);
		if (!$ret) {
			my $ret = $self->collide($dbh->errstr(), $info{table}, $rowid, $action, undef, $timestamp, $dataref , $self->{queue});
			return $ret if $ret;
		}
		my ($newrowid) = $self->GetOneRow("select currval('$rowidsequence')");
		return 'Failed to get current rowid on inserted row'.$dbh->errstr if not defined $newrowid;
		$self->changerowid($rowid, $newrowid);
	}

	if ($action eq 'update') {
		$self->{updatecount}++;
		if ($info{updatelimit} && $self->{updatecount} > $info{updatelimit}) {
			# too many changes from client
			my $ret = $self->sanity('update');
			return $ret if $ret;
		}
		my @data = map { $dbh->quote($_) } @{ $dataref || [] };

		my $sql = "update $info{table} set ";
		foreach my $col (@{$info{cols}}) {
			my $val = shift @data;
			$sql = $sql . "$col = $val,";
		}
		$sql = $sql." ____rowver____ = $self->{this_post_ver}";
		$sql = $sql.", ____stamp____ = '".localtime($timestamp)."'" if $timestamp;
		$sql = $sql." where ____rowid____ = $qrowid";
		$sql = $sql." and $info{whereclause}" if $info{whereclause};
		my $ret = $dbh->do($sql);
		if (!$ret) {
			my $ret = $self->collide($dbh->errstr(), $info{table}, $rowid, $action, undef, $timestamp, $dataref , $self->{queue});
			return $ret if $ret;
		}
	}

	if ($action eq 'delete') {
		$self->{deletecount}++;
		if ($info{deletelimit} && $self->{deletecount} > $info{deletelimit}) {
			# too many changes from client
			my $ret = $self->sanity('delete');
			return $ret if $ret;
		}

		# every statement below already has "where ____rowid____ = ...",
		# so the publication restriction is appended with "and"
		my $restrict = " where ____rowid____ = $qrowid";
		$restrict = $restrict . " and $info{whereclause}" if $info{whereclause};

		# stamp the row with this session's version before removing it
		my $presql = "update $info{table} set ____rowver____ = $self->{this_post_ver}";
		$presql = $presql . ", ____stamp____ = '".localtime($timestamp)."' " if $timestamp;
		$dbh->do($presql . $restrict) || return 'Predelete update failed: '.$dbh->errstr;

		my $ret = $dbh->do("delete from $info{table}" . $restrict);
		if (!$ret) {
			my $ret = $self->collide($dbh->errstr(), $info{table}, $rowid, $action, undef, $timestamp, $dataref , $self->{queue});
			return $ret if $ret;
		}
	}

	return undef;
}

# Reports a row id change to the client as a 'changeid' message whose
# payload is the old id and the new id separated by a tab.
#	Arguments: old row id, new row id.
#	Returns whatever writeclient() returns.
sub changerowid {
	my ($self, $oldid, $newid) = @_;
	my $payload = join("\t", $oldid, $newid);
	return $self->writeclient('changeid', $payload);
}

#writes info to client
#	Prints one line to the currently selected output handle: the message
#	type, a colon and a space, then the remaining arguments joined by tabs.
#	Arguments: message type, followed by any number of fields.
#	Always returns undef.
sub writeclient {
	my ($self, $type, @info) = @_;
	my $fields = join("\t", @info);
	print "$type: ", $fields, "\n";
	return undef;
}

# Called by apply_change() when a sanity limit is exceeded.
# Override this for custom behavior.  The default marks the session status
# as 'sanity exceeded', rolls back on DBH and echoes back the reason.
# Returning undef instead makes apply_change() carry on with the change.
#	Argument: the reason string (which limit was hit).
sub sanity {
	my ($self, $reason) = @_;
	my $dbh = $self->{DBH};
	$self->{status} = 'sanity exceeded';
	$dbh->rollback;
	return $reason;
}

# Called by apply_change() when a change cannot be applied cleanly.
# Override this for custom behavior.  Default is to echo back the failure reason.
# If you want to override a collision, you can do so by returning undef.
#	Arguments: reason, table, rowid, action, rowver, timestamp, reference to
#	the data array and the queue ('server' stores the collision in the
#	____collision____ table, anything else reports it to the client).
#	A reason mentioning integrity or constraint also marks the session
#	status and rolls back on DBH.
#	Dies if the collision row cannot be written.
sub collide {
	my $self = shift;
	my ($reason,$table,$rowid,$action,$rowver,$timestamp,$data, $queue) = @_;
	my $dbh = $self->{DBH};

	if ($reason =~ /integrity/i || $reason =~ /constraint/i) {
		$self->{status} = 'intergrity violation';
		$dbh->rollback;
	}

	# describe the rejected data as "col = 'value'" pairs, one per column
	# of the current publication
	my @values = map { $dbh->quote($_) } @{ $data || [] };
	my @pairs;
	foreach my $col (@{ $self->{current}->{cols} || [] }) {
		my $val = shift @values;
		push @pairs, "$col = $val";
	}
	my $datastring = @pairs ? join(',', @pairs) : undef;

	if ($queue eq 'server') {
		my $stamp = defined($timestamp) ? scalar(localtime($timestamp)) : undef;
		my $qrowid = $dbh->quote($rowid);
		my $qrowver = defined($rowver) ? $rowver : 'null';
		my $qstamp = $dbh->quote($stamp);
		my $qtable = $dbh->quote($table);
		my $qreason = $dbh->quote($reason);
		my $qaction = $dbh->quote($action);
		my $quser = $dbh->quote($self->{user});
		my $qnode = $dbh->quote($self->{node});
		my $qdata = $dbh->quote($datastring);
		# the column list names queue, so a value has to be supplied for it
		my $qqueue = $dbh->quote($queue);

		my $sql = "insert into ____collision____ (rowid,
tablename, rowver, stamp, data, reason, action, username,
nodename, queue) values($qrowid,$qtable, $qrowver, $qstamp,$qdata,
$qreason, $qaction,$quser, $qnode, $qqueue)";
		$dbh->do($sql) || die 'Failed to write to collision table: '.$dbh->errstr;

	} else {

		# send the readable data description, not the array reference
		$self->writeclient('collision',$rowid,$table, $rowver, $timestamp,$reason, $action,$self->{user}, $self->{node}, $datastring);

	}
	return $reason;
}

#calls get_updates once for each publication the user/node is subscribed to in correct sync_order
#	Every publication in {orderpubs} is attempted, even after a failure.
#	Returns undef when all of them succeeded, else one string listing each
#	failed publication with the error get_updates() reported for it.
sub get_all_updates {
	my $self = shift;

	my @errors;
	foreach my $pub (@{ $self->{orderpubs} || [] }) {
		#request update as sync unless overridden by flags
		my $err = $self->get_updates($pub, 1);
		push @errors, "$pub: $err" if defined($err);
	}

	return @errors ? join('; ', @errors) : undef;
}

# Call this once for each table the client needs refreshed or sync'ed AFTER all inbound client changes have been posted
#	Accepts publication and sync flag as arguments; dies without a publication.
#	A true sync flag asks for a synchronize (only rows changed since the
#	last session); it is ignored, and a full refresh sent, when {pubs}
#	does not mark the publication as ok to sync.
#	Output goes to the client through writeclient(): a start marker, the
#	column list, one 'update/insert' message per row, one 'delete'
#	message per deleted row (sync only) and an end marker.
#	Finally records last_ver, last_session and post_ver on the subscription.
#	Returns undef if ok, else an error string.
sub get_updates {
	my $self = shift;
	my $pub = shift || die 'Publication is required';
	my $sync = shift;

	my $dbh = $self->{DBH};
	my $qpub = $dbh->quote($pub);
	my $quser = $dbh->quote($self->{user});
	my $qnode = $dbh->quote($self->{node});

	#enforce refresh and refreshonce flags
	undef $sync if !$self->{pubs}->{$pub};

	my @cols = $self->GetColList("select col_name from ____subscribed_cols____ where username = $quser and nodename = $qnode and pubname = $qpub");

	my ($table) = $self->GetOneRow("select tablename from ____publications____ where pubname = $qpub");
	return 'Table incorrectly registered for read' if !defined($table);
	my $qtable = $dbh->quote($table);

	my ($junk, $last_session, $post_ver, $last_ver, $whereclause) = $self->GetOneRow(
		"select pubname, last_session, post_ver, last_ver, whereclause from ____subscribed____ where username = $quser and pubname = $qpub and nodename = $qnode");

	my ($wc) = $self->GetOneRow("select whereclause from ____publications____ where pubname = $qpub");

	# Combine the subscription and publication restrictions.  Joining a
	# list keeps the result free of a dangling leading " and " when only
	# the publication carries a where clause.
	my @conditions;
	push @conditions, '('.$whereclause.')' if $whereclause;
	push @conditions, '('.$wc.')' if $wc;
	$whereclause = join(' and ', @conditions);

	if ($sync) {
		$self->writeclient('start synchronize', $pub);
	} else {
		$self->writeclient('start refresh', $pub);
		$dbh->do("update ____subscribed____ set refreshonce = false where pubname = $qpub and username = $quser and nodename = $qnode") || return 'Failed to clear RefreshOnce flag: '.$dbh->errstr;
	}

	$self->writeclient('columns',@cols);

	my $sql = "select ____rowid____, ".join(',', @cols)." from $table";
	if ($sync) {
		$sql = $sql." where (____rowver____ <= $self->{max_ver} and ____rowver____ > $last_ver)";
		# leave out the rows this node posted itself in its previous
		# session; they carry the post_ver stored on the subscription
		if (defined($post_ver)) {
			$sql = $sql . " and (____rowver____ <> $post_ver)";
		}
	} else {
		$sql = $sql." where (____rowver____ <= $self->{max_ver})";
	}
	$sql = $sql." and $whereclause" if $whereclause;

	my $sth = $dbh->prepare($sql) || return 'Failed to get prepare SQL for updates: '.$dbh->errstr;
	$sth->execute || return 'Failed to execute SQL for updates: '.$dbh->errstr;
	while (my @row = $sth->fetchrow_array) {
		$self->writeclient('update/insert',@row);
	}
	$sth->finish;

	# now get deleted rows
	if ($sync) {
		$sql = "select rowid from ____deleted____ where (tablename = $qtable)";
		$sql = $sql." and (rowver <= $self->{max_ver} and rowver > $last_ver)";
		# same exclusion as above: this session's own version is always
		# above max_ver, so the stored post_ver is the one to skip
		if (defined($post_ver)) {
			$sql = $sql . " and (rowver <> $post_ver)";
		}
		# NOTE(review): the where clause is written against the published
		# table; confirm it is also valid against ____deleted____
		$sql = $sql." and $whereclause" if $whereclause;

		$sth = $dbh->prepare($sql) || return 'Failed to get prepare SQL for deletes: '.$dbh->errstr;
		$sth->execute || return 'Failed to execute SQL for deletes: '.$dbh->errstr;
		while (my @row = $sth->fetchrow_array) {
			$self->writeclient('delete',@row);
		}
		$sth->finish;
	}

	if ($sync) {
		$self->writeclient('end synchronize', $pub);
	} else {
		$self->writeclient('end refresh', $pub);
	}

	$dbh->do("update ____subscribed____ set last_ver = $self->{max_ver}, last_session = now(), post_ver = $self->{this_post_ver} where username = $quser and nodename = $qnode and pubname = $qpub");
	return undef;
}


# Object destructor; does the end-of-session housekeeping:
#	- releases this user/node's entries in the ____last_stable____ lock table
#	- prunes ____deleted____ rows older than the lowest last_ver recorded
#	  in ____subscribed____ (skipped when there is no such version)
#	- disconnects both database handles, whatever happened before
#	Returns undef, or an error string when pruning failed.
sub DESTROY {
	my $self = shift;
	my $error;

	my $dbh = $self->{DBH};
	if ($dbh) { # absent when the constructor died before connecting
		#release version from lock table (including old ones)
		my $quser = $dbh->quote($self->{user});
		my $qnode = $dbh->quote($self->{node});
		$dbh->do("delete from ____last_stable____ where username = $quser and nodename = $qnode");

		#clean up deleted table
		my ($version) = $self->GetOneRow("select min(last_ver) from ____subscribed____");
		if (defined $version) {
			$dbh->do("delete from ____deleted____ where rowver < $version")
				|| ($error = 'Failed to prune deleted table'.$dbh->errstr);
		}

		$dbh->disconnect;
	}

	#disconnect the logging session as well
	$self->{DBLOG}->disconnect if $self->{DBLOG};

	return $error;
}

############# Helper Subs ############
# Runs a select on DBH and returns the first column of every row as a list.
#	Argument: the SQL select statement (dies when it is missing).
#	Returns the values in row order; returns an empty list when the
#	statement cannot be prepared or executed.
sub GetColList {
	my $self = shift;
	my $sql = shift || die 'Must provide sql select statement';
	# bare return: in list context "return undef" would hand back a
	# one-element list that callers mistake for a row
	my $sth = $self->{DBH}->prepare($sql) || return;
	$sth->execute || return;
	my @col;
	while (my ($val) = $sth->fetchrow_array) {
		push @col, $val;
	}
	$sth->finish;
	return @col;
}

# Runs a select on DBH and returns the columns of its first row as a list.
#	Argument: the SQL select statement (dies when it is missing).
#	Returns an empty list when there is no row or when the statement
#	cannot be prepared or executed.
#	Call it in list context -- my ($a, $b) = ... -- because in scalar
#	context the number of columns comes back, not the first column.
sub GetOneRow {
	my $self = shift;
	my $sql = shift || die 'Must provide sql select statement';
	# bare return: in list context "return undef" would hand back a
	# one-element list
	my $sth = $self->{DBH}->prepare($sql) || return;
	$sth->execute || return;
	my @row = $sth->fetchrow_array;
	$sth->finish;
	return @row;
}

 



package SyncManager;

use DBI;
# Constructor.
#	Arguments: DBI connection string, then the username and password used
#	to connect to that database.
#	Opens two handles with the same credentials, stored as DBH and DBLOG.
#	Dies when either connection cannot be made.
sub new {
	my ($proto, $dbi, $user, $pass) = @_;
	my $class = ref($proto) || $proto;

	my %handles;
	$handles{DBH} = DBI->connect($dbi,$user,$pass) || die "Failed to connect to database: ".DBI->errstr();
	$handles{DBLOG} = DBI->connect($dbi,$user,$pass) || die "cannot log to DB: ".DBI->errstr();

	return bless({ %handles }, $class);
}

# Inserts one row into ____sync_log____ through the DBLOG handle, carrying
# the object's user and node entries, the database's now() and the message.
#	Argument: the message text.
#	Returns whatever the DBLOG handle's do() returns.
sub dblog {
	my ($self, $text) = @_;
	my ($main, $logger) = @{$self}{qw(DBH DBLOG)};
	my $qmsg = $logger->quote($text);
	my $quser = $main->quote($self->{user});
	my $qnode = $main->quote($self->{node});
	my $columns = '(username, nodename,stamp, message)';
	return $logger->do("insert into ____sync_log____ $columns values($quser, $qnode, now(), $qmsg)");
}

#this should never need to be called, but it might if a node bails without releasing their locks
#	Deletes every row of the ____last_stable____ lock table.
#	Returns whatever the DBH handle's do() returns.
sub ReleaseAllLocks {
	my $self = shift;
	# the statement used to end in a stray ")" and could never run
	return $self->{DBH}->do("delete from ____last_stable____");
}
# Adds a publication to the system.  Also adds triggers, sequences, etc associated with the table if appropriate.
#	accepts two arguments: the name of a physical table and the name under which to publish it
#		NOTE: the publication name is optional and will default to the table name if not supplied
#	Dies when no table name is given.
#	A table that is already registered in ____tables____ only gets its
#	reference count raised; a new table gets the ____rowver____,
#	____rowid____ and ____stamp____ columns, an index, a rowid sequence
#	and the three sync triggers.
#	returns undef if ok, else error string;
sub publish {
	my $self = shift;
	my $table = shift || die 'You must provide a table name (and optionally a unique publication name)';
	my $pub = shift;
	$pub = $table if not defined($pub);

	my $dbh = $self->{DBH};
	my $qpub = $dbh->quote($pub);
	my ($junk) = $self->GetOneRow("select tablename from ____publications____ where pubname = $qpub");
	return 'Publication already exists' if defined($junk);

	my $qtable = $dbh->quote($table);

	my ($id, $refcount) = $self->GetOneRow("select table_id, refcount from ____tables____ where tablename = $qtable");

	if(!defined($id)) {
		$dbh->do("insert into ____tables____ (tablename, refcount) values ($qtable,1)") || return 'Failed to register table: ' . $dbh->errstr;
		($id) = $self->GetOneRow("select table_id from ____tables____ where tablename = $qtable");
	}

	if (defined($refcount)) {
		# table was registered before: one more publication refers to it
		$dbh->do("update ____tables____ set refcount = refcount+1 where table_id = $id") || return 'Failed to update refrence count: ' . $dbh->errstr;
	} else {
		# first publication of this table: set up the sync machinery,
		# with object names built from the table id
		$id = '_'.$id.'_';

		my %skip = map { $_ => 1 } $self->GetTableCols($table, 1); # 1 = get hidden cols too

		if (!$skip{____rowver____}) {
			$dbh->do("alter table $table add column ____rowver____ int4"); #don't fail here in case table is being republished, just accept the error silently
		}
		$dbh->do("update $table set ____rowver____ = ____version_seq____.last_value - 1") || return 'Failed to initialize rowver: ' . $dbh->errstr;

		if (!$skip{____rowid____}) {
			$dbh->do("alter table $table add column ____rowid____ int4"); #don't fail here in case table is being republished, just accept the error silently
		}

		my $index = $id.'____rowid____idx';
		$dbh->do("create index $index on $table(____rowid____)") || return 'Failed to create rowid index: ' . $dbh->errstr;

		my $sequence = $id.'_rowid_seq';
		$dbh->do("create sequence $sequence") || return 'Failed to create rowid sequence: ' . $dbh->errstr;

		$dbh->do("alter table $table alter column ____rowid____ set default nextval('$sequence')"); #don't fail here in case table is being republished, just accept the error silently

		$dbh->do("update $table set ____rowid____ =  nextval('$sequence')") || return 'Failed to initialize rowid: ' . $dbh->errstr;

		if (!$skip{____stamp____}) {
			$dbh->do("alter table $table add column ____stamp____ timestamp"); #don't fail here in case table is being republished, just accept the error silently
		}

		$dbh->do("update $table set ____stamp____ =  now()") || return 'Failed to initialize stamp: ' . $dbh->errstr;

		# trigger suffix => the event and procedure it is bound to
		my @triggers = (
			[ '_ver_ins', 'before insert', 'sync_insert_ver()' ],
			[ '_ver_upd', 'before update', 'sync_update_ver()' ],
			[ '_del_row', 'after delete',  'sync_delete_row()' ],
		);
		foreach my $spec (@triggers) {
			my ($suffix, $event, $procedure) = @{$spec};
			my $trigger = $id.$suffix;
			$dbh->do("create trigger $trigger $event on $table for each row execute procedure $procedure") || return 'Failed to create trigger: ' . $dbh->errstr;
		}
	}

	# use the quoted forms so names containing quotes cannot break the statement
	$dbh->do("insert into ____publications____ (pubname, tablename) values ($qpub,$qtable)") || return 'Failed to create publication entry: '.$dbh->errstr;

	return undef;
}


# Removes a publication from the system.  Also drops triggers, sequences, etc associated with the table if appropriate.
#	accepts one argument: the name of a publication
#	Lowers the table's reference count and deletes the publication with
#	its subscriptions and subscribed columns.  When this was the last
#	reference, the column defaults, triggers, rowid sequence, rowid index
#	and the ____tables____ entry are removed too.
#	returns undef if ok, else error string;
sub unpublish {
	my $self = shift;
	my $pub = shift || return 'You must provide a publication name';

	my $dbh = $self->{DBH};
	my $qpub = $dbh->quote($pub);
	my ($table) = $self->GetOneRow("select tablename from ____publications____ where pubname = $qpub");
	return 'Publication does not exist' if !defined($table);

	my $qtable = $dbh->quote($table);

	my ($id, $refcount) = $self->GetOneRow("select table_id, refcount from ____tables____ where tablename = $qtable");
	# double quotes, so that the table name really appears in the message
	return "Table: $table is not correctly registered!" if not defined($id);

	$dbh->do("update ____tables____ set refcount = refcount -1 where tablename = $qtable") || return 'Failed to decrement reference count: ' . $dbh->errstr;

	$dbh->do("delete from ____subscribed____ where pubname = $qpub") || return 'Failed to delete user subscriptions: ' . $dbh->errstr;
	$dbh->do("delete from ____subscribed_cols____ where pubname = $qpub") || return 'Failed to delete subscribed columns: ' . $dbh->errstr;
	$dbh->do("delete from ____publications____ where tablename = $qtable and pubname = $qpub") || return 'Failed to delete from publications: ' . $dbh->errstr;

	#if this is the last reference, we want to drop triggers, etc;
	if ($refcount <= 1) {
		$id = "_".$id."_";

		foreach my $column (qw(____rowver____ ____rowid____ ____stamp____)) {
			$dbh->do("alter table $table alter column $column drop default") || return 'Failed to alter column default: ' . $dbh->errstr;
		}

		foreach my $suffix (qw(_ver_upd _ver_ins _del_row)) {
			my $trigger = $id.$suffix;
			$dbh->do("drop trigger $trigger on $table") || return 'Failed to drop trigger: ' . $dbh->errstr;
		}

		my $sequence = $id.'_rowid_seq';
		$dbh->do("drop sequence $sequence") || return 'Failed to drop sequence: ' . $dbh->errstr;

		my $index = $id.'____rowid____idx';
		$dbh->do("drop index $index") || return 'Failed to drop index: ' . $dbh->errstr;
		$dbh->do("delete from ____tables____ where tablename = $qtable") || return 'remove entry from tables: ' . $dbh->errstr;
	}
	return undef;
}





#Subscribe user/node to a publication
#	Accepts 3 arguments: Username, Nodename, Publication (dies if one is missing)
#		NOTE: the remaining arguments can be supplied as column names to which the user/node should be subscribed;
#		without them the list returned by GetTableCols() for the table is used
#	The subscription is created with last_ver 0 and the refreshonce flag set.
#	Return undef if ok, else returns an error string
sub subscribe {
	my $self = shift;
	my $user = shift || die 'You must provide user, node and publication as arguments';
	my $node = shift || die 'You must provide user, node and publication as arguments';
	my $pub = shift || die 'You must provide user, node and publication as arguments';
	my @cols = @_;

	my $dbh = $self->{DBH};
	my $quser = $dbh->quote($user);
	my $qnode = $dbh->quote($node);
	my $qpub = $dbh->quote($pub);

	my ($table) = $self->GetOneRow("select tablename from ____publications____ where pubname = $qpub");
	return "Publication $pub does not exist." if not defined $table;

	@cols = $self->GetTableCols($table) if !@cols; # get defaults if cols were not specified by caller

	# the quoted forms are used throughout so that names containing
	# quotes cannot break or alter the statements
	$dbh->do("insert into ____subscribed____ (username, nodename,pubname,last_ver,refreshonce) values($quser, $qnode,$qpub,0, true)") || return 'Failes to create subscription: ' . $dbh->errstr;

	foreach my $col (@cols) {
		my $qcol = $dbh->quote($col);
		$dbh->do("insert into ____subscribed_cols____ (username, nodename, pubname, col_name) values ($quser,$qnode,$qpub,$qcol)") || return 'Failes to subscribe column: ' . $dbh->errstr;
	}

	return undef;
}


#Unsubscribe user/node from a publication
	# Accepts 3 arguments: Username, Nodename, Publication
	# Dies if any of the three is missing
	# Returns undef if ok, else returns an error string

sub unsubscribe {
	my $self = shift;
	my $user = shift || die 'You must provide user, node and publication as arguments';
	my $node = shift || die 'You must provide user, node and publication as arguments';
	my $pub = shift || die 'You must provide user, node and publication as arguments';

	my $dbh = $self->{DBH};
	my $quser = $dbh->quote($user);
	my $qnode = $dbh->quote($node);
	my $qpub = $dbh->quote($pub);

	my $sql = "select tablename from ____publications____ where pubname = $qpub";
	# List assignment is required: GetOneRow returns an array, which in scalar
	# context would yield the element count (0, a defined value) when no row matches.
	my ($table) = $self->GetOneRow($sql);
	return "Publication $pub does not exist." if not defined $table;

	# Column subscriptions are removed before the subscription row itself.
	$dbh->do("delete from ____subscribed_cols____ where pubname = $qpub and username = $quser and nodename = $qnode") || return 'Failed to remove column subscription: '. $dbh->errstr;
	$dbh->do("delete from ____subscribed____ where pubname = $qpub and username = $quser and nodename = $qnode") || return 'Failed to remove subscription: '. $dbh->errstr;

	return undef;
}



#INSTALL creates the management tables, sequences and trigger functions.
	# Returns undef if everything is ok, else returns a string describing the error.
	# Statements run one at a time in the order listed; the first failure stops
	# the run and its database error string is returned.
sub INSTALL {
	my ($self) = @_;
	my $dbh = $self->{DBH};

	# Refuse to run when a relation named ____publications____ is already catalogued.
	my ($existing) = $self->GetOneRow("select * from pg_class where relname = '____publications____'");
	return 'It appears that synchronization manangement tables are already installed here.  Please uninstall before reinstalling.'
		if defined($existing);

	my @statements = (
		# management tables and the sequences they depend on
		"create table ____publications____ (pubname text primary key,description text, tablename text, sync_order int4, whereclause text)",
		"create table ____subscribed_cols____ (nodename text, username text, pubname text, col_name text, description text, primary key(nodename, username, pubname,col_name))",
		"create table ____subscribed____ (nodename text, username text, pubname text, last_session timestamp, post_ver int4, last_ver int4, whereclause text, sanity_limit int4 default 0, sanity_delete int4 default 0, sanity_update int4 default 0, sanity_insert int4 default 50, readonly boolean, disabled boolean, fullrefreshonly boolean, refreshonce boolean, primary key(nodename, username, pubname))",
		"create table ____last_stable____ (version int4, username text, nodename text, primary key(version, username, nodename))",
		"create table ____tables____ (tablename text, table_id int4, refcount int4, primary key(tablename, table_id))",
		"create sequence ____table_id_seq____",
		"alter table ____tables____ alter column table_id set default nextval('____table_id_seq____')",
		"create table ____deleted____ (rowid int4, tablename text, rowver int4, stamp timestamp, primary key (rowid, tablename))",
		"create table ____collision____ (rowid text, tablename text, rowver int4, stamp timestamp, faildate timestamp default now(),data text,reason text, action text, username text, nodename text,queue text)",
		"create sequence ____version_seq____",
		"create table ____sync_log____ (username text, nodename text, stamp timestamp, message text)",

		# fills in ____rowver____ / ____stamp____ on a new row when they are null
		"create function sync_insert_ver() returns opaque as
'begin
if new.____rowver____ isnull then
new.____rowver____ := ____version_seq____.last_value;
end if;
if new.____stamp____ isnull then
new.____stamp____ := now();
end if;
return NEW;
end;' language 'plpgsql'",

		# refreshes ____rowver____ / ____stamp____ when an update left them unchanged
		"create function sync_update_ver() returns opaque as
'begin
if new.____rowver____ = old.____rowver____ then
new.____rowver____ := ____version_seq____.last_value;
end if;
if new.____stamp____ = old.____stamp____ then
new.____stamp____ := now();
end if;
return NEW;
end;' language 'plpgsql'",

		# records the removed row's id, table, version and stamp in ____deleted____
		"create function sync_delete_row() returns opaque as 
'begin 
insert into ____deleted____ (rowid,tablename,rowver,stamp) values
(old.____rowid____, TG_RELNAME, old.____rowver____,old.____stamp____); 
return old; 
end;' language 'plpgsql'",
	);

	foreach my $statement (@statements) {
		$dbh->do($statement) || return $dbh->errstr();
	}

	return undef;
}

#UNINSTALL removes all management tables, sequences and trigger functions
	# Every publication listed in ____publications____ is unpublished first.
	# An unpublish failure is reported with warn and the uninstall carries on,
	# so one broken publication cannot block removal of the rest.
	# Returns undef if ok, else returns an error message as a string
sub UNINSTALL {
	my $self = shift;
	my $dbh = $self->{DBH};

	# Read the complete list of publications before unpublishing anything.
	# A failed query is returned as an error string instead of dying on an
	# undefined statement handle.
	my $pubs = $dbh->selectcol_arrayref("select pubname from ____publications____");
	return 'Failed to list publications: ' . $dbh->errstr() if !defined $pubs;

	foreach my $pub (@$pubs) {
		my $err = $self->unpublish($pub);
		warn "UNINSTALL: failed to unpublish $pub: $err\n" if defined $err;
	}

	# Drop order is unchanged: tables, then sequences, then functions.
	my @drops = (
		"drop table ____publications____",
		"drop table ____subscribed_cols____",
		"drop table ____subscribed____",
		"drop table ____last_stable____",
		"drop table ____deleted____",
		"drop table ____collision____",
		"drop table ____tables____",
		"drop table ____sync_log____",
		"drop sequence ____table_id_seq____",
		"drop sequence ____version_seq____",
		"drop function sync_insert_ver()",
		"drop function sync_update_ver()",
		"drop function sync_delete_row()",
	);

	foreach my $drop (@drops) {
		$dbh->do($drop) || return $dbh->errstr();
	}

	return undef;
}

# Destructor: disconnects the main (DBH) and logging (DBLOG) database handles.
	# Each handle is checked first, because a slot can already be undefined when
	# the destructor runs (for example during global destruction); calling
	# disconnect on it would die inside DESTROY.
sub DESTROY {
	my $self = shift;

	foreach my $slot (qw(DBH DBLOG)) {
		my $handle = $self->{$slot};
		$handle->disconnect if defined $handle;
	}
	return;
}

############# Helper Subs ############

# Runs a select statement on the main handle and returns its first row.
	# Accepts 1 argument: the sql text (dies if it is missing)
	# Returns the columns of the first fetched row as a list (empty when no
	# row comes back); returns undef when prepare or execute fails.
	# NOTE: the row is returned as an array, so callers wanting the first
	# column must use list assignment: my ($value) = $self->GetOneRow($sql);
sub GetOneRow {
	my ($self, $sql) = @_;
	die 'Must provide sql select statement' unless $sql;

	my $sth = $self->{DBH}->prepare($sql);
	return undef unless $sth;
	return undef unless $sth->execute;

	my @fields = $sth->fetchrow_array;
	$sth->finish;
	return @fields;
}

# Returns the column names of a table, in the order the database reports them.
	# Accepts the table name (dies if it is missing) and an optional flag.
	# Without the flag the bookkeeping columns ____rowver____, ____stamp____ and
	# ____rowid____ are left out; pass a true second argument to get them too.
	# Returns undef when the statement cannot be prepared or executed.
sub GetTableCols {
	my ($self, $table, $wanthidden) = @_;
	die 'Must provide table name' unless $table;

	# A select that can match no rows still exposes the column names via NAME.
	my $sth = $self->{DBH}->prepare("select * from $table where 0 = 1");
	return undef unless $sth;
	return undef unless $sth->execute;

	my @names = @{$sth->{NAME}};
	$sth->finish;
	return @names if $wanthidden;

	my %hidden = map { $_ => 1 } qw(____rowver____ ____stamp____ ____rowid____);
	my @visible = grep { !$hidden{$_} } @names;
	return @visible;
}


1; #happy require
