Re: Implementing queue semantics (novice) - Mailing list pgsql-sql
From | Andrew Hammond |
---|---|
Subject | Re: Implementing queue semantics (novice) |
Date | |
Msg-id | 41E54DE5.9000505@ca.afilias.info |
In response to | Implementing queue semantics (novice) (KÖPFERL Robert <robert.koepferl@sonorys.at>) |
List | pgsql-sql |
The name for what you're looking to build is a concurrent batch processing system. Here's a basic one.

-- adding entries (producers)
BEGIN;
INSERT INTO queue (queue_id, processing_pid, processing_start,
    processing_status, foreign_id)
VALUES (DEFAULT, NULL, NULL,
    (SELECT queue_status_id FROM queue_status WHERE name = 'pending'),
    ?);
COMMIT;

-- taking an entry for processing (consumers)
BEGIN;
SELECT queue_id, foreign_id FROM queue
WHERE processing_status = (SELECT queue_status_id FROM queue_status
    WHERE name = 'pending')
ORDER BY queue_id LIMIT 1
FOR UPDATE;

UPDATE queue
SET processing_pid = ?,
    processing_start = now(),
    processing_status = (SELECT queue_status_id FROM queue_status
        WHERE name = 'active')
WHERE queue_id = ?;
COMMIT;

-- client code does whatever it's going to do here

BEGIN;
-- confirm that the entry still exists and is still ours
SELECT 1 FROM queue
WHERE queue_id = ? AND processing_pid = ?
FOR UPDATE;

-- copy the entry to the history table before deleting it
INSERT INTO queue_history (queue_id, processing_pid, processing_start,
    processing_complete, processing_status, foreign_id)
SELECT queue_id, processing_pid, processing_start, now(),
    (SELECT queue_status_id FROM queue_status WHERE name = 'done'),
    foreign_id
FROM queue WHERE queue_id = ?;

DELETE FROM queue WHERE queue_id = ?;
COMMIT;

-- a separate process reaps orphaned entries should processing fail.
BEGIN;
SELECT queue_id, processing_pid FROM queue
WHERE now() - processing_start > 'some reasonable interval'::interval
  AND processing_status = (SELECT queue_status_id FROM queue_status
      WHERE name = 'active')
FOR UPDATE;

-- for each entry, check to see if the PID is still running;
-- if it is not, put the entry back in the pending state
UPDATE queue
SET processing_pid = NULL,
    processing_start = NULL,
    processing_status = (SELECT queue_status_id FROM queue_status
        WHERE name = 'pending')
WHERE queue_id = ?;
COMMIT;

There are more complicated approaches available. If you plan to have multiple machines processing, you probably want to add a processing_node column too, since a PID only identifies a process on the machine that assigned it.
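The statements above never show the tables themselves. A minimal sketch of a schema they could run against follows. The table and column names are taken from the queries; the types, constraints, seed rows, and the index (including its name, queue_pending_idx) are assumptions, not something stated in the message.

```sql
-- Hypothetical schema: names come from the queries above,
-- types and constraints are assumed for illustration.
CREATE TABLE queue_status (
    queue_status_id serial PRIMARY KEY,
    name            text NOT NULL UNIQUE
);

-- The three states the queries look up by name.
INSERT INTO queue_status (name) VALUES ('pending');
INSERT INTO queue_status (name) VALUES ('active');
INSERT INTO queue_status (name) VALUES ('done');

CREATE TABLE queue (
    queue_id          serial PRIMARY KEY,        -- ORDER BY queue_id gives approximate FIFO order
    processing_pid    integer,                   -- NULL until a consumer claims the row
    processing_start  timestamp with time zone,  -- NULL until a consumer claims the row
    processing_status integer NOT NULL
        REFERENCES queue_status (queue_status_id),
    foreign_id        integer NOT NULL           -- key of the record to process (type assumed)
);

CREATE TABLE queue_history (
    queue_id            integer PRIMARY KEY,     -- copied from queue, not generated here
    processing_pid      integer,
    processing_start    timestamp with time zone,
    processing_complete timestamp with time zone NOT NULL,
    processing_status   integer NOT NULL
        REFERENCES queue_status (queue_status_id),
    foreign_id          integer NOT NULL
);

-- Helps the consumer query find the oldest pending row without a full scan.
CREATE INDEX queue_pending_idx ON queue (processing_status, queue_id);
```

With multiple processing machines, a processing_node column (for example a text host name, again an assumption) would sit next to processing_pid in both queue and queue_history.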
KÖPFERL Robert wrote:
| Hi,
|
| since I am new to writing stored procedures I'd like to ask first, before I
| make a mistake.
|
| I want to implement some kind of queue (FIFO). There are n users/processes
| that add new records to a table and there are m consumers that take out
| these records and process them.
| It's however possible for a consumer to die or lose its connection, and
| records must not be left unprocessed. They may rather be processed twice.
|
| This seems to me a rather common problem, but also one with atomicity holes
| to fall into.
| How is this commonly implemented?
|
| I can imagine an 'add' and a 'get' function together with one additional
| 'processed' timestamp column?
|
| Thanks for helping me do it right.

--
Andrew Hammond    416-673-4138    ahammond@ca.afilias.info
Database Administrator, Afilias Canada Corp.
CB83 2838 4B67 D40F D086 3568 81FC E7E5 27AF 4A9A