summaryrefslogtreecommitdiff
path: root/lib/server
diff options
context:
space:
mode:
authorChris Wilson <chris+github@qwirx.com>2006-10-16 23:17:02 +0000
committerChris Wilson <chris+github@qwirx.com>2006-10-16 23:17:02 +0000
commit9513d59b275068b8376554561e67b7664cf96b6e (patch)
tree82b144ec4ae897030b9fc595c1b431fa48701add /lib/server
parent29da720afeebbb7c618355a588a1e36a7620c9a6 (diff)
Use overlapped I/O to avoid blocking.
Use INVALID_HANDLE_VALUE instead of NULL everywhere to avoid bugs, and for type safety. (refs #3)
Diffstat (limited to 'lib/server')
-rw-r--r--lib/server/WinNamedPipeStream.cpp288
-rw-r--r--lib/server/WinNamedPipeStream.h8
2 files changed, 264 insertions, 32 deletions
diff --git a/lib/server/WinNamedPipeStream.cpp b/lib/server/WinNamedPipeStream.cpp
index c5b7eaa5..362f3958 100644
--- a/lib/server/WinNamedPipeStream.cpp
+++ b/lib/server/WinNamedPipeStream.cpp
@@ -35,7 +35,9 @@
//
// --------------------------------------------------------------------------
WinNamedPipeStream::WinNamedPipeStream()
- : mSocketHandle(NULL),
+ : mSocketHandle(INVALID_HANDLE_VALUE),
+ mReadableEvent(INVALID_HANDLE_VALUE),
+ mBytesInBuffer(0),
mReadClosed(false),
mWriteClosed(false),
mIsServer(false),
@@ -53,7 +55,7 @@ WinNamedPipeStream::WinNamedPipeStream()
// --------------------------------------------------------------------------
WinNamedPipeStream::~WinNamedPipeStream()
{
- if (mSocketHandle != NULL)
+ if (mSocketHandle != INVALID_HANDLE_VALUE)
{
Close();
}
@@ -70,14 +72,15 @@ WinNamedPipeStream::~WinNamedPipeStream()
// --------------------------------------------------------------------------
void WinNamedPipeStream::Accept(const wchar_t* pName)
{
- if (mSocketHandle != NULL || mIsConnected)
+ if (mSocketHandle != INVALID_HANDLE_VALUE || mIsConnected)
{
THROW_EXCEPTION(ServerException, SocketAlreadyOpen)
}
mSocketHandle = CreateNamedPipeW(
pName, // pipe name
- PIPE_ACCESS_DUPLEX, // read/write access
+ PIPE_ACCESS_DUPLEX | // read/write access
+ FILE_FLAG_OVERLAPPED, // enabled overlapped I/O
PIPE_TYPE_MESSAGE | // message type pipe
PIPE_READMODE_MESSAGE | // message-read mode
PIPE_WAIT, // blocking mode
@@ -87,7 +90,7 @@ void WinNamedPipeStream::Accept(const wchar_t* pName)
NMPWAIT_USE_DEFAULT_WAIT, // client time-out
NULL); // default security attribute
- if (mSocketHandle == NULL)
+ if (mSocketHandle == INVALID_HANDLE_VALUE)
{
::syslog(LOG_ERR, "CreateNamedPipeW failed: %d",
GetLastError());
@@ -104,10 +107,42 @@ void WinNamedPipeStream::Accept(const wchar_t* pName)
THROW_EXCEPTION(ServerException, SocketOpenError)
}
+ mBytesInBuffer = 0;
mReadClosed = false;
mWriteClosed = false;
mIsServer = true; // must flush and disconnect before closing
mIsConnected = true;
+
+ // create the Readable event
+ mReadableEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+
+ if (mReadableEvent == INVALID_HANDLE_VALUE)
+ {
+ ::syslog(LOG_ERR, "Failed to create the Readable event: "
+ "error %d", GetLastError());
+ Close();
+ THROW_EXCEPTION(CommonException, Internal)
+ }
+
+ // initialise the OVERLAPPED structure
+ memset(&mReadOverlap, 0, sizeof(mReadOverlap));
+ mReadOverlap.hEvent = mReadableEvent;
+
+ // start the first overlapped read
+ if (!ReadFile(mSocketHandle, mReadBuffer, sizeof(mReadBuffer),
+ NULL, &mReadOverlap))
+ {
+ DWORD err = GetLastError();
+
+ if (err != ERROR_IO_PENDING)
+ {
+ ::syslog(LOG_ERR, "Failed to start overlapped read: "
+ "error %d", err);
+ Close();
+ THROW_EXCEPTION(ConnectionException,
+ Conn_SocketReadError)
+ }
+ }
}
// --------------------------------------------------------------------------
@@ -120,7 +155,7 @@ void WinNamedPipeStream::Accept(const wchar_t* pName)
// --------------------------------------------------------------------------
void WinNamedPipeStream::Connect(const wchar_t* pName)
{
- if (mSocketHandle != NULL || mIsConnected)
+ if (mSocketHandle != INVALID_HANDLE_VALUE || mIsConnected)
{
THROW_EXCEPTION(ServerException, SocketAlreadyOpen)
}
@@ -137,8 +172,17 @@ void WinNamedPipeStream::Connect(const wchar_t* pName)
if (mSocketHandle == INVALID_HANDLE_VALUE)
{
- ::syslog(LOG_ERR, "Failed to connect to server's named pipe: "
- "error %d", GetLastError());
+ DWORD err = GetLastError();
+ if (err == ERROR_PIPE_BUSY)
+ {
+ ::syslog(LOG_ERR, "Failed to connect to backup "
+ "daemon: it is busy with another connection");
+ }
+ else
+ {
+ ::syslog(LOG_ERR, "Failed to connect to backup "
+ "daemon: error %d", err);
+ }
THROW_EXCEPTION(ServerException, SocketOpenError)
}
@@ -159,33 +203,171 @@ void WinNamedPipeStream::Connect(const wchar_t* pName)
int WinNamedPipeStream::Read(void *pBuffer, int NBytes, int Timeout)
{
// TODO no support for timeouts yet
- ASSERT(Timeout == IOStream::TimeOutInfinite)
+ if (Timeout != IOStream::TimeOutInfinite)
+ {
+ THROW_EXCEPTION(CommonException, AssertFailed)
+ }
- if (mSocketHandle == NULL || !mIsConnected)
+ if (mSocketHandle == INVALID_HANDLE_VALUE || !mIsConnected)
{
THROW_EXCEPTION(ServerException, BadSocketHandle)
}
+ if (mReadClosed)
+ {
+ THROW_EXCEPTION(ConnectionException, SocketShutdownError)
+ }
+
+ // ensure safe to cast NBytes to unsigned
+ if (NBytes < 0)
+ {
+ THROW_EXCEPTION(CommonException, AssertFailed)
+ }
+
DWORD NumBytesRead;
-
- bool Success = ReadFile(
- mSocketHandle, // pipe handle
- pBuffer, // buffer to receive reply
- NBytes, // size of buffer
- &NumBytesRead, // number of bytes read
- NULL); // not overlapped
-
- if (!Success)
+
+ if (mIsServer)
{
- THROW_EXCEPTION(ConnectionException, Conn_SocketReadError)
+ // satisfy from buffer if possible, to avoid
+ // blocking on read.
+ bool needAnotherRead = false;
+ if (mBytesInBuffer == 0)
+ {
+ // overlapped I/O completed successfully?
+ // (wait if needed)
+
+ if (GetOverlappedResult(mSocketHandle,
+ &mReadOverlap, &NumBytesRead, TRUE))
+ {
+ needAnotherRead = true;
+ }
+ else
+ {
+ DWORD err = GetLastError();
+
+ if (err == ERROR_HANDLE_EOF)
+ {
+ mReadClosed = true;
+ }
+ else
+ {
+ if (err == ERROR_BROKEN_PIPE)
+ {
+ ::syslog(LOG_ERR, "Control "
+ "client disconnected");
+ }
+ else
+ {
+ ::syslog(LOG_ERR,
+ "Failed to wait for "
+ "ReadFile to complete: "
+ "error %d", err);
+ }
+
+ Close();
+ THROW_EXCEPTION(ConnectionException,
+ Conn_SocketReadError)
+ }
+ }
+ }
+ else
+ {
+ NumBytesRead = 0;
+ }
+
+ size_t BytesToCopy = NumBytesRead + mBytesInBuffer;
+ size_t BytesRemaining = 0;
+
+ if (BytesToCopy > (size_t)NBytes)
+ {
+ BytesRemaining = BytesToCopy - NBytes;
+ BytesToCopy = NBytes;
+ }
+
+ memcpy(pBuffer, mReadBuffer, BytesToCopy);
+ memcpy(mReadBuffer, mReadBuffer + BytesToCopy, BytesRemaining);
+
+ mBytesInBuffer = BytesRemaining;
+ NumBytesRead = BytesToCopy;
+
+ // start the next overlapped read
+ if (needAnotherRead && !ReadFile(mSocketHandle,
+ mReadBuffer + mBytesInBuffer,
+ sizeof(mReadBuffer) - mBytesInBuffer,
+ NULL, &mReadOverlap))
+ {
+ DWORD err = GetLastError();
+ if (err == ERROR_IO_PENDING)
+ {
+ // Don't reset yet, there might be data
+ // in the buffer waiting to be read,
+ // will check below.
+ // ResetEvent(mReadableEvent);
+ }
+ else if (err == ERROR_HANDLE_EOF)
+ {
+ mReadClosed = true;
+ }
+ else if (err == ERROR_BROKEN_PIPE)
+ {
+ ::syslog(LOG_ERR,
+ "Control client disconnected");
+ mReadClosed = true;
+ }
+ else
+ {
+ ::syslog(LOG_ERR, "Failed to start "
+ "overlapped read: error %d", err);
+ Close();
+ THROW_EXCEPTION(ConnectionException,
+ Conn_SocketReadError)
+ }
+ }
+
+ // If the read succeeded immediately, leave the event
+ // signaled, so that we will be called again to process
+ // the newly read data and start another overlapped read.
+ if (needAnotherRead && !mReadClosed)
+ {
+ // leave signalled
+ }
+ else if (!needAnotherRead && mBytesInBuffer > 0)
+ {
+ // leave signalled
+ }
+ else
+ {
+ // nothing left to read, reset the event
+ ResetEvent(mReadableEvent);
+ // FIXME: a pending read could have signalled
+ // the event (again) while we were busy reading.
+ // that signal would be lost, and the reading
+ // thread would block. Should be pretty obvious
+ // if this happens in practice: control client
+ // hangs.
+ }
}
-
- // Closed for reading at EOF?
- if (NumBytesRead == 0)
+ else
{
- mReadClosed = true;
+ if (!ReadFile(
+ mSocketHandle, // pipe handle
+ pBuffer, // buffer to receive reply
+ NBytes, // size of buffer
+ &NumBytesRead, // number of bytes read
+ NULL)) // not overlapped
+ {
+ Close();
+ THROW_EXCEPTION(ConnectionException,
+ Conn_SocketReadError)
+ }
+
+ // Closed for reading at EOF?
+ if (NumBytesRead == 0)
+ {
+ mReadClosed = true;
+ }
}
-
+
return NumBytesRead;
}
@@ -199,7 +381,7 @@ int WinNamedPipeStream::Read(void *pBuffer, int NBytes, int Timeout)
// --------------------------------------------------------------------------
void WinNamedPipeStream::Write(const void *pBuffer, int NBytes)
{
- if (mSocketHandle == NULL || !mIsConnected)
+ if (mSocketHandle == INVALID_HANDLE_VALUE || !mIsConnected)
{
THROW_EXCEPTION(ServerException, BadSocketHandle)
}
@@ -223,7 +405,7 @@ void WinNamedPipeStream::Write(const void *pBuffer, int NBytes)
if (!Success)
{
- mWriteClosed = true; // assume can't write again
+ Close();
THROW_EXCEPTION(ConnectionException,
Conn_SocketWriteError)
}
@@ -242,20 +424,39 @@ void WinNamedPipeStream::Write(const void *pBuffer, int NBytes)
// --------------------------------------------------------------------------
void WinNamedPipeStream::Close()
{
- if (mSocketHandle == NULL && mIsConnected)
+ if (mSocketHandle == INVALID_HANDLE_VALUE && mIsConnected)
{
fprintf(stderr, "Inconsistent connected state\n");
::syslog(LOG_ERR, "Inconsistent connected state");
mIsConnected = false;
}
- if (mSocketHandle == NULL)
+ if (mSocketHandle == INVALID_HANDLE_VALUE)
{
THROW_EXCEPTION(ServerException, BadSocketHandle)
}
if (mIsServer)
- {
+ {
+ if (!CancelIo(mSocketHandle))
+ {
+ ::syslog(LOG_ERR, "Failed to cancel outstanding "
+ "I/O: error %d", GetLastError());
+ }
+
+ if (mReadableEvent == INVALID_HANDLE_VALUE)
+ {
+ ::syslog(LOG_ERR, "Failed to destroy Readable "
+ "event: invalid handle");
+ }
+ else if (!CloseHandle(mReadableEvent))
+ {
+ ::syslog(LOG_ERR, "Failed to destroy Readable "
+ "event: error %d", GetLastError());
+ }
+
+ mReadableEvent = INVALID_HANDLE_VALUE;
+
if (!FlushFileBuffers(mSocketHandle))
{
::syslog(LOG_INFO, "FlushFileBuffers failed: %d",
@@ -273,8 +474,10 @@ void WinNamedPipeStream::Close()
bool result = CloseHandle(mSocketHandle);
- mSocketHandle = NULL;
+ mSocketHandle = INVALID_HANDLE_VALUE;
mIsConnected = false;
+ mReadClosed = true;
+ mWriteClosed = true;
if (!result)
{
@@ -309,4 +512,27 @@ bool WinNamedPipeStream::StreamClosed()
return mWriteClosed;
}
+// --------------------------------------------------------------------------
+//
+// Function
+// Name: IOStream::WriteAllBuffered()
+// Purpose: Ensures that any data which has been buffered is written to the stream
+// Created: 2003/08/26
+//
+// --------------------------------------------------------------------------
+void WinNamedPipeStream::WriteAllBuffered()
+{
+ if (mSocketHandle == INVALID_HANDLE_VALUE || !mIsConnected)
+ {
+ THROW_EXCEPTION(ServerException, BadSocketHandle)
+ }
+
+ if (!FlushFileBuffers(mSocketHandle))
+ {
+ ::syslog(LOG_WARNING, "FlushFileBuffers failed: %d",
+ GetLastError());
+ }
+}
+
+
#endif // WIN32
diff --git a/lib/server/WinNamedPipeStream.h b/lib/server/WinNamedPipeStream.h
index 5a800371..aded2d59 100644
--- a/lib/server/WinNamedPipeStream.h
+++ b/lib/server/WinNamedPipeStream.h
@@ -36,13 +36,15 @@ public:
virtual int Read(void *pBuffer, int NBytes,
int Timeout = IOStream::TimeOutInfinite);
virtual void Write(const void *pBuffer, int NBytes);
+ virtual void WriteAllBuffered();
virtual void Close();
virtual bool StreamDataLeft();
virtual bool StreamClosed();
bool IsConnected() { return mIsConnected; }
+ HANDLE GetSocketHandle() { return mSocketHandle; }
+ HANDLE GetReadableEvent() { return mReadableEvent; }
protected:
- HANDLE GetSocketHandle();
void MarkAsReadClosed() {mReadClosed = true;}
void MarkAsWriteClosed() {mWriteClosed = true;}
@@ -51,6 +53,10 @@ private:
{ /* do not call */ }
HANDLE mSocketHandle;
+ HANDLE mReadableEvent;
+ OVERLAPPED mReadOverlap;
+ uint8_t mReadBuffer[4096];
+ size_t mBytesInBuffer;
bool mReadClosed;
bool mWriteClosed;
bool mIsServer;