Re: Deadlock detection - Mailing list pgsql-jdbc
From | Oliver Jowett |
---|---|
Subject | Re: Deadlock detection |
Date | |
Msg-id | 4977C8FD.7040204@opencloud.com Whole thread Raw |
In response to | Re: Deadlock detection (Oliver Jowett <oliver@opencloud.com>) |
Responses |
Re: Deadlock detection
|
List | pgsql-jdbc |
Oliver Jowett wrote: > I have a bit of time spare today, I might look at putting together that > OutputStream wrapper. Try this. I have not tested at all - it compiles but that's as far as I got - but it should give you an idea of what I had in mind. -O ? org/postgresql/core/AntiDeadlockStream.java Index: org/postgresql/core/PGStream.java =================================================================== RCS file: /cvsroot/jdbc/pgjdbc/org/postgresql/core/PGStream.java,v retrieving revision 1.22 diff -u -r1.22 PGStream.java --- org/postgresql/core/PGStream.java 8 Jan 2008 06:56:27 -0000 1.22 +++ org/postgresql/core/PGStream.java 22 Jan 2009 01:10:13 -0000 @@ -34,6 +34,7 @@ { private final String host; private final int port; + private final boolean antiDeadlock; private final byte[] _int4buf; private final byte[] _int2buf; @@ -52,12 +53,14 @@ * * @param host the hostname to connect to * @param port the port number that the postmaster is sitting on + * @param antiDeadlock true to insert an anti-deadlock outputstream * @exception IOException if an IOException occurs below it. */ - public PGStream(String host, int port) throws IOException + public PGStream(String host, int port, boolean antiDeadlock) throws IOException { this.host = host; this.port = port; + this.antiDeadlock = antiDeadlock; changeSocket(new Socket(host, port)); setEncoding(Encoding.getJVMEncoding("US-ASCII")); @@ -74,6 +77,10 @@ return port; } + public boolean getAntiDeadlock() { + return antiDeadlock; + } + public Socket getSocket() { return connection; } @@ -110,6 +117,8 @@ // Buffer sizes submitted by Sverre H Huseby <sverrehu@online.no> pg_input = new VisibleBufferedInputStream(connection.getInputStream(), 8192); pg_output = new BufferedOutputStream(connection.getOutputStream(), 8192); + if (antiDeadlock) + pg_output = new AntiDeadlockStream(pg_output, 8192, 30000); if (encoding != null) setEncoding(encoding); Index: org/postgresql/core/v2/ConnectionFactoryImpl.java =================================================================== RCS file: /cvsroot/jdbc/pgjdbc/org/postgresql/core/v2/ConnectionFactoryImpl.java,v retrieving revision 1.17 diff -u -r1.17 ConnectionFactoryImpl.java --- org/postgresql/core/v2/ConnectionFactoryImpl.java 30 Sep 2008 03:42:48 -0000 1.17 +++ org/postgresql/core/v2/ConnectionFactoryImpl.java 22 Jan 2009 01:10:14 -0000 @@ -59,7 +59,7 @@ PGStream newStream = null; try { - newStream = new PGStream(host, port); + newStream = new PGStream(host, port, Boolean.valueOf(info.getProperty("antiDeadlock")).booleanValue()); // Construct and send an ssl startup packet if requested. if (trySSL) @@ -147,7 +147,7 @@ // We have to reconnect to continue. pgStream.close(); - return new PGStream(pgStream.getHost(), pgStream.getPort()); + return new PGStream(pgStream.getHost(), pgStream.getPort(), pgStream.getAntiDeadlock()); case 'N': if (logger.logDebug()) Index: org/postgresql/core/v2/ProtocolConnectionImpl.java =================================================================== RCS file: /cvsroot/jdbc/pgjdbc/org/postgresql/core/v2/ProtocolConnectionImpl.java,v retrieving revision 1.12 diff -u -r1.12 ProtocolConnectionImpl.java --- org/postgresql/core/v2/ProtocolConnectionImpl.java 1 Apr 2008 07:19:20 -0000 1.12 +++ org/postgresql/core/v2/ProtocolConnectionImpl.java 22 Jan 2009 01:10:14 -0000 @@ -90,7 +90,7 @@ if (logger.logDebug()) logger.debug(" FE=> CancelRequest(pid=" + cancelPid + ",ckey=" + cancelKey + ")"); - cancelStream = new PGStream(pgStream.getHost(), pgStream.getPort()); + cancelStream = new PGStream(pgStream.getHost(), pgStream.getPort(), false); cancelStream.SendInteger4(16); cancelStream.SendInteger2(1234); cancelStream.SendInteger2(5678); Index: org/postgresql/core/v3/ConnectionFactoryImpl.java =================================================================== RCS file: /cvsroot/jdbc/pgjdbc/org/postgresql/core/v3/ConnectionFactoryImpl.java,v retrieving revision 1.19 diff -u -r1.19 ConnectionFactoryImpl.java --- org/postgresql/core/v3/ConnectionFactoryImpl.java 29 Nov 2008 07:40:30 -0000 1.19 +++ org/postgresql/core/v3/ConnectionFactoryImpl.java 22 Jan 2009 01:10:14 -0000 @@ -73,7 +73,7 @@ PGStream newStream = null; try { - newStream = new PGStream(host, port); + newStream = new PGStream(host, port, Boolean.valueOf(info.getProperty("antiDeadlock")).booleanValue()); // Construct and send an ssl startup packet if requested. if (trySSL) @@ -178,7 +178,7 @@ // We have to reconnect to continue. pgStream.close(); - return new PGStream(pgStream.getHost(), pgStream.getPort()); + return new PGStream(pgStream.getHost(), pgStream.getPort(), pgStream.getAntiDeadlock()); case 'N': if (logger.logDebug()) Index: org/postgresql/core/v3/ProtocolConnectionImpl.java =================================================================== RCS file: /cvsroot/jdbc/pgjdbc/org/postgresql/core/v3/ProtocolConnectionImpl.java,v retrieving revision 1.13 diff -u -r1.13 ProtocolConnectionImpl.java --- org/postgresql/core/v3/ProtocolConnectionImpl.java 1 Apr 2008 07:19:20 -0000 1.13 +++ org/postgresql/core/v3/ProtocolConnectionImpl.java 22 Jan 2009 01:10:14 -0000 @@ -90,7 +90,7 @@ if (logger.logDebug()) logger.debug(" FE=> CancelRequest(pid=" + cancelPid + ",ckey=" + cancelKey + ")"); - cancelStream = new PGStream(pgStream.getHost(), pgStream.getPort()); + cancelStream = new PGStream(pgStream.getHost(), pgStream.getPort(), false); cancelStream.SendInteger4(16); cancelStream.SendInteger2(1234); cancelStream.SendInteger2(5678); package org.postgresql.core; import java.io.*; /** * Temporary hack to try to detect/avoid socket deadlocks caused * by blocking on write while we have lots of pending data to read * from the server (i.e. the server is also blocked on write). * * see the thread at http://archives.postgresql.org/pgsql-jdbc/2009-01/msg00045.php * * @author Oliver Jowett <oliver@opencloud.com> */ class AntiDeadlockStream extends OutputStream implements Runnable { private static final class BufferLock {} private final BufferLock bufferLock = new BufferLock(); private final OutputStream wrapped; private final long flushTimeout; private byte[] buffer; private int bufferSize; private byte[] swapBuffer; private boolean closeRequest; private boolean flushRequest; private boolean closeComplete; private IOException failedException; AntiDeadlockStream(OutputStream wrapped, int initialSize, long flushTimeout) { this.wrapped = wrapped; this.flushTimeout = flushTimeout; this.buffer = new byte[initialSize]; this.swapBuffer = new byte[initialSize]; new Thread(this, "AntiDeadlock thread").start(); } public void close() throws IOException { synchronized (bufferLock) { closeRequest = true; bufferLock.notifyAll(); while (!closeComplete) { if (failedException != null) throw (IOException) (new IOException("Write thread reported an error").initCause(failedException)); try { bufferLock.wait(); } catch (InterruptedException ie) { throw new InterruptedIOException(); } } } } public void flush() throws IOException { synchronized (bufferLock) { long expiry = -1; flushRequest = true; bufferLock.notifyAll(); while (true) { if (failedException != null) throw (IOException) (new IOException("Write thread reported an error").initCause(failedException)); if (closeRequest) throw new IOException("Stream is closed"); if (bufferSize == 0) return; long delay; if (expiry == -1) { delay = flushTimeout; expiry = System.currentTimeMillis() + delay; } else { delay = expiry - System.currentTimeMillis(); } if (delay <= 0) { System.err.println("Warning: possible socket deadlock detected (timeout=" + flushTimeout + ", remainingbuffer=" + bufferSize); new Throwable("Deadlock call stack").fillInStackTrace().printStackTrace(System.err); return; } try { bufferLock.wait(delay); } catch (InterruptedException ie) { throw new InterruptedIOException(); } } } } public void write(int b) throws IOException { write(new byte[] { (byte)b }, 0, 1); } public void write(byte[] b) throws IOException { write(b, 0, b.length); } public void write(byte[] b, int off, int len) throws IOException { if (b == null) throw new NullPointerException(); if (off < 0 || len < 0 || off+len > b.length) throw new IndexOutOfBoundsException(); synchronized (bufferLock) { if (closeRequest) throw new IOException("Stream is closed"); if (failedException != null) throw (IOException) (new IOException("Write thread reported an error").initCause(failedException)); int needs = bufferSize + len; int newSize = buffer.length; while (newSize < needs) newSize *= 2; if (newSize != buffer.length) { byte[] newBuffer = new byte[newSize]; System.arraycopy(buffer, 0, newBuffer, 0, bufferSize); buffer = newBuffer; } if (bufferSize == 0) bufferLock.notifyAll(); System.arraycopy(b, off, buffer, bufferSize, len); bufferSize += len; } } // // Runnable // public void run() { while (true) { boolean doFlush; boolean doClose; int writeLength; synchronized (bufferLock) { if (bufferSize == 0 && !closeRequest && !flushRequest) { try { bufferLock.wait(); } catch (InterruptedException ie) { failedException = new InterruptedIOException("write thread interrupted"); bufferLock.notifyAll(); return; } continue; } byte[] oldBuffer = buffer; buffer = swapBuffer; swapBuffer = buffer; writeLength = bufferSize; doFlush = flushRequest; doClose = closeRequest; flushRequest = false; bufferLock.notifyAll(); } try { if (writeLength > 0) wrapped.write(swapBuffer, 0, writeLength); if (flushRequest) wrapped.flush(); if (closeRequest) { wrapped.close(); synchronized (bufferLock) { closeComplete = true; bufferLock.notifyAll(); } return; } } catch (IOException ioe) { synchronized (bufferLock) { failedException = ioe; bufferLock.notifyAll(); try { wrapped.close(); } catch (IOException ioe2) { // Ignore it. } return; } } } } }
pgsql-jdbc by date: