summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Wilson <chris+github@qwirx.com>2008-10-03 23:22:36 +0000
committerChris Wilson <chris+github@qwirx.com>2008-10-03 23:22:36 +0000
commit192d12a27c64181153e2d93cce5a72db972564d7 (patch)
treefd23b09f20f9a9e798e94fa7739e9672fe476734
parent41ae62b3fb1d1bfdfe805b4102b284bfde0ecbf0 (diff)
Remove Win32 command socket thread, as it has caused too much trouble.
Handle command socket on Win32 the same as all other platforms, removing #ifdefs from BackupDaemon. Will replace this thread with regular but not excessive command socket polling using timers in future. Change error messages when command socket comms fail to make them clearer.
-rw-r--r--bin/bbackupd/BackupDaemon.cpp489
-rw-r--r--bin/bbackupd/BackupDaemon.h13
-rw-r--r--lib/server/WinNamedPipeStream.cpp142
-rw-r--r--lib/server/WinNamedPipeStream.h7
4 files changed, 185 insertions, 466 deletions
diff --git a/bin/bbackupd/BackupDaemon.cpp b/bin/bbackupd/BackupDaemon.cpp
index c7c9d0ca..3530291f 100644
--- a/bin/bbackupd/BackupDaemon.cpp
+++ b/bin/bbackupd/BackupDaemon.cpp
@@ -92,25 +92,6 @@ static const time_t MAX_SLEEP_TIME = 1024;
// This prevents repetative cycles of load on the server
#define SYNC_PERIOD_RANDOM_EXTRA_TIME_SHIFT_BY 6
-#ifdef WIN32
-// --------------------------------------------------------------------------
-//
-// Function
-// Name: HelperThread()
-// Purpose: Background thread function, called by Windows,
-// calls the BackupDaemon's RunHelperThread method
-// to listen for and act on control communications
-// Created: 18/2/04
-//
-// --------------------------------------------------------------------------
-unsigned int WINAPI HelperThread(LPVOID lpParam)
-{
- ((BackupDaemon *)lpParam)->RunHelperThread();
-
- return 0;
-}
-#endif
-
// --------------------------------------------------------------------------
//
// Function
@@ -122,7 +103,6 @@ unsigned int WINAPI HelperThread(LPVOID lpParam)
BackupDaemon::BackupDaemon()
: mState(BackupDaemon::State_Initialising),
mDeleteRedundantLocationsAfter(0),
- mpCommandSocketInfo(0),
mLastNotifiedEvent(SysadminNotifier::MAX),
mDeleteUnusedRootDirEntriesAfter(0),
mClientStoreMarker(BackupClientContext::ClientStoreMarker_NotKnown),
@@ -148,34 +128,6 @@ BackupDaemon::BackupDaemon()
{
// Only ever one instance of a daemon
SSLLib::Initialise();
-
- #ifdef WIN32
- // Create the event object to signal from main thread to
- // worker when new messages are queued to be sent to the
- // command socket.
-
- mhMessageToSendEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
- if(mhMessageToSendEvent == INVALID_HANDLE_VALUE)
- {
- BOX_ERROR("Failed to create event object: error " <<
- GetLastError());
- exit(1);
- }
-
- // Create the event object to signal from worker to main thread
- // when a command has been received on the command socket.
-
- mhCommandReceivedEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
- if(mhCommandReceivedEvent == INVALID_HANDLE_VALUE)
- {
- BOX_ERROR("Failed to create event object: error " <<
- GetLastError());
- exit(1);
- }
-
- // Create the critical section to protect the message queue
- InitializeCriticalSection(&mMessageQueueLock);
- #endif
}
// --------------------------------------------------------------------------
@@ -190,12 +142,6 @@ BackupDaemon::~BackupDaemon()
{
DeleteAllLocations();
DeleteAllIDMaps();
-
- if(mpCommandSocketInfo != 0)
- {
- delete mpCommandSocketInfo;
- mpCommandSocketInfo = 0;
- }
}
// --------------------------------------------------------------------------
@@ -384,218 +330,6 @@ int BackupDaemon::Main(const std::string &rConfigFileName)
return returnCode;
}
-
-void BackupDaemon::RunHelperThread(void)
-{
- const Configuration &conf(GetConfiguration());
- mpCommandSocketInfo = new CommandSocketInfo;
- WinNamedPipeStream& rSocket(mpCommandSocketInfo->mListeningSocket);
-
- // loop until the parent process exits, or we decide
- // to kill the thread ourselves
- while (!IsTerminateWanted())
- {
- try
- {
- std::string socket = conf.GetKeyValue("CommandSocket");
- rSocket.Accept(socket);
- }
- catch (BoxException &e)
- {
- BOX_ERROR("Failed to open command socket: " <<
- e.what());
- SetTerminateWanted();
- break; // this is fatal to listening thread
- }
- catch(std::exception &e)
- {
- BOX_ERROR("Failed to open command socket: " <<
- e.what());
- SetTerminateWanted();
- break; // this is fatal to listening thread
- }
- catch(...)
- {
- BOX_ERROR("Failed to open command socket: "
- "unknown error");
- SetTerminateWanted();
- break; // this is fatal to listening thread
- }
-
- try
- {
- // Errors here do not kill the thread,
- // only the current connection.
-
- // This next section comes from Ben's original function
- // Log
- BOX_INFO("Connection from command socket");
-
- // Send a header line summarising the configuration
- // and current state
- char summary[256];
- size_t summarySize = sprintf(summary,
- "bbackupd: %d %d %d %d\nstate %d\n",
- conf.GetKeyValueBool("AutomaticBackup"),
- conf.GetKeyValueInt("UpdateStoreInterval"),
- conf.GetKeyValueInt("MinimumFileAge"),
- conf.GetKeyValueInt("MaxUploadWait"),
- mState);
-
- rSocket.Write(summary, summarySize);
- rSocket.Write("ping\n", 5);
-
- // old queued messages are not useful
- EnterCriticalSection(&mMessageQueueLock);
- mMessageList.clear();
- ResetEvent(mhMessageToSendEvent);
- LeaveCriticalSection(&mMessageQueueLock);
-
- IOStreamGetLine readLine(rSocket);
- std::string command;
-
- while (rSocket.IsConnected() && !IsTerminateWanted())
- {
- HANDLE handles[2];
- handles[0] = mhMessageToSendEvent;
- handles[1] = rSocket.GetReadableEvent();
-
- DWORD result = WaitForMultipleObjects(
- sizeof(handles)/sizeof(*handles),
- handles, FALSE, 1000);
-
- if(result == 0)
- {
- ResetEvent(mhMessageToSendEvent);
-
- EnterCriticalSection(&mMessageQueueLock);
- try
- {
- while (mMessageList.size() > 0)
- {
- std::string message = *(mMessageList.begin());
- mMessageList.erase(mMessageList.begin());
- BOX_TRACE("Sending '" << message << "' to waiting client");
- message += "\n";
- rSocket.Write(message.c_str(),
- message.length());
- }
- }
- catch (...)
- {
- LeaveCriticalSection(&mMessageQueueLock);
- throw;
- }
- LeaveCriticalSection(&mMessageQueueLock);
- continue;
- }
- else if(result == WAIT_TIMEOUT)
- {
- continue;
- }
- else if(result != 1)
- {
- BOX_ERROR("WaitForMultipleObjects "
- "returned invalid result " <<
- result);
- continue;
- }
-
- if(!readLine.GetLine(command))
- {
- BOX_ERROR("Failed to read line");
- continue;
- }
-
- BOX_INFO("Received command '" << command <<
- "' over command socket");
-
- bool sendOK = false;
- bool sendResponse = true;
- bool disconnect = false;
-
- // Command to process!
- if(command == "quit" || command == "")
- {
- // Close the socket.
- disconnect = true;
- sendResponse = false;
- }
- else if(command == "sync")
- {
- // Sync now!
- this->mDoSyncFlagOut = true;
- this->mSyncIsForcedOut = false;
- sendOK = true;
- SetEvent(mhCommandReceivedEvent);
- }
- else if(command == "force-sync")
- {
- // Sync now (forced -- overrides any SyncAllowScript)
- this->mDoSyncFlagOut = true;
- this->mSyncIsForcedOut = true;
- sendOK = true;
- SetEvent(mhCommandReceivedEvent);
- }
- else if(command == "reload")
- {
- // Reload the configuration
- SetReloadConfigWanted();
- sendOK = true;
- SetEvent(mhCommandReceivedEvent);
- }
- else if(command == "terminate")
- {
- // Terminate the daemon cleanly
- SetTerminateWanted();
- sendOK = true;
- SetEvent(mhCommandReceivedEvent);
- }
- else
- {
- BOX_ERROR("Received unknown command "
- "'" << command << "' "
- "over command socket");
- sendResponse = true;
- sendOK = false;
- }
-
- // Send a response back?
- if(sendResponse)
- {
- const char* response =
- sendOK ? "ok\n" : "error\n";
- rSocket.Write(response,
- strlen(response));
- }
-
- if(disconnect)
- {
- break;
- }
- }
-
- rSocket.Close();
- }
- catch(BoxException &e)
- {
- BOX_ERROR("Communication error with "
- "control client: " << e.what());
- }
- catch(std::exception &e)
- {
- BOX_ERROR("Internal error in command socket "
- "thread: " << e.what());
- }
- catch(...)
- {
- BOX_ERROR("Communication error with control client");
- }
- }
-
- CloseHandle(mhCommandReceivedEvent);
- CloseHandle(mhMessageToSendEvent);
-}
#endif
// --------------------------------------------------------------------------
@@ -611,36 +345,29 @@ void BackupDaemon::Run()
// initialise global timer mechanism
Timers::Init();
- #ifdef WIN32
- // Create a thread to handle the named pipe
- HANDLE hThread;
- unsigned int dwThreadId;
-
- hThread = (HANDLE) _beginthreadex(
- NULL, // default security attributes
- 0, // use default stack size
- HelperThread, // thread function
- this, // argument to thread function
- 0, // use default creation flags
- &dwThreadId); // returns the thread identifier
- #else
+ #ifndef WIN32
// Ignore SIGPIPE so that if a command connection is broken,
// the daemon doesn't terminate.
::signal(SIGPIPE, SIG_IGN);
+ #endif
- // Create a command socket?
- const Configuration &conf(GetConfiguration());
- if(conf.KeyExists("CommandSocket"))
- {
- // Yes, create a local UNIX socket
- mpCommandSocketInfo = new CommandSocketInfo;
- const char *socketName =
- conf.GetKeyValue("CommandSocket").c_str();
+ // Create a command socket?
+ const Configuration &conf(GetConfiguration());
+ if(conf.KeyExists("CommandSocket"))
+ {
+ // Yes, create a local UNIX socket
+ mapCommandSocketInfo.reset(new CommandSocketInfo);
+ const char *socketName =
+ conf.GetKeyValue("CommandSocket").c_str();
+ #ifdef WIN32
+ mapCommandSocketInfo->mListeningSocket.Listen(
+ socketName);
+ #else
::unlink(socketName);
- mpCommandSocketInfo->mListeningSocket.Listen(
+ mapCommandSocketInfo->mListeningSocket.Listen(
Socket::TypeUNIX, socketName);
- }
- #endif // !WIN32
+ #endif
+ }
// Handle things nicely on exceptions
try
@@ -649,16 +376,11 @@ void BackupDaemon::Run()
}
catch(...)
{
- #ifdef WIN32
- // Don't delete the socket, as the helper thread
- // is probably still using it. Let Windows clean
- // up after us.
- #else
- if(mpCommandSocketInfo != 0)
+ if(mapCommandSocketInfo.get())
{
try
{
- delete mpCommandSocketInfo;
+ mapCommandSocketInfo.reset();
}
catch(std::exception &e)
{
@@ -671,24 +393,15 @@ void BackupDaemon::Run()
BOX_WARNING("Error closing command socket "
"after exception, ignored.");
}
- mpCommandSocketInfo = 0;
}
- #endif // WIN32
Timers::Cleanup();
throw;
}
- #ifndef WIN32
- // Clean up
- if(mpCommandSocketInfo != 0)
- {
- delete mpCommandSocketInfo;
- mpCommandSocketInfo = 0;
- }
- #endif
-
+ // Clean up
+ mapCommandSocketInfo.reset();
Timers::Cleanup();
}
@@ -792,7 +505,7 @@ void BackupDaemon::Run2()
// on how this should be done,
// depending on the state of the
// control connection
- if(mpCommandSocketInfo != 0)
+ if(mapCommandSocketInfo.get() != 0)
{
// A command socket exists,
// so sleep by waiting on it
@@ -880,8 +593,7 @@ void BackupDaemon::RunSyncNowWithExceptionHandling()
}
catch(std::exception &e)
{
- BOX_ERROR("Internal error during "
- "backup run: " << e.what());
+ BOX_ERROR("Internal error during backup run: " << e.what());
errorOccurred = true;
errorString = e.what();
}
@@ -1383,33 +1095,13 @@ int BackupDaemon::UseScriptToSeeIfSyncAllowed()
// --------------------------------------------------------------------------
void BackupDaemon::WaitOnCommandSocket(box_time_t RequiredDelay, bool &DoSyncFlagOut, bool &SyncIsForcedOut)
{
-#ifdef WIN32
- DWORD requiredDelayMs = BoxTimeToMilliSeconds(RequiredDelay);
-
- DWORD result = WaitForSingleObject(mhCommandReceivedEvent,
- (DWORD)requiredDelayMs);
-
- if(result == WAIT_OBJECT_0)
- {
- DoSyncFlagOut = this->mDoSyncFlagOut;
- SyncIsForcedOut = this->mSyncIsForcedOut;
- ResetEvent(mhCommandReceivedEvent);
- }
- else if(result == WAIT_TIMEOUT)
- {
- DoSyncFlagOut = false;
- SyncIsForcedOut = false;
- }
- else
+ ASSERT(mapCommandSocketInfo.get());
+ if(!mapCommandSocketInfo.get())
{
- BOX_ERROR("Unexpected result from WaitForSingleObject: "
- "error " << GetLastError());
+ // failure case isn't too bad
+ ::sleep(1);
+ return;
}
-
- return;
-#else // ! WIN32
- ASSERT(mpCommandSocketInfo != 0);
- if(mpCommandSocketInfo == 0) {::sleep(1); return;} // failure case isn't too bad
BOX_TRACE("Wait on command socket, delay = " << RequiredDelay);
@@ -1422,12 +1114,12 @@ void BackupDaemon::WaitOnCommandSocket(box_time_t RequiredDelay, bool &DoSyncFla
if(timeout == INFTIM) timeout = 100000;
// Wait for socket connection, or handle a command?
- if(mpCommandSocketInfo->mpConnectedSocket.get() == 0)
+ if(mapCommandSocketInfo->mpConnectedSocket.get() == 0)
{
// No connection, listen for a new one
- mpCommandSocketInfo->mpConnectedSocket.reset(mpCommandSocketInfo->mListeningSocket.Accept(timeout).release());
+ mapCommandSocketInfo->mpConnectedSocket.reset(mapCommandSocketInfo->mListeningSocket.Accept(timeout).release());
- if(mpCommandSocketInfo->mpConnectedSocket.get() == 0)
+ if(mapCommandSocketInfo->mpConnectedSocket.get() == 0)
{
// If a connection didn't arrive, there was a timeout, which means we've
// waited long enough and it's time to go.
@@ -1446,7 +1138,7 @@ void BackupDaemon::WaitOnCommandSocket(box_time_t RequiredDelay, bool &DoSyncFla
{
uid_t remoteEUID = 0xffff;
gid_t remoteEGID = 0xffff;
- if(mpCommandSocketInfo->mpConnectedSocket->GetPeerCredentials(remoteEUID, remoteEGID))
+ if(mapCommandSocketInfo->mpConnectedSocket->GetPeerCredentials(remoteEUID, remoteEGID))
{
// Credentials are available -- check UID
if(remoteEUID == ::getuid())
@@ -1463,7 +1155,7 @@ void BackupDaemon::WaitOnCommandSocket(box_time_t RequiredDelay, bool &DoSyncFla
{
// Dump the connection
BOX_ERROR("Incoming command connection from peer had different user ID than this process, or security check could not be completed.");
- mpCommandSocketInfo->mpConnectedSocket.reset();
+ mapCommandSocketInfo->mpConnectedSocket.reset();
return;
}
else
@@ -1480,7 +1172,7 @@ void BackupDaemon::WaitOnCommandSocket(box_time_t RequiredDelay, bool &DoSyncFla
conf.GetKeyValueInt("MinimumFileAge"),
conf.GetKeyValueInt("MaxUploadWait"),
mState);
- mpCommandSocketInfo->mpConnectedSocket->Write(summary, summarySize);
+ mapCommandSocketInfo->mpConnectedSocket->Write(summary, summarySize);
// Set the timeout to something very small, so we don't wait too long on waiting
// for any incoming data
@@ -1490,22 +1182,22 @@ void BackupDaemon::WaitOnCommandSocket(box_time_t RequiredDelay, bool &DoSyncFla
}
// So there must be a connection now.
- ASSERT(mpCommandSocketInfo->mpConnectedSocket.get() != 0);
+ ASSERT(mapCommandSocketInfo->mpConnectedSocket.get() != 0);
// Is there a getline object ready?
- if(mpCommandSocketInfo->mpGetLine == 0)
+ if(mapCommandSocketInfo->mpGetLine == 0)
{
// Create a new one
- mpCommandSocketInfo->mpGetLine = new IOStreamGetLine(*(mpCommandSocketInfo->mpConnectedSocket.get()));
+ mapCommandSocketInfo->mpGetLine = new IOStreamGetLine(*(mapCommandSocketInfo->mpConnectedSocket.get()));
}
// Ping the remote side, to provide errors which will mean the socket gets closed
- mpCommandSocketInfo->mpConnectedSocket->Write("ping\n", 5);
+ mapCommandSocketInfo->mpConnectedSocket->Write("ping\n", 5);
// Wait for a command or something on the socket
std::string command;
- while(mpCommandSocketInfo->mpGetLine != 0 && !mpCommandSocketInfo->mpGetLine->IsEOF()
- && mpCommandSocketInfo->mpGetLine->GetLine(command, false /* no preprocessing */, timeout))
+ while(mapCommandSocketInfo->mpGetLine != 0 && !mapCommandSocketInfo->mpGetLine->IsEOF()
+ && mapCommandSocketInfo->mpGetLine->GetLine(command, false /* no preprocessing */, timeout))
{
BOX_TRACE("Receiving command '" << command
<< "' over command socket");
@@ -1550,7 +1242,7 @@ void BackupDaemon::WaitOnCommandSocket(box_time_t RequiredDelay, bool &DoSyncFla
// Send a response back?
if(sendResponse)
{
- mpCommandSocketInfo->mpConnectedSocket->Write(sendOK?"ok\n":"error\n", sendOK?3:6);
+ mapCommandSocketInfo->mpConnectedSocket->Write(sendOK?"ok\n":"error\n", sendOK?3:6);
}
// Set timeout to something very small, so this just checks for data which is waiting
@@ -1558,18 +1250,39 @@ void BackupDaemon::WaitOnCommandSocket(box_time_t RequiredDelay, bool &DoSyncFla
}
// Close on EOF?
- if(mpCommandSocketInfo->mpGetLine != 0 && mpCommandSocketInfo->mpGetLine->IsEOF())
+ if(mapCommandSocketInfo->mpGetLine != 0 && mapCommandSocketInfo->mpGetLine->IsEOF())
{
CloseCommandConnection();
}
}
+ catch(ConnectionException &ce)
+ {
+ BOX_NOTICE("Failed to write to command socket: " << ce.what());
+
+ // If an error occurs, and there is a connection active,
+ // just close that connection and continue. Otherwise,
+ // let the error propagate.
+
+ if(mapCommandSocketInfo->mpConnectedSocket.get() == 0)
+ {
+ throw; // thread will die
+ }
+ else
+ {
+ // Close socket and ignore error
+ CloseCommandConnection();
+ }
+ }
catch(std::exception &e)
{
- BOX_ERROR("Internal error in command socket thread: "
- << e.what());
- // If an error occurs, and there is a connection active, just close that
- // connection and continue. Otherwise, let the error propagate.
- if(mpCommandSocketInfo->mpConnectedSocket.get() == 0)
+ BOX_ERROR("Failed to write to command socket: " <<
+ e.what());
+
+ // If an error occurs, and there is a connection active,
+ // just close that connection and continue. Otherwise,
+ // let the error propagate.
+
+ if(mapCommandSocketInfo->mpConnectedSocket.get() == 0)
{
throw; // thread will die
}
@@ -1581,9 +1294,13 @@ void BackupDaemon::WaitOnCommandSocket(box_time_t RequiredDelay, bool &DoSyncFla
}
catch(...)
{
- // If an error occurs, and there is a connection active, just close that
- // connection and continue. Otherwise, let the error propagate.
- if(mpCommandSocketInfo->mpConnectedSocket.get() == 0)
+ BOX_ERROR("Failed to write to command socket: unknown error");
+
+ // If an error occurs, and there is a connection active,
+ // just close that connection and continue. Otherwise,
+ // let the error propagate.
+
+ if(mapCommandSocketInfo->mpConnectedSocket.get() == 0)
{
throw; // thread will die
}
@@ -1593,7 +1310,6 @@ void BackupDaemon::WaitOnCommandSocket(box_time_t RequiredDelay, bool &DoSyncFla
CloseCommandConnection();
}
}
-#endif // WIN32
}
@@ -1607,17 +1323,16 @@ void BackupDaemon::WaitOnCommandSocket(box_time_t RequiredDelay, bool &DoSyncFla
// --------------------------------------------------------------------------
void BackupDaemon::CloseCommandConnection()
{
-#ifndef WIN32
try
{
BOX_TRACE("Closing command connection");
- if(mpCommandSocketInfo->mpGetLine)
+ if(mapCommandSocketInfo->mpGetLine)
{
- delete mpCommandSocketInfo->mpGetLine;
- mpCommandSocketInfo->mpGetLine = 0;
+ delete mapCommandSocketInfo->mpGetLine;
+ mapCommandSocketInfo->mpGetLine = 0;
}
- mpCommandSocketInfo->mpConnectedSocket.reset();
+ mapCommandSocketInfo->mpConnectedSocket.reset();
}
catch(std::exception &e)
{
@@ -1628,7 +1343,6 @@ void BackupDaemon::CloseCommandConnection()
{
// Ignore any errors
}
-#endif
}
@@ -1646,27 +1360,15 @@ void BackupDaemon::SendSyncStartOrFinish(bool SendStart)
// The bbackupctl program can't rely on a state change, because it
// may never change if the server doesn't need to be contacted.
- if(mpCommandSocketInfo != NULL &&
-#ifdef WIN32
- mpCommandSocketInfo->mListeningSocket.IsConnected()
-#else
- mpCommandSocketInfo->mpConnectedSocket.get() != 0
-#endif
- )
+ if(mapCommandSocketInfo.get() &&
+ mapCommandSocketInfo->mpConnectedSocket.get() != 0)
{
std::string message = SendStart ? "start-sync" : "finish-sync";
try
{
-#ifdef WIN32
- EnterCriticalSection(&mMessageQueueLock);
- mMessageList.push_back(message);
- SetEvent(mhMessageToSendEvent);
- LeaveCriticalSection(&mMessageQueueLock);
-#else
message += "\n";
- mpCommandSocketInfo->mpConnectedSocket->Write(
+ mapCommandSocketInfo->mpConnectedSocket->Write(
message.c_str(), message.size());
-#endif
}
catch(std::exception &e)
{
@@ -2354,20 +2056,14 @@ void BackupDaemon::SetState(int State)
sprintf(newState, "state %d", State);
std::string message = newState;
-#ifdef WIN32
- EnterCriticalSection(&mMessageQueueLock);
- mMessageList.push_back(newState);
- SetEvent(mhMessageToSendEvent);
- LeaveCriticalSection(&mMessageQueueLock);
-#else
message += "\n";
- if(mpCommandSocketInfo == 0)
+ if(!mapCommandSocketInfo.get())
{
return;
}
- if(mpCommandSocketInfo->mpConnectedSocket.get() == 0)
+ if(mapCommandSocketInfo->mpConnectedSocket.get() == 0)
{
return;
}
@@ -2375,22 +2071,27 @@ void BackupDaemon::SetState(int State)
// Something connected to the command socket, tell it about the new state
try
{
- mpCommandSocketInfo->mpConnectedSocket->Write(message.c_str(),
+ mapCommandSocketInfo->mpConnectedSocket->Write(message.c_str(),
message.length());
}
+ catch(ConnectionException &ce)
+ {
+ BOX_NOTICE("Failed to write state to command socket: " <<
+ ce.what());
+ CloseCommandConnection();
+ }
catch(std::exception &e)
{
- BOX_ERROR("Internal error while writing state "
- "to command socket: " << e.what());
+ BOX_ERROR("Failed to write state to command socket: " <<
+ e.what());
CloseCommandConnection();
}
catch(...)
{
- BOX_ERROR("Internal error while writing state "
- "to command socket: unknown error");
+ BOX_ERROR("Failed to write state to command socket: "
+ "unknown error");
CloseCommandConnection();
}
-#endif
}
diff --git a/bin/bbackupd/BackupDaemon.h b/bin/bbackupd/BackupDaemon.h
index 0f946164..0e49abcd 100644
--- a/bin/bbackupd/BackupDaemon.h
+++ b/bin/bbackupd/BackupDaemon.h
@@ -27,6 +27,7 @@
#include "autogen_BackupProtocolClient.h"
#ifdef WIN32
+ #include "WinNamedPipeListener.h"
#include "WinNamedPipeStream.h"
#endif
@@ -193,7 +194,8 @@ private:
CommandSocketInfo &operator=(const CommandSocketInfo &);
public:
#ifdef WIN32
- WinNamedPipeStream mListeningSocket;
+ WinNamedPipeListener<1 /* listen backlog */> mListeningSocket;
+ std::auto_ptr<WinNamedPipeStream> mpConnectedSocket;
#else
SocketListen<SocketStream, 1 /* listen backlog */> mListeningSocket;
std::auto_ptr<SocketStream> mpConnectedSocket;
@@ -202,7 +204,7 @@ private:
};
// Using a socket?
- CommandSocketInfo *mpCommandSocketInfo;
+ std::auto_ptr<CommandSocketInfo> mapCommandSocketInfo;
// Stop notifications being repeated.
SysadminNotifier::EventCode mLastNotifiedEvent;
@@ -503,16 +505,9 @@ public:
}
#ifdef WIN32
- public:
- void RunHelperThread(void);
-
private:
- bool mDoSyncFlagOut, mSyncIsForcedOut;
bool mInstallService, mRemoveService, mRunAsService;
std::string mServiceName;
- HANDLE mhMessageToSendEvent, mhCommandReceivedEvent;
- CRITICAL_SECTION mMessageQueueLock;
- std::vector<std::string> mMessageList;
#endif
};
diff --git a/lib/server/WinNamedPipeStream.cpp b/lib/server/WinNamedPipeStream.cpp
index b3829d6b..1179516e 100644
--- a/lib/server/WinNamedPipeStream.cpp
+++ b/lib/server/WinNamedPipeStream.cpp
@@ -44,7 +44,55 @@ WinNamedPipeStream::WinNamedPipeStream()
mWriteClosed(false),
mIsServer(false),
mIsConnected(false)
-{
+{ }
+
+// --------------------------------------------------------------------------
+//
+// Function
+// Name: WinNamedPipeStream::WinNamedPipeStream(HANDLE)
+// Purpose: Constructor (with already-connected pipe handle)
+// Created: 2008/10/01
+//
+// --------------------------------------------------------------------------
+WinNamedPipeStream::WinNamedPipeStream(HANDLE hNamedPipe)
+ : mSocketHandle(hNamedPipe),
+ mReadableEvent(INVALID_HANDLE_VALUE),
+ mBytesInBuffer(0),
+ mReadClosed(false),
+ mWriteClosed(false),
+ mIsServer(true),
+ mIsConnected(true)
+{
+ // create the Readable event
+ mReadableEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+
+ if (mReadableEvent == INVALID_HANDLE_VALUE)
+ {
+ BOX_ERROR("Failed to create the Readable event: " <<
+ GetErrorMessage(GetLastError()));
+ Close();
+ THROW_EXCEPTION(CommonException, Internal)
+ }
+
+ // initialise the OVERLAPPED structure
+ memset(&mReadOverlap, 0, sizeof(mReadOverlap));
+ mReadOverlap.hEvent = mReadableEvent;
+
+ // start the first overlapped read
+ if (!ReadFile(mSocketHandle, mReadBuffer, sizeof(mReadBuffer),
+ NULL, &mReadOverlap))
+ {
+ DWORD err = GetLastError();
+
+ if (err != ERROR_IO_PENDING)
+ {
+ BOX_ERROR("Failed to start overlapped read: " <<
+ GetErrorMessage(err));
+ Close();
+ THROW_EXCEPTION(ConnectionException,
+ Conn_SocketReadError)
+ }
+ }
}
// --------------------------------------------------------------------------
@@ -80,33 +128,17 @@ WinNamedPipeStream::~WinNamedPipeStream()
// Created: 2005/12/07
//
// --------------------------------------------------------------------------
-void WinNamedPipeStream::Accept(const std::string& rName)
+/*
+void WinNamedPipeStream::Accept()
{
- if (mSocketHandle != INVALID_HANDLE_VALUE || mIsConnected)
+ if (mSocketHandle == INVALID_HANDLE_VALUE)
{
- THROW_EXCEPTION(ServerException, SocketAlreadyOpen)
+ THROW_EXCEPTION(ServerException, BadSocketHandle);
}
- std::string socket = sPipeNamePrefix + rName;
-
- mSocketHandle = CreateNamedPipeA(
- socket.c_str(), // pipe name
- PIPE_ACCESS_DUPLEX | // read/write access
- FILE_FLAG_OVERLAPPED, // enabled overlapped I/O
- PIPE_TYPE_BYTE | // message type pipe
- PIPE_READMODE_BYTE | // message-read mode
- PIPE_WAIT, // blocking mode
- 1, // max. instances
- 4096, // output buffer size
- 4096, // input buffer size
- NMPWAIT_USE_DEFAULT_WAIT, // client time-out
- NULL); // default security attribute
-
- if (mSocketHandle == INVALID_HANDLE_VALUE)
+ if (mIsConnected)
{
- BOX_ERROR("Failed to CreateNamedPipeA(" << socket << "): " <<
- GetErrorMessage(GetLastError()));
- THROW_EXCEPTION(ServerException, SocketOpenError)
+ THROW_EXCEPTION(ServerException, SocketAlreadyOpen);
}
bool connected = ConnectNamedPipe(mSocketHandle, (LPOVERLAPPED) NULL);
@@ -156,6 +188,7 @@ void WinNamedPipeStream::Accept(const std::string& rName)
}
}
}
+*/
// --------------------------------------------------------------------------
//
@@ -217,7 +250,7 @@ void WinNamedPipeStream::Connect(const std::string& rName)
int WinNamedPipeStream::Read(void *pBuffer, int NBytes, int Timeout)
{
// TODO no support for timeouts yet
- if (Timeout != IOStream::TimeOutInfinite)
+ if (!mIsServer && Timeout != IOStream::TimeOutInfinite)
{
THROW_EXCEPTION(CommonException, AssertFailed)
}
@@ -249,8 +282,29 @@ int WinNamedPipeStream::Read(void *pBuffer, int NBytes, int Timeout)
{
// overlapped I/O completed successfully?
// (wait if needed)
+ DWORD waitResult = WaitForSingleObject(
+ mReadOverlap.hEvent, Timeout);
- if (GetOverlappedResult(mSocketHandle,
+ if (waitResult == WAIT_ABANDONED)
+ {
+ BOX_ERROR("Wait for command socket read "
+ "abandoned by system");
+ THROW_EXCEPTION(ServerException,
+ BadSocketHandle);
+ }
+ else if (waitResult == WAIT_TIMEOUT)
+ {
+ // wait timed out, nothing to read
+ NumBytesRead = 0;
+ }
+ else if (waitResult != WAIT_OBJECT_0)
+ {
+ BOX_ERROR("Failed to wait for command "
+ "socket read: unknown result " <<
+ waitResult);
+ }
+ // object is ready to read from
+ else if (GetOverlappedResult(mSocketHandle,
&mReadOverlap, &NumBytesRead, TRUE))
{
needAnotherRead = true;
@@ -267,7 +321,7 @@ int WinNamedPipeStream::Read(void *pBuffer, int NBytes, int Timeout)
{
if (err == ERROR_BROKEN_PIPE)
{
- BOX_ERROR("Control client "
+ BOX_NOTICE("Control client "
"disconnected");
}
else
@@ -342,29 +396,6 @@ int WinNamedPipeStream::Read(void *pBuffer, int NBytes, int Timeout)
Conn_SocketReadError)
}
}
-
- // If the read succeeded immediately, leave the event
- // signaled, so that we will be called again to process
- // the newly read data and start another overlapped read.
- if (needAnotherRead && !mReadClosed)
- {
- // leave signalled
- }
- else if (!needAnotherRead && mBytesInBuffer > 0)
- {
- // leave signalled
- }
- else
- {
- // nothing left to read, reset the event
- ResetEvent(mReadableEvent);
- // FIXME: a pending read could have signalled
- // the event (again) while we were busy reading.
- // that signal would be lost, and the reading
- // thread would block. Should be pretty obvious
- // if this happens in practice: control client
- // hangs.
- }
}
else
{
@@ -441,7 +472,7 @@ void WinNamedPipeStream::Write(const void *pBuffer, int NBytes)
if (!Success)
{
// ERROR_NO_DATA is a strange name for
- // "The pipe is being closed". No exception wanted.
+ // "The pipe is being closed".
DWORD err = GetLastError();
@@ -453,15 +484,8 @@ void WinNamedPipeStream::Write(const void *pBuffer, int NBytes)
Close();
- if (err == ERROR_NO_DATA)
- {
- return;
- }
- else
- {
- THROW_EXCEPTION(ConnectionException,
- Conn_SocketWriteError)
- }
+ THROW_EXCEPTION(ConnectionException,
+ Conn_SocketWriteError)
}
NumBytesWrittenTotal += NumBytesWrittenThisTime;
diff --git a/lib/server/WinNamedPipeStream.h b/lib/server/WinNamedPipeStream.h
index 6acd48f6..386ff7e3 100644
--- a/lib/server/WinNamedPipeStream.h
+++ b/lib/server/WinNamedPipeStream.h
@@ -24,10 +24,11 @@ class WinNamedPipeStream : public IOStream
{
public:
WinNamedPipeStream();
+ WinNamedPipeStream(HANDLE hNamedPipe);
~WinNamedPipeStream();
// server side - create the named pipe and listen for connections
- void Accept(const std::string& rName);
+ // use WinNamedPipeListener to do this instead.
// client side - connect to a waiting server
void Connect(const std::string& rName);
@@ -40,9 +41,6 @@ public:
virtual void Close();
virtual bool StreamDataLeft();
virtual bool StreamClosed();
- bool IsConnected() { return mIsConnected; }
- HANDLE GetSocketHandle() { return mSocketHandle; }
- HANDLE GetReadableEvent() { return mReadableEvent; }
protected:
void MarkAsReadClosed() {mReadClosed = true;}
@@ -62,6 +60,7 @@ private:
bool mIsServer;
bool mIsConnected;
+public:
static std::string sPipeNamePrefix;
};