summaryrefslogtreecommitdiff
path: root/bin/bbackupd/BackupDaemon.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'bin/bbackupd/BackupDaemon.cpp')
-rw-r--r--bin/bbackupd/BackupDaemon.cpp219
1 files changed, 149 insertions, 70 deletions
diff --git a/bin/bbackupd/BackupDaemon.cpp b/bin/bbackupd/BackupDaemon.cpp
index 370535fa..9789558f 100644
--- a/bin/bbackupd/BackupDaemon.cpp
+++ b/bin/bbackupd/BackupDaemon.cpp
@@ -127,6 +127,29 @@ BackupDaemon::BackupDaemon()
}
#ifdef WIN32
+ // Create the event object to signal from main thread to worker
+ // when new messages are queued to be sent to the command socket.
+ mhMessageToSendEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+ if(mhMessageToSendEvent == INVALID_HANDLE_VALUE)
+ {
+ BOX_ERROR("Failed to create event object: error " <<
+ GetLastError());
+ exit(1);
+ }
+
+ // Create the event object to signal from worker to main thread
+ // when a command has been received on the command socket.
+ mhCommandReceivedEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+ if(mhCommandReceivedEvent == INVALID_HANDLE_VALUE)
+ {
+ BOX_ERROR("Failed to create event object: error " <<
+ GetLastError());
+ exit(1);
+ }
+
+ // Create the critical section to protect the message queue
+ InitializeCriticalSection(&mMessageQueueLock);
+
// Create a thread to handle the named pipe
HANDLE hThread;
unsigned int dwThreadId;
@@ -264,7 +287,6 @@ void BackupDaemon::DeleteAllLocations()
#ifdef WIN32
void BackupDaemon::RunHelperThread(void)
{
- this->mReceivedCommandConn = false;
mpCommandSocketInfo = new CommandSocketInfo;
WinNamedPipeStream& rSocket(mpCommandSocketInfo->mListeningSocket);
@@ -322,16 +344,74 @@ void BackupDaemon::RunHelperThread(void)
rSocket.Write(summary, summarySize);
rSocket.Write("ping\n", 5);
+ // old queued messages are not useful
+ EnterCriticalSection(&mMessageQueueLock);
+ mMessageList.clear();
+ ResetEvent(mhMessageToSendEvent);
+ LeaveCriticalSection(&mMessageQueueLock);
+
IOStreamGetLine readLine(rSocket);
std::string command;
- while (rSocket.IsConnected() &&
- readLine.GetLine(command) &&
- !IsTerminateWanted())
+ while (rSocket.IsConnected() && !IsTerminateWanted())
{
+ HANDLE handles[2];
+ handles[0] = mhMessageToSendEvent;
+ handles[1] = rSocket.GetReadableEvent();
+
BOX_TRACE("Received command '" << command
<< "' over command socket");
+ DWORD result = WaitForMultipleObjects(
+ sizeof(handles)/sizeof(*handles),
+ handles, FALSE, 1000);
+
+ if(result == 0)
+ {
+ ResetEvent(mhMessageToSendEvent);
+
+ EnterCriticalSection(&mMessageQueueLock);
+ try
+ {
+ while (mMessageList.size() > 0)
+ {
+ std::string message = *(mMessageList.begin());
+ mMessageList.erase(mMessageList.begin());
+ printf("Sending '%s' to waiting client... ", message.c_str());
+ message += "\n";
+ rSocket.Write(message.c_str(),
+ message.length());
+
+ printf("done.\n");
+ }
+ }
+ catch (...)
+ {
+ LeaveCriticalSection(&mMessageQueueLock);
+ throw;
+ }
+ LeaveCriticalSection(&mMessageQueueLock);
+ continue;
+ }
+ else if(result == WAIT_TIMEOUT)
+ {
+ continue;
+ }
+ else if(result != 1)
+ {
+ BOX_ERROR("WaitForMultipleObjects returned invalid result " << result);
+ continue;
+ }
+
+ if(!readLine.GetLine(command))
+ {
+ BOX_ERROR("Failed to read line");
+ continue;
+ }
+
+ BOX_INFO("Received command " << command <<
+ " from client");
+
bool sendOK = false;
bool sendResponse = true;
bool disconnect = false;
@@ -349,6 +429,7 @@ void BackupDaemon::RunHelperThread(void)
this->mDoSyncFlagOut = true;
this->mSyncIsForcedOut = false;
sendOK = true;
+ SetEvent(mhCommandReceivedEvent);
}
else if(command == "force-sync")
{
@@ -356,18 +437,21 @@ void BackupDaemon::RunHelperThread(void)
this->mDoSyncFlagOut = true;
this->mSyncIsForcedOut = true;
sendOK = true;
+ SetEvent(mhCommandReceivedEvent);
}
else if(command == "reload")
{
// Reload the configuration
SetReloadConfigWanted();
sendOK = true;
+ SetEvent(mhCommandReceivedEvent);
}
else if(command == "terminate")
{
// Terminate the daemon cleanly
SetTerminateWanted();
sendOK = true;
+ SetEvent(mhCommandReceivedEvent);
}
else
{
@@ -390,8 +474,6 @@ void BackupDaemon::RunHelperThread(void)
{
break;
}
-
- this->mReceivedCommandConn = true;
}
rSocket.Close();
@@ -411,6 +493,9 @@ void BackupDaemon::RunHelperThread(void)
BOX_ERROR("Communication error with control client");
}
}
+
+ CloseHandle(mhCommandReceivedEvent);
+ CloseHandle(mhMessageToSendEvent);
}
#endif
@@ -1123,25 +1208,27 @@ int BackupDaemon::UseScriptToSeeIfSyncAllowed()
void BackupDaemon::WaitOnCommandSocket(box_time_t RequiredDelay, bool &DoSyncFlagOut, bool &SyncIsForcedOut)
{
#ifdef WIN32
- // Really could use some interprocess protection, mutex etc
- // any side effect should be too bad???? :)
- DWORD timeout = (DWORD)BoxTimeToMilliSeconds(RequiredDelay);
+ DWORD requiredDelayMs = BoxTimeToMilliSeconds(RequiredDelay);
- while ( this->mReceivedCommandConn == false )
- {
- Sleep(1);
+ DWORD result = WaitForSingleObject(mhCommandReceivedEvent,
+ (DWORD)requiredDelayMs);
- if( timeout == 0 )
- {
- DoSyncFlagOut = false;
- SyncIsForcedOut = false;
- return;
- }
- timeout--;
+ if(result == WAIT_OBJECT_0)
+ {
+ DoSyncFlagOut = this->mDoSyncFlagOut;
+ SyncIsForcedOut = this->mSyncIsForcedOut;
+ ResetEvent(mhCommandReceivedEvent);
+ }
+ else if(result == WAIT_TIMEOUT)
+ {
+ DoSyncFlagOut = false;
+ SyncIsForcedOut = false;
+ }
+ else
+ {
+ BOX_ERROR("Unexpected result from WaitForSingleObject: "
+ "error " << GetLastError());
}
- this->mReceivedCommandConn = false;
- DoSyncFlagOut = this->mDoSyncFlagOut;
- SyncIsForcedOut = this->mSyncIsForcedOut;
return;
#else // ! WIN32
@@ -1383,10 +1470,6 @@ void BackupDaemon::SendSyncStartOrFinish(bool SendStart)
// The bbackupctl program can't rely on a state change, because it
// may never change if the server doesn't need to be contacted.
-#ifdef __MINGW32__
-#warning race condition: what happens if socket is closed?
-#endif
-
if(mpCommandSocketInfo != NULL &&
#ifdef WIN32
mpCommandSocketInfo->mListeningSocket.IsConnected()
@@ -1395,15 +1478,18 @@ void BackupDaemon::SendSyncStartOrFinish(bool SendStart)
#endif
)
{
- const char* message = SendStart ? "start-sync\n" : "finish-sync\n";
+ std::string message = SendStart ? "start-sync" : "finish-sync";
try
{
#ifdef WIN32
- mpCommandSocketInfo->mListeningSocket.Write(message,
- (int)strlen(message));
+ EnterCriticalSection(&mMessageQueueLock);
+ mMessageList.push_back(message);
+ SetEvent(mhMessageToSendEvent);
+ LeaveCriticalSection(&mMessageQueueLock);
#else
- mpCommandSocketInfo->mpConnectedSocket->Write(message,
- strlen(message));
+ message += "\n";
+ mpCommandSocketInfo->mpConnectedSocket->Write(
+ message.c_str(), message.size());
#endif
}
catch(std::exception &e)
@@ -2033,51 +2119,44 @@ void BackupDaemon::SetState(int State)
// command socket if there's an error
char newState[64];
- char newStateSize = sprintf(newState, "state %d\n", State);
+ sprintf(newState, "state %d", State);
+ std::string message = newState;
#ifdef WIN32
- #ifndef _MSC_VER
- #warning FIX ME: race condition
- #endif
+ EnterCriticalSection(&mMessageQueueLock);
+ mMessageList.push_back(newState);
+ SetEvent(mhMessageToSendEvent);
+ LeaveCriticalSection(&mMessageQueueLock);
+#else
+ message += "\n";
- // what happens if the socket is closed by the other thread before
- // we can write to it? Null pointer deref at best.
- if(mpCommandSocketInfo &&
- mpCommandSocketInfo->mListeningSocket.IsConnected())
+ if(mpCommandSocketInfo == 0)
{
- try
- {
- mpCommandSocketInfo->mListeningSocket.Write(newState, newStateSize);
- }
- catch(std::exception &e)
- {
- BOX_ERROR("Internal error while writing state "
- "to command socket: " << e.what());
- CloseCommandConnection();
- }
- catch(...)
- {
- CloseCommandConnection();
- }
+ return;
}
-#else
- if(mpCommandSocketInfo != 0 && mpCommandSocketInfo->mpConnectedSocket.get() != 0)
+
+ if(mpCommandSocketInfo->mpConnectedSocket.get() == 0)
{
- // Something connected to the command socket, tell it about the new state
- try
- {
- mpCommandSocketInfo->mpConnectedSocket->Write(newState, newStateSize);
- }
- catch(std::exception &e)
- {
- BOX_ERROR("Internal error while writing state "
- "to command socket: " << e.what());
- CloseCommandConnection();
- }
- catch(...)
- {
- CloseCommandConnection();
- }
+ return;
+ }
+
+ // Something connected to the command socket, tell it about the new state
+ try
+ {
+ mpCommandSocketInfo->mpConnectedSocket->Write(message.c_str(),
+ message.length());
+ }
+ catch(std::exception &e)
+ {
+ BOX_ERROR("Internal error while writing state "
+ "to command socket: " << e.what());
+ CloseCommandConnection();
+ }
+ catch(...)
+ {
+ BOX_ERROR("Internal error while writing state "
+ "to command socket: unknown error");
+ CloseCommandConnection();
}
#endif
}