diff options
author | Chris Wilson <chris+github@qwirx.com> | 2006-10-16 23:17:02 +0000 |
---|---|---|
committer | Chris Wilson <chris+github@qwirx.com> | 2006-10-16 23:17:02 +0000 |
commit | 9513d59b275068b8376554561e67b7664cf96b6e (patch) | |
tree | 82b144ec4ae897030b9fc595c1b431fa48701add /lib/server | |
parent | 29da720afeebbb7c618355a588a1e36a7620c9a6 (diff) |
Use overlapped I/O to avoid blocking.
Use INVALID_HANDLE_VALUE instead of NULL everywhere to avoid bugs, and
for type safety. (refs #3)
Diffstat (limited to 'lib/server')
-rw-r--r-- | lib/server/WinNamedPipeStream.cpp | 288 | ||||
-rw-r--r-- | lib/server/WinNamedPipeStream.h | 8 |
2 files changed, 264 insertions, 32 deletions
diff --git a/lib/server/WinNamedPipeStream.cpp b/lib/server/WinNamedPipeStream.cpp index c5b7eaa5..362f3958 100644 --- a/lib/server/WinNamedPipeStream.cpp +++ b/lib/server/WinNamedPipeStream.cpp @@ -35,7 +35,9 @@ // // -------------------------------------------------------------------------- WinNamedPipeStream::WinNamedPipeStream() - : mSocketHandle(NULL), + : mSocketHandle(INVALID_HANDLE_VALUE), + mReadableEvent(INVALID_HANDLE_VALUE), + mBytesInBuffer(0), mReadClosed(false), mWriteClosed(false), mIsServer(false), @@ -53,7 +55,7 @@ WinNamedPipeStream::WinNamedPipeStream() // -------------------------------------------------------------------------- WinNamedPipeStream::~WinNamedPipeStream() { - if (mSocketHandle != NULL) + if (mSocketHandle != INVALID_HANDLE_VALUE) { Close(); } @@ -70,14 +72,15 @@ WinNamedPipeStream::~WinNamedPipeStream() // -------------------------------------------------------------------------- void WinNamedPipeStream::Accept(const wchar_t* pName) { - if (mSocketHandle != NULL || mIsConnected) + if (mSocketHandle != INVALID_HANDLE_VALUE || mIsConnected) { THROW_EXCEPTION(ServerException, SocketAlreadyOpen) } mSocketHandle = CreateNamedPipeW( pName, // pipe name - PIPE_ACCESS_DUPLEX, // read/write access + PIPE_ACCESS_DUPLEX | // read/write access + FILE_FLAG_OVERLAPPED, // enabled overlapped I/O PIPE_TYPE_MESSAGE | // message type pipe PIPE_READMODE_MESSAGE | // message-read mode PIPE_WAIT, // blocking mode @@ -87,7 +90,7 @@ void WinNamedPipeStream::Accept(const wchar_t* pName) NMPWAIT_USE_DEFAULT_WAIT, // client time-out NULL); // default security attribute - if (mSocketHandle == NULL) + if (mSocketHandle == INVALID_HANDLE_VALUE) { ::syslog(LOG_ERR, "CreateNamedPipeW failed: %d", GetLastError()); @@ -104,10 +107,42 @@ void WinNamedPipeStream::Accept(const wchar_t* pName) THROW_EXCEPTION(ServerException, SocketOpenError) } + mBytesInBuffer = 0; mReadClosed = false; mWriteClosed = false; mIsServer = true; // must flush and disconnect before closing mIsConnected = true; + + // create the Readable event + mReadableEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + + if (mReadableEvent == INVALID_HANDLE_VALUE) + { + ::syslog(LOG_ERR, "Failed to create the Readable event: " + "error %d", GetLastError()); + Close(); + THROW_EXCEPTION(CommonException, Internal) + } + + // initialise the OVERLAPPED structure + memset(&mReadOverlap, 0, sizeof(mReadOverlap)); + mReadOverlap.hEvent = mReadableEvent; + + // start the first overlapped read + if (!ReadFile(mSocketHandle, mReadBuffer, sizeof(mReadBuffer), + NULL, &mReadOverlap)) + { + DWORD err = GetLastError(); + + if (err != ERROR_IO_PENDING) + { + ::syslog(LOG_ERR, "Failed to start overlapped read: " + "error %d", err); + Close(); + THROW_EXCEPTION(ConnectionException, + Conn_SocketReadError) + } + } } // -------------------------------------------------------------------------- @@ -120,7 +155,7 @@ void WinNamedPipeStream::Accept(const wchar_t* pName) // -------------------------------------------------------------------------- void WinNamedPipeStream::Connect(const wchar_t* pName) { - if (mSocketHandle != NULL || mIsConnected) + if (mSocketHandle != INVALID_HANDLE_VALUE || mIsConnected) { THROW_EXCEPTION(ServerException, SocketAlreadyOpen) } @@ -137,8 +172,17 @@ void WinNamedPipeStream::Connect(const wchar_t* pName) if (mSocketHandle == INVALID_HANDLE_VALUE) { - ::syslog(LOG_ERR, "Failed to connect to server's named pipe: " - "error %d", GetLastError()); + DWORD err = GetLastError(); + if (err == ERROR_PIPE_BUSY) + { + ::syslog(LOG_ERR, "Failed to connect to backup " + "daemon: it is busy with another connection"); + } + else + { + ::syslog(LOG_ERR, "Failed to connect to backup " + "daemon: error %d", err); + } THROW_EXCEPTION(ServerException, SocketOpenError) } @@ -159,33 +203,171 @@ void WinNamedPipeStream::Connect(const wchar_t* pName) int WinNamedPipeStream::Read(void *pBuffer, int NBytes, int Timeout) { // TODO no support for timeouts yet - ASSERT(Timeout == IOStream::TimeOutInfinite) + if (Timeout != IOStream::TimeOutInfinite) + { + THROW_EXCEPTION(CommonException, AssertFailed) + } - if (mSocketHandle == NULL || !mIsConnected) + if (mSocketHandle == INVALID_HANDLE_VALUE || !mIsConnected) { THROW_EXCEPTION(ServerException, BadSocketHandle) } + if (mReadClosed) + { + THROW_EXCEPTION(ConnectionException, SocketShutdownError) + } + + // ensure safe to cast NBytes to unsigned + if (NBytes < 0) + { + THROW_EXCEPTION(CommonException, AssertFailed) + } + DWORD NumBytesRead; - - bool Success = ReadFile( - mSocketHandle, // pipe handle - pBuffer, // buffer to receive reply - NBytes, // size of buffer - &NumBytesRead, // number of bytes read - NULL); // not overlapped - - if (!Success) + + if (mIsServer) { - THROW_EXCEPTION(ConnectionException, Conn_SocketReadError) + // satisfy from buffer if possible, to avoid + // blocking on read. + bool needAnotherRead = false; + if (mBytesInBuffer == 0) + { + // overlapped I/O completed successfully? + // (wait if needed) + + if (GetOverlappedResult(mSocketHandle, + &mReadOverlap, &NumBytesRead, TRUE)) + { + needAnotherRead = true; + } + else + { + DWORD err = GetLastError(); + + if (err == ERROR_HANDLE_EOF) + { + mReadClosed = true; + } + else + { + if (err == ERROR_BROKEN_PIPE) + { + ::syslog(LOG_ERR, "Control " + "client disconnected"); + } + else + { + ::syslog(LOG_ERR, + "Failed to wait for " + "ReadFile to complete: " + "error %d", err); + } + + Close(); + THROW_EXCEPTION(ConnectionException, + Conn_SocketReadError) + } + } + } + else + { + NumBytesRead = 0; + } + + size_t BytesToCopy = NumBytesRead + mBytesInBuffer; + size_t BytesRemaining = 0; + + if (BytesToCopy > (size_t)NBytes) + { + BytesRemaining = BytesToCopy - NBytes; + BytesToCopy = NBytes; + } + + memcpy(pBuffer, mReadBuffer, BytesToCopy); + memcpy(mReadBuffer, mReadBuffer + BytesToCopy, BytesRemaining); + + mBytesInBuffer = BytesRemaining; + NumBytesRead = BytesToCopy; + + // start the next overlapped read + if (needAnotherRead && !ReadFile(mSocketHandle, + mReadBuffer + mBytesInBuffer, + sizeof(mReadBuffer) - mBytesInBuffer, + NULL, &mReadOverlap)) + { + DWORD err = GetLastError(); + if (err == ERROR_IO_PENDING) + { + // Don't reset yet, there might be data + // in the buffer waiting to be read, + // will check below. + // ResetEvent(mReadableEvent); + } + else if (err == ERROR_HANDLE_EOF) + { + mReadClosed = true; + } + else if (err == ERROR_BROKEN_PIPE) + { + ::syslog(LOG_ERR, + "Control client disconnected"); + mReadClosed = true; + } + else + { + ::syslog(LOG_ERR, "Failed to start " + "overlapped read: error %d", err); + Close(); + THROW_EXCEPTION(ConnectionException, + Conn_SocketReadError) + } + } + + // If the read succeeded immediately, leave the event + // signaled, so that we will be called again to process + // the newly read data and start another overlapped read. + if (needAnotherRead && !mReadClosed) + { + // leave signalled + } + else if (!needAnotherRead && mBytesInBuffer > 0) + { + // leave signalled + } + else + { + // nothing left to read, reset the event + ResetEvent(mReadableEvent); + // FIXME: a pending read could have signalled + // the event (again) while we were busy reading. + // that signal would be lost, and the reading + // thread would block. Should be pretty obvious + // if this happens in practice: control client + // hangs. + } } - - // Closed for reading at EOF? - if (NumBytesRead == 0) + else { - mReadClosed = true; + if (!ReadFile( + mSocketHandle, // pipe handle + pBuffer, // buffer to receive reply + NBytes, // size of buffer + &NumBytesRead, // number of bytes read + NULL)) // not overlapped + { + Close(); + THROW_EXCEPTION(ConnectionException, + Conn_SocketReadError) + } + + // Closed for reading at EOF? + if (NumBytesRead == 0) + { + mReadClosed = true; + } } - + return NumBytesRead; } @@ -199,7 +381,7 @@ int WinNamedPipeStream::Read(void *pBuffer, int NBytes, int Timeout) // -------------------------------------------------------------------------- void WinNamedPipeStream::Write(const void *pBuffer, int NBytes) { - if (mSocketHandle == NULL || !mIsConnected) + if (mSocketHandle == INVALID_HANDLE_VALUE || !mIsConnected) { THROW_EXCEPTION(ServerException, BadSocketHandle) } @@ -223,7 +405,7 @@ void WinNamedPipeStream::Write(const void *pBuffer, int NBytes) if (!Success) { - mWriteClosed = true; // assume can't write again + Close(); THROW_EXCEPTION(ConnectionException, Conn_SocketWriteError) } @@ -242,20 +424,39 @@ void WinNamedPipeStream::Write(const void *pBuffer, int NBytes) // -------------------------------------------------------------------------- void WinNamedPipeStream::Close() { - if (mSocketHandle == NULL && mIsConnected) + if (mSocketHandle == INVALID_HANDLE_VALUE && mIsConnected) { fprintf(stderr, "Inconsistent connected state\n"); ::syslog(LOG_ERR, "Inconsistent connected state"); mIsConnected = false; } - if (mSocketHandle == NULL) + if (mSocketHandle == INVALID_HANDLE_VALUE) { THROW_EXCEPTION(ServerException, BadSocketHandle) } if (mIsServer) - { + { + if (!CancelIo(mSocketHandle)) + { + ::syslog(LOG_ERR, "Failed to cancel outstanding " + "I/O: error %d", GetLastError()); + } + + if (mReadableEvent == INVALID_HANDLE_VALUE) + { + ::syslog(LOG_ERR, "Failed to destroy Readable " + "event: invalid handle"); + } + else if (!CloseHandle(mReadableEvent)) + { + ::syslog(LOG_ERR, "Failed to destroy Readable " + "event: error %d", GetLastError()); + } + + mReadableEvent = INVALID_HANDLE_VALUE; + if (!FlushFileBuffers(mSocketHandle)) { ::syslog(LOG_INFO, "FlushFileBuffers failed: %d", @@ -273,8 +474,10 @@ void WinNamedPipeStream::Close() bool result = CloseHandle(mSocketHandle); - mSocketHandle = NULL; + mSocketHandle = INVALID_HANDLE_VALUE; mIsConnected = false; + mReadClosed = true; + mWriteClosed = true; if (!result) { @@ -309,4 +512,27 @@ bool WinNamedPipeStream::StreamClosed() return mWriteClosed; } +// -------------------------------------------------------------------------- +// +// Function +// Name: IOStream::WriteAllBuffered() +// Purpose: Ensures that any data which has been buffered is written to the stream +// Created: 2003/08/26 +// +// -------------------------------------------------------------------------- +void WinNamedPipeStream::WriteAllBuffered() +{ + if (mSocketHandle == INVALID_HANDLE_VALUE || !mIsConnected) + { + THROW_EXCEPTION(ServerException, BadSocketHandle) + } + + if (!FlushFileBuffers(mSocketHandle)) + { + ::syslog(LOG_WARNING, "FlushFileBuffers failed: %d", + GetLastError()); + } +} + + #endif // WIN32 diff --git a/lib/server/WinNamedPipeStream.h b/lib/server/WinNamedPipeStream.h index 5a800371..aded2d59 100644 --- a/lib/server/WinNamedPipeStream.h +++ b/lib/server/WinNamedPipeStream.h @@ -36,13 +36,15 @@ public: virtual int Read(void *pBuffer, int NBytes, int Timeout = IOStream::TimeOutInfinite); virtual void Write(const void *pBuffer, int NBytes); + virtual void WriteAllBuffered(); virtual void Close(); virtual bool StreamDataLeft(); virtual bool StreamClosed(); bool IsConnected() { return mIsConnected; } + HANDLE GetSocketHandle() { return mSocketHandle; } + HANDLE GetReadableEvent() { return mReadableEvent; } protected: - HANDLE GetSocketHandle(); void MarkAsReadClosed() {mReadClosed = true;} void MarkAsWriteClosed() {mWriteClosed = true;} @@ -51,6 +53,10 @@ private: { /* do not call */ } HANDLE mSocketHandle; + HANDLE mReadableEvent; + OVERLAPPED mReadOverlap; + uint8_t mReadBuffer[4096]; + size_t mBytesInBuffer; bool mReadClosed; bool mWriteClosed; bool mIsServer; |