Thread: Using Skytools PGQ for targeted copying of data in cluster
In a clustered database where some of the data on one node is shared with certain other nodes, replication has to be targeted at the row level. This cannot be done with Slony replication, which copies all data. Using PGQ in a star schema where every node is connected to every other node would require on the order of <number of nodes>^2 processes (65,536 already for a 256-node cluster).
Solution: a push-pull queue schema using pgq.RemoteConsumer to send "copy" messages carrying the object data.
There are two ticker processes (one for push_queue, one for pull_queue) and two consumer processes per node, for a total of <number of nodes> × 2 + 2 processes (514 for a 256-node cluster instead of 65,536).
On the push-queue side, the sending node inserts one event per target node:
pgq.insert_event('push_queue', 'copy', '', <node number>, 'action=ins&id=1&name=test', '', '');
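The thread does not show how the sender produces these inserts; below is a minimal Python sketch of that fan-out, assuming the application knows the list of recipient nodes. The push_copy() helper, the connection string and the target_nodes list are invented for illustration; pgq.insert_event() and skytools.connect_database() are the only Skytools pieces used.

import skytools

SRC_CONNSTR = "dbname=src_db"      # placeholder connection string
target_nodes = ['2', '5', '7']     # placeholder list of recipient nodes

def push_copy(payload, nodes, connstr=SRC_CONNSTR):
    db = skytools.connect_database(connstr)
    cur = db.cursor()
    for node in nodes:
        # ev_data stays empty; extra1 = target node, extra2 = urlencoded row data,
        # matching how pull.py later reads ev.ev_extra1 / ev.ev_extra2
        cur.execute("select pgq.insert_event('push_queue', 'copy', '', %s, %s, '', '')",
                    [node, payload])
    db.commit()

push_copy('action=ins&id=1&name=test', target_nodes)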
The push-queue consumer calls a function that relays the same event into the pull queue:
pgq.insert_event('pull_queue', 'copy', '', <node number>, 'action=ins&id=1&name=test', '', '');
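The relay consumer itself is not shown in the thread either; a sketch of what it might look like (call it push.py) follows, assuming push_queue is reachable through the src_db connection and pull_queue through dst_db. The class name Relay and the service name push_ticker are placeholders, not from the original post.

import sys, pgq

class Relay(pgq.RemoteConsumer):
    """Reads 'copy' events from push_queue and re-inserts them into pull_queue."""
    def __init__(self, args):
        # "push_ticker" / "src_db" / "dst_db" are assumed config section and
        # connection-string key names
        pgq.RemoteConsumer.__init__(self, "push_ticker", "src_db", "dst_db", args)

    def process_remote_batch(self, db, batch_id, event_list, dst_db):
        cur = dst_db.cursor()
        for ev in event_list:
            if ev.ev_type != 'copy':
                ev.tag_done()
                continue
            # forward the event unchanged: extra1 = target node, extra2 = row data
            cur.execute("select pgq.insert_event('pull_queue', 'copy', %s, %s, %s, '', '')",
                        [ev.ev_data, ev.ev_extra1, ev.ev_extra2])
            ev.tag_done()
        dst_db.commit()

if __name__ == '__main__':
    script = Relay(sys.argv[1:])
    script.start()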
Before marking an event done (ev.tag_done()), the pull-queue consumer checks that the node it is running on is the event's intended recipient. This guards against data loss should a node become unresponsive: an event that could not be processed stays in the queue until the node reconnects.
## pull.py
import sys, pgq

class Copier(pgq.RemoteConsumer):
    def __init__(self, args):
        pgq.RemoteConsumer.__init__(self, "pull_ticker", "src_db", "dst_db", args)
        ### GET "node" SETTING FROM .ini
        ### MUST ADD "node = <node number>" TO THE .ini
        self.node = self.cf.get("node")

    def process_remote_batch(self, db, batch_id, event_list, dst_db):
        for ev in event_list:
            ### SKIP EVENTS OF ANY OTHER TYPE
            if ev.type != 'copy':
                ev.tag_done()
                continue
            ### CHECK TARGET IS CURRENT NODE
            if self.node == ev.ev_extra1:
                ### COMMIT EVENT ONLY IF IT IS PROCESSED ON THE TARGET SERVER
                ev.tag_done()
                cur = dst_db.cursor()
                cur.execute("select __data_copier(%s, %s)",
                            [ev.ev_extra1, ev.ev_extra2])
                records = cur.fetchall()
                dst_db.commit()

if __name__ == '__main__':
    script = Copier(sys.argv[1:])
    script.start()
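The .ini file referenced in the comments above is not shown in the thread; here is a guess at a minimal Skytools 2.x-style config for pull.py. Connection strings, paths and the job name are placeholders; the [pull_ticker] section name, the src_db/dst_db keys, pgq_queue_name and the custom node setting correspond to what the code reads.

[pull_ticker]
job_name       = pull_copier_node2

# database holding pull_queue
src_db         = dbname=queuedb host=queue-host
# this node's own database
dst_db         = dbname=nodedb host=node2

pgq_queue_name = pull_queue

logfile        = ~/log/%(job_name)s.log
pidfile        = ~/pid/%(job_name)s.pid

# custom setting read by Copier.__init__()
node           = 2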