summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Wilson <chris+github@qwirx.com>2014-12-26 23:16:24 +0000
committerChris Wilson <chris+github@qwirx.com>2014-12-26 23:16:24 +0000
commitfa4e3cc1c04dc59dad03be617a3bd31855df8e05 (patch)
tree1ac31e57033d9d4bb975a510dd49f73e7732822d
parent5d37dffa9e146bab680f7ddead25bad4f4c18b16 (diff)
Add support for timeouts on named pipe writes, using overlapped I/O.
-rw-r--r--lib/server/WinNamedPipeStream.cpp432
-rw-r--r--lib/server/WinNamedPipeStream.h34
2 files changed, 251 insertions, 215 deletions
diff --git a/lib/server/WinNamedPipeStream.cpp b/lib/server/WinNamedPipeStream.cpp
index 9d33ec86..14da2dd0 100644
--- a/lib/server/WinNamedPipeStream.cpp
+++ b/lib/server/WinNamedPipeStream.cpp
@@ -21,6 +21,7 @@
#include "autogen_ConnectionException.h"
#include "autogen_ServerException.h"
+#include "BoxTime.h"
#include "CommonException.h"
#include "Socket.h"
#include "WinNamedPipeStream.h"
@@ -106,6 +107,12 @@ WinNamedPipeStream::WinNamedPipeStream(HANDLE hNamedPipe)
// --------------------------------------------------------------------------
WinNamedPipeStream::~WinNamedPipeStream()
{
+ for(std::list<WriteInProgress*>::iterator i = mWritesInProgress.begin();
+ i != mWritesInProgress.end(); i++)
+ {
+ delete *i;
+ }
+
if (mSocketHandle != INVALID_HANDLE_VALUE)
{
try
@@ -240,6 +247,74 @@ void WinNamedPipeStream::Connect(const std::string& rName)
mIsConnected = true;
}
+// Returns true if the operation is complete (and you will need to start
+// another one), or false otherwise (you can wait again).
+bool WinNamedPipeStream::WaitForOverlappedOperation(OVERLAPPED& Overlapped,
+ int Timeout, int64_t* pBytesTransferred)
+{
+ if (Timeout == IOStream::TimeOutInfinite)
+ {
+ Timeout = INFINITE;
+ }
+
+ // overlapped I/O completed successfully? (wait if needed)
+ DWORD waitResult = WaitForSingleObject(Overlapped.hEvent, Timeout);
+ DWORD NumBytesTransferred = -1;
+
+ if (waitResult == WAIT_ABANDONED)
+ {
+ THROW_EXCEPTION_MESSAGE(ServerException, BadSocketHandle,
+ "Wait for command socket read abandoned by system");
+ }
+
+ if (waitResult == WAIT_TIMEOUT)
+ {
+ // wait timed out, nothing to read
+ *pBytesTransferred = 0;
+ return false;
+ }
+
+ if (waitResult != WAIT_OBJECT_0)
+ {
+ THROW_EXCEPTION_MESSAGE(ServerException, BadSocketHandle,
+ "Failed to wait for command socket read: unknown "
+ "result code: " << waitResult);
+ }
+
+ // object is ready to read from
+ if (GetOverlappedResult(mSocketHandle, &Overlapped,
+ &NumBytesTransferred, TRUE))
+ {
+ *pBytesTransferred = NumBytesTransferred;
+ return true;
+ }
+
+ // We are here because there was an error.
+ DWORD err = GetLastError();
+
+ if (err == ERROR_HANDLE_EOF)
+ {
+ Close();
+ return true;
+ }
+
+ // ERROR_NO_DATA is a strange name for
+ // "The pipe is being closed". No exception wanted.
+
+ if (err == ERROR_NO_DATA ||
+ err == ERROR_PIPE_NOT_CONNECTED ||
+ err == ERROR_BROKEN_PIPE)
+ {
+ BOX_INFO(BOX_WIN_ERRNO_MESSAGE(err,
+ "Control client disconnected"));
+ Close();
+ return true;
+ }
+
+ THROW_WIN_ERROR_NUMBER("Failed to wait for OVERLAPPED operation "
+ "to complete", err, ConnectionException, SocketReadError);
+}
+
// --------------------------------------------------------------------------
//
// Function
@@ -272,169 +347,79 @@ int WinNamedPipeStream::Read(void *pBuffer, int NBytes, int Timeout)
THROW_EXCEPTION(CommonException, AssertFailed)
}
- DWORD NumBytesRead;
+ int64_t NumBytesRead;
- if (mIsServer)
+ // satisfy from buffer if possible, to avoid
+ // blocking on read.
+ bool needAnotherRead = false;
+ if (mBytesInBuffer == 0)
{
- // satisfy from buffer if possible, to avoid
- // blocking on read.
- bool needAnotherRead = false;
- if (mBytesInBuffer == 0)
- {
- // overlapped I/O completed successfully?
- // (wait if needed)
- DWORD waitResult = WaitForSingleObject(
- mReadOverlap.hEvent, Timeout);
-
- if (waitResult == WAIT_ABANDONED)
- {
- BOX_ERROR("Wait for command socket read "
- "abandoned by system");
- THROW_EXCEPTION(ServerException,
- BadSocketHandle);
- }
- else if (waitResult == WAIT_TIMEOUT)
- {
- // wait timed out, nothing to read
- NumBytesRead = 0;
- }
- else if (waitResult != WAIT_OBJECT_0)
- {
- BOX_ERROR("Failed to wait for command "
- "socket read: unknown result " <<
- waitResult);
- }
- // object is ready to read from
- else if (GetOverlappedResult(mSocketHandle,
- &mReadOverlap, &NumBytesRead, TRUE))
- {
- needAnotherRead = true;
- }
- else
- {
- DWORD err = GetLastError();
-
- if (err == ERROR_HANDLE_EOF)
- {
- mReadClosed = true;
- }
- else
- {
- if (err == ERROR_BROKEN_PIPE)
- {
- BOX_NOTICE("Control client "
- "disconnected");
- }
- else
- {
- BOX_ERROR("Failed to wait for "
- "ReadFile to complete: "
- << GetErrorMessage(err));
- }
-
- Close();
- THROW_EXCEPTION(ConnectionException,
- SocketReadError)
- }
- }
- }
- else
- {
- NumBytesRead = 0;
- }
+ needAnotherRead = WaitForOverlappedOperation(
+ mReadOverlap, Timeout, &NumBytesRead);
+ }
+ else
+ {
+ // Just return the existing data from the buffer
+ // this time around. The caller should call again,
+ // and then the buffer will be empty.
+ NumBytesRead = 0;
+ }
- size_t BytesToCopy = NumBytesRead + mBytesInBuffer;
- size_t BytesRemaining = 0;
+ size_t BytesToCopy = NumBytesRead + mBytesInBuffer;
+ size_t BytesRemaining = 0;
- if (BytesToCopy > (size_t)NBytes)
- {
- BytesRemaining = BytesToCopy - NBytes;
- BytesToCopy = NBytes;
- }
+ if (BytesToCopy > (size_t)NBytes)
+ {
+ BytesRemaining = BytesToCopy - NBytes;
+ BytesToCopy = NBytes;
+ }
- memcpy(pBuffer, mReadBuffer, BytesToCopy);
- memmove(mReadBuffer, mReadBuffer + BytesToCopy, BytesRemaining);
+ memcpy(pBuffer, mReadBuffer, BytesToCopy);
+ memmove(mReadBuffer, mReadBuffer + BytesToCopy, BytesRemaining);
- mBytesInBuffer = BytesRemaining;
- NumBytesRead = BytesToCopy;
+ mBytesInBuffer = BytesRemaining;
+ NumBytesRead = BytesToCopy;
- if (needAnotherRead)
- {
- // reinitialise the OVERLAPPED structure
- memset(&mReadOverlap, 0, sizeof(mReadOverlap));
- mReadOverlap.hEvent = mReadableEvent;
- }
+ if (needAnotherRead)
+ {
+ // reinitialise the OVERLAPPED structure
+ memset(&mReadOverlap, 0, sizeof(mReadOverlap));
+ mReadOverlap.hEvent = mReadableEvent;
+ }
- // start the next overlapped read
- if (needAnotherRead && !ReadFile(mSocketHandle,
- mReadBuffer + mBytesInBuffer,
- sizeof(mReadBuffer) - mBytesInBuffer,
- NULL, &mReadOverlap))
+ // start the next overlapped read
+ if (needAnotherRead && !ReadFile(mSocketHandle,
+ mReadBuffer + mBytesInBuffer,
+ sizeof(mReadBuffer) - mBytesInBuffer,
+ NULL, &mReadOverlap))
+ {
+ DWORD err = GetLastError();
+ if (err == ERROR_IO_PENDING)
{
- DWORD err = GetLastError();
- if (err == ERROR_IO_PENDING)
- {
- // Don't reset yet, there might be data
- // in the buffer waiting to be read,
- // will check below.
- // ResetEvent(mReadableEvent);
- }
- else if (err == ERROR_HANDLE_EOF)
- {
- mReadClosed = true;
- }
- else if (err == ERROR_BROKEN_PIPE)
- {
- BOX_ERROR("Control client disconnected");
- mReadClosed = true;
- }
- else
- {
- BOX_ERROR("Failed to start overlapped read: "
- << GetErrorMessage(err));
- Close();
- THROW_EXCEPTION(ConnectionException,
- SocketReadError)
- }
+ // Don't reset yet, there might be data
+ // in the buffer waiting to be read,
+ // will check below.
+ // ResetEvent(mReadableEvent);
}
- }
- else
- {
- if (!ReadFile(
- mSocketHandle, // pipe handle
- pBuffer, // buffer to receive reply
- NBytes, // size of buffer
- &NumBytesRead, // number of bytes read
- NULL)) // not overlapped
+ else if (err == ERROR_HANDLE_EOF)
{
- DWORD err = GetLastError();
-
- Close();
-
- // ERROR_NO_DATA is a strange name for
- // "The pipe is being closed". No exception wanted.
-
- if (err == ERROR_NO_DATA ||
- err == ERROR_PIPE_NOT_CONNECTED)
- {
- NumBytesRead = 0;
- }
- else
- {
- BOX_ERROR("Failed to read from control socket: "
- << GetErrorMessage(err));
- THROW_EXCEPTION(ConnectionException,
- SocketReadError)
- }
+ mReadClosed = true;
}
-
- // Closed for reading at EOF?
- if (NumBytesRead == 0)
+ else if (err == ERROR_BROKEN_PIPE)
{
+ BOX_ERROR("Control client disconnected");
mReadClosed = true;
}
+ else
+ {
+ BOX_ERROR("Failed to start overlapped read: "
+ << GetErrorMessage(err));
+ Close();
+ THROW_EXCEPTION(ConnectionException,
+ SocketReadError)
+ }
}
-
+
return NumBytesRead;
}
@@ -446,8 +431,15 @@ int WinNamedPipeStream::Read(void *pBuffer, int NBytes, int Timeout)
// Created: 2003/07/31
//
// --------------------------------------------------------------------------
-void WinNamedPipeStream::Write(const void *pBuffer, int NBytes)
+void WinNamedPipeStream::Write(const void *pBuffer, int NBytes, int Timeout)
{
+ // Calculate the deadline at the beginning. Not valid if Timeout is
+ // IOStream::TimeOutInfinite!
+ ASSERT(Timeout != IOStream::TimeOutInfinite);
+
+ box_time_t deadline = GetCurrentBoxTime() +
+ MilliSecondsToBoxTime(Timeout);
+
if (mSocketHandle == INVALID_HANDLE_VALUE || !mIsConnected)
{
THROW_EXCEPTION(ServerException, BadSocketHandle)
@@ -455,41 +447,59 @@ void WinNamedPipeStream::Write(const void *pBuffer, int NBytes)
// Buffer in byte sized type.
ASSERT(sizeof(char) == 1);
- const char *pByteBuffer = (char *)pBuffer;
-
- int NumBytesWrittenTotal = 0;
-
- while (NumBytesWrittenTotal < NBytes)
+ WriteInProgress* new_write = new WriteInProgress(
+ std::string((char *)pBuffer, NBytes));
+
+ // Start the WriteFile operation, and add to queue if pending.
+ BOOL Success = WriteFile(
+ mSocketHandle, // pipe handle
+ new_write->mBuffer.c_str(), // message
+ NBytes, // message length
+ NULL, // bytes written this time
+ &(new_write->mOverlap));
+
+ if (Success == TRUE)
{
- DWORD NumBytesWrittenThisTime = 0;
-
- bool Success = WriteFile(
- mSocketHandle, // pipe handle
- pByteBuffer + NumBytesWrittenTotal, // message
- NBytes - NumBytesWrittenTotal, // message length
- &NumBytesWrittenThisTime, // bytes written this time
- NULL); // not overlapped
+ BOX_NOTICE("Write claimed success while overlapped?");
+ mWritesInProgress.push_back(new_write);
+ }
+ else
+ {
+ DWORD err = GetLastError();
- if (!Success)
+ if (err == ERROR_IO_PENDING)
{
- // ERROR_NO_DATA is a strange name for
- // "The pipe is being closed".
-
- DWORD err = GetLastError();
-
- if (err != ERROR_NO_DATA)
- {
- BOX_ERROR("Failed to write to control "
- "socket: " << GetErrorMessage(err));
- }
-
+ BOX_TRACE("WriteFile is pending, adding to queue");
+ mWritesInProgress.push_back(new_write);
+ }
+ else
+ {
+ // Not in progress any more, pop it
Close();
-
- THROW_EXCEPTION(ConnectionException,
- SocketWriteError)
+ THROW_WIN_ERROR_NUMBER("Failed to start overlapped "
+ "write", err, ConnectionException,
+ SocketWriteError);
}
+ }
- NumBytesWrittenTotal += NumBytesWrittenThisTime;
+ // Wait for previous WriteFile operations to complete, one at a time,
+ // until the deadline expires.
+ for(box_time_t remaining = deadline - GetCurrentBoxTime();
+ remaining > 0 && !mWritesInProgress.empty();
+ remaining = deadline - GetCurrentBoxTime())
+ {
+ int new_timeout = BoxTimeToMilliSeconds(remaining);
+ WriteInProgress* oldest_write =
+ *(mWritesInProgress.begin());
+
+ int64_t bytes_written = 0;
+ if(WaitForOverlappedOperation(oldest_write->mOverlap,
+ new_timeout, &bytes_written))
+ {
+ // This one is complete, pop it and start a new one
+ delete oldest_write;
+ mWritesInProgress.pop_front();
+ }
}
}
@@ -514,58 +524,50 @@ void WinNamedPipeStream::Close()
THROW_EXCEPTION(ServerException, BadSocketHandle)
}
- if (mIsServer)
+ if (!CancelIo(mSocketHandle))
{
- if (!CancelIo(mSocketHandle))
- {
- BOX_ERROR("Failed to cancel outstanding I/O: " <<
- GetErrorMessage(GetLastError()));
- }
+ BOX_ERROR("Failed to cancel outstanding I/O: " <<
+ GetErrorMessage(GetLastError()));
+ }
- if (mReadableEvent == INVALID_HANDLE_VALUE)
- {
- BOX_ERROR("Failed to destroy Readable event: "
- "invalid handle");
- }
- else if (!CloseHandle(mReadableEvent))
- {
- BOX_ERROR("Failed to destroy Readable event: " <<
- GetErrorMessage(GetLastError()));
- }
+ if (mReadableEvent == INVALID_HANDLE_VALUE)
+ {
+ BOX_ERROR("Failed to destroy Readable event: "
+ "invalid handle");
+ }
+ else if (!CloseHandle(mReadableEvent))
+ {
+ BOX_ERROR("Failed to destroy Readable event: " <<
+ GetErrorMessage(GetLastError()));
+ }
- mReadableEvent = INVALID_HANDLE_VALUE;
+ mReadableEvent = INVALID_HANDLE_VALUE;
- if (!FlushFileBuffers(mSocketHandle))
- {
- BOX_ERROR("Failed to FlushFileBuffers: " <<
- GetErrorMessage(GetLastError()));
- }
-
- if (!DisconnectNamedPipe(mSocketHandle))
+ if (!FlushFileBuffers(mSocketHandle))
+ {
+ BOX_ERROR("Failed to FlushFileBuffers: " <<
+ GetErrorMessage(GetLastError()));
+ }
+
+ if (!DisconnectNamedPipe(mSocketHandle))
+ {
+ DWORD err = GetLastError();
+ if (err != ERROR_PIPE_NOT_CONNECTED)
{
- DWORD err = GetLastError();
- if (err != ERROR_PIPE_NOT_CONNECTED)
- {
- BOX_ERROR("Failed to DisconnectNamedPipe: " <<
- GetErrorMessage(err));
- }
+ BOX_ERROR("Failed to DisconnectNamedPipe: " <<
+ GetErrorMessage(err));
}
-
- mIsServer = false;
}
- bool result = CloseHandle(mSocketHandle);
-
mSocketHandle = INVALID_HANDLE_VALUE;
mIsConnected = false;
mReadClosed = true;
mWriteClosed = true;
- if (!result)
+ if (!CloseHandle(mSocketHandle))
{
- BOX_ERROR("Failed to CloseHandle: " <<
- GetErrorMessage(GetLastError()));
- THROW_EXCEPTION(ServerException, SocketCloseError)
+ THROW_WIN_ERROR_NUMBER("Failed to CloseHandle",
+ GetLastError(), ServerException, SocketCloseError);
}
}
diff --git a/lib/server/WinNamedPipeStream.h b/lib/server/WinNamedPipeStream.h
index 37eda40d..99ce3548 100644
--- a/lib/server/WinNamedPipeStream.h
+++ b/lib/server/WinNamedPipeStream.h
@@ -10,6 +10,8 @@
#if ! defined WINNAMEDPIPESTREAM__H && defined WIN32
#define WINNAMEDPIPESTREAM__H
+#include <list>
+
#include "IOStream.h"
// --------------------------------------------------------------------------
@@ -46,6 +48,8 @@ public:
protected:
void MarkAsReadClosed() {mReadClosed = true;}
void MarkAsWriteClosed() {mWriteClosed = true;}
+ bool WaitForOverlappedOperation(OVERLAPPED& Overlapped,
+ int Timeout, int64_t* pBytesTransferred);
private:
WinNamedPipeStream(const WinNamedPipeStream &rToCopy)
@@ -61,6 +65,36 @@ private:
bool mIsServer;
bool mIsConnected;
+ class WriteInProgress {
+ private:
+ friend class WinNamedPipeStream;
+ std::string mBuffer;
+ OVERLAPPED mOverlap;
+ WriteInProgress(const WriteInProgress& other); // do not call
+ public:
+ WriteInProgress(const std::string& dataToWrite)
+ : mBuffer(dataToWrite)
+ {
+ // create the Writable event
+ HANDLE writable_event = CreateEvent(NULL, TRUE, FALSE,
+ NULL);
+ if (writable_event == INVALID_HANDLE_VALUE)
+ {
+ BOX_LOG_WIN_ERROR("Failed to create the "
+ "Writable event");
+ THROW_EXCEPTION(CommonException, Internal)
+ }
+
+ memset(&mOverlap, 0, sizeof(mOverlap));
+ mOverlap.hEvent = writable_event;
+ }
+ ~WriteInProgress()
+ {
+ CloseHandle(mOverlap.hEvent);
+ }
+ };
+ std::list<WriteInProgress*> mWritesInProgress;
+
public:
static std::string sPipeNamePrefix;
};