diff options
author | Chris Wilson <chris+github@qwirx.com> | 2014-12-26 23:16:24 +0000 |
---|---|---|
committer | Chris Wilson <chris+github@qwirx.com> | 2014-12-26 23:16:24 +0000 |
commit | fa4e3cc1c04dc59dad03be617a3bd31855df8e05 (patch) | |
tree | 1ac31e57033d9d4bb975a510dd49f73e7732822d /lib | |
parent | 5d37dffa9e146bab680f7ddead25bad4f4c18b16 (diff) |
Add support for timeouts on named pipe writes, using overlapped I/O.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/server/WinNamedPipeStream.cpp | 432 | ||||
-rw-r--r-- | lib/server/WinNamedPipeStream.h | 34 |
2 files changed, 251 insertions, 215 deletions
diff --git a/lib/server/WinNamedPipeStream.cpp b/lib/server/WinNamedPipeStream.cpp index 9d33ec86..14da2dd0 100644 --- a/lib/server/WinNamedPipeStream.cpp +++ b/lib/server/WinNamedPipeStream.cpp @@ -21,6 +21,7 @@ #include "autogen_ConnectionException.h" #include "autogen_ServerException.h" +#include "BoxTime.h" #include "CommonException.h" #include "Socket.h" #include "WinNamedPipeStream.h" @@ -106,6 +107,12 @@ WinNamedPipeStream::WinNamedPipeStream(HANDLE hNamedPipe) // -------------------------------------------------------------------------- WinNamedPipeStream::~WinNamedPipeStream() { + for(std::list<WriteInProgress*>::iterator i = mWritesInProgress.begin(); + i != mWritesInProgress.end(); i++) + { + delete *i; + } + if (mSocketHandle != INVALID_HANDLE_VALUE) { try @@ -240,6 +247,74 @@ void WinNamedPipeStream::Connect(const std::string& rName) mIsConnected = true; } +// Returns true if the operation is complete (and you will need to start +// another one), or false otherwise (you can wait again). +bool WinNamedPipeStream::WaitForOverlappedOperation(OVERLAPPED& Overlapped, + int Timeout, int64_t* pBytesTransferred) +{ + if (Timeout == IOStream::TimeOutInfinite) + { + Timeout = INFINITE; + } + + // overlapped I/O completed successfully? (wait if needed) + DWORD waitResult = WaitForSingleObject(Overlapped.hEvent, Timeout); + DWORD NumBytesTransferred = -1; + + if (waitResult == WAIT_ABANDONED) + { + THROW_EXCEPTION_MESSAGE(ServerException, BadSocketHandle, + "Wait for command socket read abandoned by system"); + } + + if (waitResult == WAIT_TIMEOUT) + { + // wait timed out, nothing to read + *pBytesTransferred = 0; + return false; + } + + if (waitResult != WAIT_OBJECT_0) + { + THROW_EXCEPTION_MESSAGE(ServerException, BadSocketHandle, + "Failed to wait for command socket read: unknown " + "result code: " << waitResult); + } + + // object is ready to read from + if (GetOverlappedResult(mSocketHandle, &Overlapped, + &NumBytesTransferred, TRUE)) + { + *pBytesTransferred = NumBytesTransferred; + return true; + } + + // We are here because there was an error. + DWORD err = GetLastError(); + + if (err == ERROR_HANDLE_EOF) + { + Close(); + return true; + } + + // ERROR_NO_DATA is a strange name for + // "The pipe is being closed". No exception wanted. + + if (err == ERROR_NO_DATA || + err == ERROR_PIPE_NOT_CONNECTED || + err == ERROR_BROKEN_PIPE) + { + BOX_INFO(BOX_WIN_ERRNO_MESSAGE(err, + "Control client disconnected")); + Close(); + return true; + } + + THROW_WIN_ERROR_NUMBER("Failed to wait for OVERLAPPED operation " + "to complete", err, ConnectionException, SocketReadError); +} + // -------------------------------------------------------------------------- // // Function @@ -272,169 +347,79 @@ int WinNamedPipeStream::Read(void *pBuffer, int NBytes, int Timeout) THROW_EXCEPTION(CommonException, AssertFailed) } - DWORD NumBytesRead; + int64_t NumBytesRead; - if (mIsServer) + // satisfy from buffer if possible, to avoid + // blocking on read. + bool needAnotherRead = false; + if (mBytesInBuffer == 0) { - // satisfy from buffer if possible, to avoid - // blocking on read. - bool needAnotherRead = false; - if (mBytesInBuffer == 0) - { - // overlapped I/O completed successfully? - // (wait if needed) - DWORD waitResult = WaitForSingleObject( - mReadOverlap.hEvent, Timeout); - - if (waitResult == WAIT_ABANDONED) - { - BOX_ERROR("Wait for command socket read " - "abandoned by system"); - THROW_EXCEPTION(ServerException, - BadSocketHandle); - } - else if (waitResult == WAIT_TIMEOUT) - { - // wait timed out, nothing to read - NumBytesRead = 0; - } - else if (waitResult != WAIT_OBJECT_0) - { - BOX_ERROR("Failed to wait for command " - "socket read: unknown result " << - waitResult); - } - // object is ready to read from - else if (GetOverlappedResult(mSocketHandle, - &mReadOverlap, &NumBytesRead, TRUE)) - { - needAnotherRead = true; - } - else - { - DWORD err = GetLastError(); - - if (err == ERROR_HANDLE_EOF) - { - mReadClosed = true; - } - else - { - if (err == ERROR_BROKEN_PIPE) - { - BOX_NOTICE("Control client " - "disconnected"); - } - else - { - BOX_ERROR("Failed to wait for " - "ReadFile to complete: " - << GetErrorMessage(err)); - } - - Close(); - THROW_EXCEPTION(ConnectionException, - SocketReadError) - } - } - } - else - { - NumBytesRead = 0; - } + needAnotherRead = WaitForOverlappedOperation( + mReadOverlap, Timeout, &NumBytesRead); + } + else + { + // Just return the existing data from the buffer + // this time around. The caller should call again, + // and then the buffer will be empty. + NumBytesRead = 0; + } - size_t BytesToCopy = NumBytesRead + mBytesInBuffer; - size_t BytesRemaining = 0; + size_t BytesToCopy = NumBytesRead + mBytesInBuffer; + size_t BytesRemaining = 0; - if (BytesToCopy > (size_t)NBytes) - { - BytesRemaining = BytesToCopy - NBytes; - BytesToCopy = NBytes; - } + if (BytesToCopy > (size_t)NBytes) + { + BytesRemaining = BytesToCopy - NBytes; + BytesToCopy = NBytes; + } - memcpy(pBuffer, mReadBuffer, BytesToCopy); - memmove(mReadBuffer, mReadBuffer + BytesToCopy, BytesRemaining); + memcpy(pBuffer, mReadBuffer, BytesToCopy); + memmove(mReadBuffer, mReadBuffer + BytesToCopy, BytesRemaining); - mBytesInBuffer = BytesRemaining; - NumBytesRead = BytesToCopy; + mBytesInBuffer = BytesRemaining; + NumBytesRead = BytesToCopy; - if (needAnotherRead) - { - // reinitialise the OVERLAPPED structure - memset(&mReadOverlap, 0, sizeof(mReadOverlap)); - mReadOverlap.hEvent = mReadableEvent; - } + if (needAnotherRead) + { + // reinitialise the OVERLAPPED structure + memset(&mReadOverlap, 0, sizeof(mReadOverlap)); + mReadOverlap.hEvent = mReadableEvent; + } - // start the next overlapped read - if (needAnotherRead && !ReadFile(mSocketHandle, - mReadBuffer + mBytesInBuffer, - sizeof(mReadBuffer) - mBytesInBuffer, - NULL, &mReadOverlap)) + // start the next overlapped read + if (needAnotherRead && !ReadFile(mSocketHandle, + mReadBuffer + mBytesInBuffer, + sizeof(mReadBuffer) - mBytesInBuffer, + NULL, &mReadOverlap)) + { + DWORD err = GetLastError(); + if (err == ERROR_IO_PENDING) { - DWORD err = GetLastError(); - if (err == ERROR_IO_PENDING) - { - // Don't reset yet, there might be data - // in the buffer waiting to be read, - // will check below. - // ResetEvent(mReadableEvent); - } - else if (err == ERROR_HANDLE_EOF) - { - mReadClosed = true; - } - else if (err == ERROR_BROKEN_PIPE) - { - BOX_ERROR("Control client disconnected"); - mReadClosed = true; - } - else - { - BOX_ERROR("Failed to start overlapped read: " - << GetErrorMessage(err)); - Close(); - THROW_EXCEPTION(ConnectionException, - SocketReadError) - } + // Don't reset yet, there might be data + // in the buffer waiting to be read, + // will check below. + // ResetEvent(mReadableEvent); } - } - else - { - if (!ReadFile( - mSocketHandle, // pipe handle - pBuffer, // buffer to receive reply - NBytes, // size of buffer - &NumBytesRead, // number of bytes read - NULL)) // not overlapped + else if (err == ERROR_HANDLE_EOF) { - DWORD err = GetLastError(); - - Close(); - - // ERROR_NO_DATA is a strange name for - // "The pipe is being closed". No exception wanted. - - if (err == ERROR_NO_DATA || - err == ERROR_PIPE_NOT_CONNECTED) - { - NumBytesRead = 0; - } - else - { - BOX_ERROR("Failed to read from control socket: " - << GetErrorMessage(err)); - THROW_EXCEPTION(ConnectionException, - SocketReadError) - } + mReadClosed = true; } - - // Closed for reading at EOF? - if (NumBytesRead == 0) + else if (err == ERROR_BROKEN_PIPE) { + BOX_ERROR("Control client disconnected"); mReadClosed = true; } + else + { + BOX_ERROR("Failed to start overlapped read: " + << GetErrorMessage(err)); + Close(); + THROW_EXCEPTION(ConnectionException, + SocketReadError) + } } - + return NumBytesRead; } @@ -446,8 +431,15 @@ int WinNamedPipeStream::Read(void *pBuffer, int NBytes, int Timeout) // Created: 2003/07/31 // // -------------------------------------------------------------------------- -void WinNamedPipeStream::Write(const void *pBuffer, int NBytes) +void WinNamedPipeStream::Write(const void *pBuffer, int NBytes, int Timeout) { + // Calculate the deadline at the beginning. Not valid if Timeout is + // IOStream::TimeOutInfinite! + ASSERT(Timeout != IOStream::TimeOutInfinite); + + box_time_t deadline = GetCurrentBoxTime() + + MilliSecondsToBoxTime(Timeout); + if (mSocketHandle == INVALID_HANDLE_VALUE || !mIsConnected) { THROW_EXCEPTION(ServerException, BadSocketHandle) @@ -455,41 +447,59 @@ void WinNamedPipeStream::Write(const void *pBuffer, int NBytes) // Buffer in byte sized type. ASSERT(sizeof(char) == 1); - const char *pByteBuffer = (char *)pBuffer; - - int NumBytesWrittenTotal = 0; - - while (NumBytesWrittenTotal < NBytes) + WriteInProgress* new_write = new WriteInProgress( + std::string((char *)pBuffer, NBytes)); + + // Start the WriteFile operation, and add to queue if pending. + BOOL Success = WriteFile( + mSocketHandle, // pipe handle + new_write->mBuffer.c_str(), // message + NBytes, // message length + NULL, // bytes written this time + &(new_write->mOverlap)); + + if (Success == TRUE) { - DWORD NumBytesWrittenThisTime = 0; - - bool Success = WriteFile( - mSocketHandle, // pipe handle - pByteBuffer + NumBytesWrittenTotal, // message - NBytes - NumBytesWrittenTotal, // message length - &NumBytesWrittenThisTime, // bytes written this time - NULL); // not overlapped + BOX_NOTICE("Write claimed success while overlapped?"); + mWritesInProgress.push_back(new_write); + } + else + { + DWORD err = GetLastError(); - if (!Success) + if (err == ERROR_IO_PENDING) { - // ERROR_NO_DATA is a strange name for - // "The pipe is being closed". - - DWORD err = GetLastError(); - - if (err != ERROR_NO_DATA) - { - BOX_ERROR("Failed to write to control " - "socket: " << GetErrorMessage(err)); - } - + BOX_TRACE("WriteFile is pending, adding to queue"); + mWritesInProgress.push_back(new_write); + } + else + { + // Not in progress any more, pop it Close(); - - THROW_EXCEPTION(ConnectionException, - SocketWriteError) + THROW_WIN_ERROR_NUMBER("Failed to start overlapped " + "write", err, ConnectionException, + SocketWriteError); } + } - NumBytesWrittenTotal += NumBytesWrittenThisTime; + // Wait for previous WriteFile operations to complete, one at a time, + // until the deadline expires. + for(box_time_t remaining = deadline - GetCurrentBoxTime(); + remaining > 0 && !mWritesInProgress.empty(); + remaining = deadline - GetCurrentBoxTime()) + { + int new_timeout = BoxTimeToMilliSeconds(remaining); + WriteInProgress* oldest_write = + *(mWritesInProgress.begin()); + + int64_t bytes_written = 0; + if(WaitForOverlappedOperation(oldest_write->mOverlap, + new_timeout, &bytes_written)) + { + // This one is complete, pop it and start a new one + delete oldest_write; + mWritesInProgress.pop_front(); + } } } @@ -514,58 +524,50 @@ void WinNamedPipeStream::Close() THROW_EXCEPTION(ServerException, BadSocketHandle) } - if (mIsServer) + if (!CancelIo(mSocketHandle)) { - if (!CancelIo(mSocketHandle)) - { - BOX_ERROR("Failed to cancel outstanding I/O: " << - GetErrorMessage(GetLastError())); - } + BOX_ERROR("Failed to cancel outstanding I/O: " << + GetErrorMessage(GetLastError())); + } - if (mReadableEvent == INVALID_HANDLE_VALUE) - { - BOX_ERROR("Failed to destroy Readable event: " - "invalid handle"); - } - else if (!CloseHandle(mReadableEvent)) - { - BOX_ERROR("Failed to destroy Readable event: " << - GetErrorMessage(GetLastError())); - } + if (mReadableEvent == INVALID_HANDLE_VALUE) + { + BOX_ERROR("Failed to destroy Readable event: " + "invalid handle"); + } + else if (!CloseHandle(mReadableEvent)) + { + BOX_ERROR("Failed to destroy Readable event: " << + GetErrorMessage(GetLastError())); + } - mReadableEvent = INVALID_HANDLE_VALUE; + mReadableEvent = INVALID_HANDLE_VALUE; - if (!FlushFileBuffers(mSocketHandle)) - { - BOX_ERROR("Failed to FlushFileBuffers: " << - GetErrorMessage(GetLastError())); - } - - if (!DisconnectNamedPipe(mSocketHandle)) + if (!FlushFileBuffers(mSocketHandle)) + { + BOX_ERROR("Failed to FlushFileBuffers: " << + GetErrorMessage(GetLastError())); + } + + if (!DisconnectNamedPipe(mSocketHandle)) + { + DWORD err = GetLastError(); + if (err != ERROR_PIPE_NOT_CONNECTED) { - DWORD err = GetLastError(); - if (err != ERROR_PIPE_NOT_CONNECTED) - { - BOX_ERROR("Failed to DisconnectNamedPipe: " << - GetErrorMessage(err)); - } + BOX_ERROR("Failed to DisconnectNamedPipe: " << + GetErrorMessage(err)); } - - mIsServer = false; } - bool result = CloseHandle(mSocketHandle); - mSocketHandle = INVALID_HANDLE_VALUE; mIsConnected = false; mReadClosed = true; mWriteClosed = true; - if (!result) + if (!CloseHandle(mSocketHandle)) { - BOX_ERROR("Failed to CloseHandle: " << - GetErrorMessage(GetLastError())); - THROW_EXCEPTION(ServerException, SocketCloseError) + THROW_WIN_ERROR_NUMBER("Failed to CloseHandle", + GetLastError(), ServerException, SocketCloseError); } } diff --git a/lib/server/WinNamedPipeStream.h b/lib/server/WinNamedPipeStream.h index 37eda40d..99ce3548 100644 --- a/lib/server/WinNamedPipeStream.h +++ b/lib/server/WinNamedPipeStream.h @@ -10,6 +10,8 @@ #if ! defined WINNAMEDPIPESTREAM__H && defined WIN32 #define WINNAMEDPIPESTREAM__H +#include <list> + #include "IOStream.h" // -------------------------------------------------------------------------- @@ -46,6 +48,8 @@ public: protected: void MarkAsReadClosed() {mReadClosed = true;} void MarkAsWriteClosed() {mWriteClosed = true;} + bool WaitForOverlappedOperation(OVERLAPPED& Overlapped, + int Timeout, int64_t* pBytesTransferred); private: WinNamedPipeStream(const WinNamedPipeStream &rToCopy) @@ -61,6 +65,36 @@ private: bool mIsServer; bool mIsConnected; + class WriteInProgress { + private: + friend class WinNamedPipeStream; + std::string mBuffer; + OVERLAPPED mOverlap; + WriteInProgress(const WriteInProgress& other); // do not call + public: + WriteInProgress(const std::string& dataToWrite) + : mBuffer(dataToWrite) + { + // create the Writable event + HANDLE writable_event = CreateEvent(NULL, TRUE, FALSE, + NULL); + if (writable_event == INVALID_HANDLE_VALUE) + { + BOX_LOG_WIN_ERROR("Failed to create the " + "Writable event"); + THROW_EXCEPTION(CommonException, Internal) + } + + memset(&mOverlap, 0, sizeof(mOverlap)); + mOverlap.hEvent = writable_event; + } + ~WriteInProgress() + { + CloseHandle(mOverlap.hEvent); + } + }; + std::list<WriteInProgress*> mWritesInProgress; + public: static std::string sPipeNamePrefix; }; |