summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Chris Wilson <chris+github@qwirx.com> 2012-02-12 12:29:56 +0000
committer Chris Wilson <chris+github@qwirx.com> 2012-02-12 12:29:56 +0000
commit 15a3e05144785bfcaddcf2d11b6c549edd9f62ee (patch)
tree 30827463c74a3b24f4b206894580a4511203c014
parent 03d7cb4d75822b40d263d0737e3115a0aeff1cda (diff)
Add experimental "TCP Nice" mode, disabled by default.
-rw-r--r-- bin/bbackupd/BackupClientContext.cpp 118
-rw-r--r-- bin/bbackupd/BackupClientContext.h 18
-rw-r--r-- bin/bbackupd/BackupClientDirectoryRecord.cpp 7
-rw-r--r-- bin/bbackupd/BackupDaemon.cpp 4
-rw-r--r-- infrastructure/m4/boxbackup_tests.m4 6
-rw-r--r-- lib/backupclient/BackupDaemonConfigVerify.cpp 3
-rw-r--r-- lib/backupstore/BackupStoreFileDiff.cpp 2
-rw-r--r-- lib/common/BoxTime.h 4
-rw-r--r-- lib/common/Timer.cpp 38
-rw-r--r-- lib/common/Timer.h 4
-rw-r--r-- lib/server/SocketStream.h 6
-rw-r--r-- lib/server/TcpNice.cpp 204
-rw-r--r-- lib/server/TcpNice.h 174
13 files changed, 498 insertions, 90 deletions
diff --git a/bin/bbackupd/BackupClientContext.cpp b/bin/bbackupd/BackupClientContext.cpp
index 7602ed29..e1c4349f 100644
--- a/bin/bbackupd/BackupClientContext.cpp
+++ b/bin/bbackupd/BackupClientContext.cpp
@@ -28,6 +28,7 @@
#include "autogen_BackupProtocol.h"
#include "BackupStoreFile.h"
#include "Logging.h"
+#include "TcpNice.h"
#include "MemLeakFindOn.h"
@@ -49,15 +50,14 @@ BackupClientContext::BackupClientContext
bool ExtendedLogging,
bool ExtendedLogToFile,
std::string ExtendedLogFile,
- ProgressNotifier& rProgressNotifier
+ ProgressNotifier& rProgressNotifier,
+ bool TcpNiceMode
)
: mrResolver(rResolver),
mrTLSContext(rTLSContext),
mHostname(rHostname),
mPort(Port),
mAccountNumber(AccountNumber),
- mpSocket(0),
- mpConnection(0),
mExtendedLogging(ExtendedLogging),
mExtendedLogToFile(ExtendedLogToFile),
mExtendedLogFile(ExtendedLogFile),
@@ -71,7 +71,8 @@ BackupClientContext::BackupClientContext
mpExcludeDirs(0),
mKeepAliveTimer(0, "KeepAliveTime"),
mbIsManaged(false),
- mrProgressNotifier(rProgressNotifier)
+ mrProgressNotifier(rProgressNotifier),
+ mTcpNiceMode(TcpNiceMode)
{
}
@@ -107,40 +108,42 @@ BackupClientContext::~BackupClientContext()
BackupProtocolClient &BackupClientContext::GetConnection()
{
// Already got it? Just return it.
- if(mpConnection != 0)
+ if(mapConnection.get())
{
- return *mpConnection;
+ return *mapConnection;
}
- // Get a socket connection
- if(mpSocket == 0)
- {
- mpSocket = new SocketStreamTLS;
- ASSERT(mpSocket != 0); // will have exceptioned if this was a problem
- }
+ // there shouldn't be a connection open
+ ASSERT(mapSocket.get() == 0);
+
+ SocketStreamTLS *pSocket = new SocketStreamTLS;
try
{
// Defensive.
- if(mpConnection != 0)
- {
- delete mpConnection;
- mpConnection = 0;
- }
+ mapConnection.reset();
// Log intention
BOX_INFO("Opening connection to server '" <<
mHostname << "'...");
// Connect!
- mpSocket->Open(mrTLSContext, Socket::TypeINET,
- mHostname.c_str(), mPort);
+ pSocket->Open(mrTLSContext, Socket::TypeINET, mHostname.c_str(), mPort);
+
+ if(mTcpNiceMode)
+ {
+ mapNice.reset(new NiceSocketStream(std::auto_ptr<SocketStream>(pSocket)));
+ mapConnection.reset(new BackupProtocolClient(*mapNice));
+ }
+ else
+ {
+ mapConnection.reset(new BackupProtocolClient(*pSocket));
+ }
- // And create a procotol object
- mpConnection = new BackupProtocolClient(*mpSocket);
+ pSocket = NULL;
// Set logging option
- mpConnection->SetLogToSysLog(mExtendedLogging);
+ mapConnection->SetLogToSysLog(mExtendedLogging);
if (mExtendedLogToFile)
{
@@ -156,16 +159,17 @@ BackupProtocolClient &BackupClientContext::GetConnection()
}
else
{
- mpConnection->SetLogToFile(mpExtendedLogFileHandle);
+ mapConnection->SetLogToFile(mpExtendedLogFileHandle);
}
}
// Handshake
- mpConnection->Handshake();
+ mapConnection->Handshake();
// Check the version of the server
{
- std::auto_ptr<BackupProtocolVersion> serverVersion(mpConnection->QueryVersion(BACKUP_STORE_SERVER_VERSION));
+ std::auto_ptr<BackupProtocolVersion> serverVersion(
+ mapConnection->QueryVersion(BACKUP_STORE_SERVER_VERSION));
if(serverVersion->GetVersion() != BACKUP_STORE_SERVER_VERSION)
{
THROW_EXCEPTION(BackupStoreException, WrongServerVersion)
@@ -173,7 +177,8 @@ BackupProtocolClient &BackupClientContext::GetConnection()
}
// Login -- if this fails, the Protocol will exception
- std::auto_ptr<BackupProtocolLoginConfirmed> loginConf(mpConnection->QueryLogin(mAccountNumber, 0 /* read/write */));
+ std::auto_ptr<BackupProtocolLoginConfirmed> loginConf(
+ mapConnection->QueryLogin(mAccountNumber, 0 /* read/write */));
// Check that the client store marker is the one we expect
if(mClientStoreMarker != ClientStoreMarker_NotKnown)
@@ -183,9 +188,9 @@ BackupProtocolClient &BackupClientContext::GetConnection()
// Not good... finish the connection, abort, etc, ignoring errors
try
{
- mpConnection->QueryFinished();
- mpSocket->Shutdown();
- mpSocket->Close();
+ mapConnection->QueryFinished();
+ mapSocket.reset();
+ mapNice.reset();
}
catch(...)
{
@@ -213,14 +218,13 @@ BackupProtocolClient &BackupClientContext::GetConnection()
catch(...)
{
// Clean up.
- delete mpConnection;
- mpConnection = 0;
- delete mpSocket;
- mpSocket = 0;
+ mapConnection.reset();
+ mapSocket.reset();
+ mapNice.reset();
throw;
}
- return *mpConnection;
+ return *mapConnection;
}
// --------------------------------------------------------------------------
@@ -233,7 +237,7 @@ BackupProtocolClient &BackupClientContext::GetConnection()
// --------------------------------------------------------------------------
void BackupClientContext::CloseAnyOpenConnection()
{
- if(mpConnection)
+ if(mapConnection.get())
{
try
{
@@ -244,14 +248,14 @@ void BackupClientContext::CloseAnyOpenConnection()
box_time_t marker = GetCurrentBoxTime();
// Set it on the store
- mpConnection->QuerySetClientStoreMarker(marker);
+ mapConnection->QuerySetClientStoreMarker(marker);
// Record it so that it can be picked up later.
mClientStoreMarker = marker;
}
// Quit nicely
- mpConnection->QueryFinished();
+ mapConnection->QueryFinished();
}
catch(...)
{
@@ -259,26 +263,18 @@ void BackupClientContext::CloseAnyOpenConnection()
}
// Delete it anyway.
- delete mpConnection;
- mpConnection = 0;
+ mapConnection.reset();
}
-
- if(mpSocket)
+
+ try
{
- try
- {
- // Be nice about closing the socket
- mpSocket->Shutdown();
- mpSocket->Close();
- }
- catch(...)
- {
- // Ignore errors
- }
-
- // Delete object
- delete mpSocket;
- mpSocket = 0;
+ // Be nice about closing the socket
+ mapSocket.reset();
+ mapNice.reset();
+ }
+ catch(...)
+ {
+ // Ignore errors
}
// Delete any pending list
@@ -307,9 +303,9 @@ void BackupClientContext::CloseAnyOpenConnection()
// --------------------------------------------------------------------------
int BackupClientContext::GetTimeout() const
{
- if(mpConnection)
+ if(mapConnection.get())
{
- return mpConnection->GetTimeout();
+ return mapConnection->GetTimeout();
}
return (15*60*1000);
@@ -509,7 +505,7 @@ void BackupClientContext::SetKeepAliveTime(int iSeconds)
{
mKeepAliveTime = iSeconds < 0 ? 0 : iSeconds;
BOX_TRACE("Set keep-alive time to " << mKeepAliveTime << " seconds");
- mKeepAliveTimer = Timer(mKeepAliveTime, "KeepAliveTime");
+ mKeepAliveTimer = Timer(mKeepAliveTime * 1000, "KeepAliveTime");
}
// --------------------------------------------------------------------------
@@ -551,7 +547,7 @@ void BackupClientContext::UnManageDiffProcess()
// --------------------------------------------------------------------------
void BackupClientContext::DoKeepAlive()
{
- if (!mpConnection)
+ if (!mapConnection.get())
{
return;
}
@@ -567,9 +563,9 @@ void BackupClientContext::DoKeepAlive()
}
BOX_TRACE("KeepAliveTime reached, sending keep-alive message");
- mpConnection->QueryGetIsAlive();
+ mapConnection->QueryGetIsAlive();
- mKeepAliveTimer = Timer(mKeepAliveTime, "KeepAliveTime");
+ mKeepAliveTimer = Timer(mKeepAliveTime * 1000, "KeepAliveTime");
}
int BackupClientContext::GetMaximumDiffingTime()
diff --git a/bin/bbackupd/BackupClientContext.h b/bin/bbackupd/BackupClientContext.h
index 404d2d77..41d90ff0 100644
--- a/bin/bbackupd/BackupClientContext.h
+++ b/bin/bbackupd/BackupClientContext.h
@@ -16,6 +16,7 @@
#include "BackupDaemonInterface.h"
#include "BackupStoreFile.h"
#include "ExcludeList.h"
+#include "TcpNice.h"
#include "Timer.h"
class TLSContext;
@@ -49,7 +50,8 @@ public:
bool ExtendedLogging,
bool ExtendedLogToFile,
std::string ExtendedLogFile,
- ProgressNotifier &rProgressNotifier
+ ProgressNotifier &rProgressNotifier,
+ bool TcpNiceMode
);
virtual ~BackupClientContext();
private:
@@ -207,6 +209,14 @@ public:
{
return mrProgressNotifier;
}
+
+	// Enable or disable TCP nice (background) mode on the current
+	// connection's socket wrapper.
+	//
+	// Does nothing unless nice mode was requested at construction
+	// (mTcpNiceMode) AND a NiceSocketStream currently exists. mapNice is
+	// only created when a connection is opened and is reset whenever the
+	// connection is torn down, so it must be checked before use.
+	void SetNiceMode(bool enabled)
+	{
+		if(mTcpNiceMode && mapNice.get())
+		{
+			mapNice->SetEnabled(enabled);
+		}
+	}
private:
LocationResolver &mrResolver;
@@ -214,8 +224,9 @@ private:
std::string mHostname;
int mPort;
uint32_t mAccountNumber;
- SocketStreamTLS *mpSocket;
- BackupProtocolClient *mpConnection;
+ std::auto_ptr<IOStream> mapSocket;
+ std::auto_ptr<NiceSocketStream> mapNice;
+ std::auto_ptr<BackupProtocolClient> mapConnection;
bool mExtendedLogging;
bool mExtendedLogToFile;
std::string mExtendedLogFile;
@@ -232,6 +243,7 @@ private:
int mKeepAliveTime;
int mMaximumDiffingTime;
ProgressNotifier &mrProgressNotifier;
+ bool mTcpNiceMode;
};
#endif // BACKUPCLIENTCONTEXT__H
diff --git a/bin/bbackupd/BackupClientDirectoryRecord.cpp b/bin/bbackupd/BackupClientDirectoryRecord.cpp
index 86c9688f..3a0ed08b 100644
--- a/bin/bbackupd/BackupClientDirectoryRecord.cpp
+++ b/bin/bbackupd/BackupClientDirectoryRecord.cpp
@@ -1671,6 +1671,7 @@ int64_t BackupClientDirectoryRecord::UploadFile(
&isCompletelyDifferent));
rContext.UnManageDiffProcess();
+ rContext.SetNiceMode(true);
RateLimitingStream rateLimit(*patchStream,
rParams.mMaxUploadRate);
@@ -1690,6 +1691,8 @@ int64_t BackupClientDirectoryRecord::UploadFile(
//
std::auto_ptr<BackupProtocolSuccess> stored(connection.QueryStoreFile(mObjectID, ModificationTime,
AttributesHash, isCompletelyDifferent?(0):(diffFromID), rStoreFilename, *pStreamToUpload));
+
+ rContext.SetNiceMode(false);
// Get object ID from the result
objID = stored->GetObjectID();
@@ -1715,6 +1718,8 @@ int64_t BackupClientDirectoryRecord::UploadFile(
&rParams,
&(rParams.mrRunStatusProvider)));
+ rContext.SetNiceMode(true);
+
RateLimitingStream rateLimit(*upload,
rParams.mMaxUploadRate);
IOStream* pStreamToUpload;
@@ -1735,6 +1740,8 @@ int64_t BackupClientDirectoryRecord::UploadFile(
AttributesHash,
0 /* no diff from file ID */,
rStoreFilename, *pStreamToUpload));
+
+ rContext.SetNiceMode(false);
// Get object ID from the result
objID = stored->GetObjectID();
diff --git a/bin/bbackupd/BackupDaemon.cpp b/bin/bbackupd/BackupDaemon.cpp
index 3d0c9533..bca045d4 100644
--- a/bin/bbackupd/BackupDaemon.cpp
+++ b/bin/bbackupd/BackupDaemon.cpp
@@ -844,7 +844,9 @@ void BackupDaemon::RunSyncNow()
conf.GetKeyValueUint32("AccountNumber"),
conf.GetKeyValueBool("ExtendedLogging"),
conf.KeyExists("ExtendedLogFile"),
- extendedLogFile, *mpProgressNotifier
+ extendedLogFile,
+ *mpProgressNotifier,
+ conf.GetKeyValueBool("TcpNice")
);
// The minimum age a file needs to be before it will be
diff --git a/infrastructure/m4/boxbackup_tests.m4 b/infrastructure/m4/boxbackup_tests.m4
index 98ab7069..dce061af 100644
--- a/infrastructure/m4/boxbackup_tests.m4
+++ b/infrastructure/m4/boxbackup_tests.m4
@@ -131,7 +131,7 @@ AC_HEADER_STDC
AC_HEADER_SYS_WAIT
AC_CHECK_HEADERS([dlfcn.h fcntl.h getopt.h process.h pwd.h signal.h])
AC_CHECK_HEADERS([syslog.h time.h cxxabi.h])
-AC_CHECK_HEADERS([netinet/in.h])
+AC_CHECK_HEADERS([netinet/in.h netinet/tcp.h])
AC_CHECK_HEADERS([sys/file.h sys/param.h sys/socket.h sys/time.h sys/types.h sys/wait.h])
AC_CHECK_HEADERS([sys/uio.h sys/xattr.h])
AC_CHECK_HEADERS([bsd/unistd.h])
@@ -193,10 +193,14 @@ AC_CHECK_MEMBERS([struct sockaddr_in.sin_len],,, [[
]])
AC_CHECK_MEMBERS([DIR.d_fd],,, [[#include <dirent.h>]])
AC_CHECK_MEMBERS([DIR.dd_fd],,, [[#include <dirent.h>]])
+AC_CHECK_MEMBERS([struct tcp_info.tcpi_rtt],,, [[#include <netinet/tcp.h>]])
AC_CHECK_DECLS([INFTIM],,, [[#include <poll.h>]])
AC_CHECK_DECLS([SO_PEERCRED],,, [[#include <sys/socket.h>]])
+AC_CHECK_DECLS([SO_SNDBUF],,, [[#include <asm/socket.h>]])
AC_CHECK_DECLS([O_BINARY],,,)
+AC_CHECK_DECLS([SOL_TCP],,, [[#include <netinet/tcp.h>]])
+AC_CHECK_DECLS([TCP_INFO],,, [[#include <netinet/tcp.h>]])
# Solaris provides getpeerucred() instead of getpeereid() or SO_PEERCRED
AC_CHECK_HEADERS([ucred.h])
diff --git a/lib/backupclient/BackupDaemonConfigVerify.cpp b/lib/backupclient/BackupDaemonConfigVerify.cpp
index cdd71425..a3b95335 100644
--- a/lib/backupclient/BackupDaemonConfigVerify.cpp
+++ b/lib/backupclient/BackupDaemonConfigVerify.cpp
@@ -117,6 +117,9 @@ static const ConfigurationVerifyKey verifyrootkeys[] =
ConfigurationVerifyKey("MaxUploadRate", ConfigTest_IsInt),
// optional maximum speed of uploads in kbytes per second
+ ConfigurationVerifyKey("TcpNice", ConfigTest_IsBool, false),
+ // optional enable of tcp nice/background mode
+
ConfigurationVerifyKey("CertificateFile", ConfigTest_Exists),
ConfigurationVerifyKey("PrivateKeyFile", ConfigTest_Exists),
ConfigurationVerifyKey("TrustedCAsFile", ConfigTest_Exists),
diff --git a/lib/backupstore/BackupStoreFileDiff.cpp b/lib/backupstore/BackupStoreFileDiff.cpp
index 5705c3aa..b39097db 100644
--- a/lib/backupstore/BackupStoreFileDiff.cpp
+++ b/lib/backupstore/BackupStoreFileDiff.cpp
@@ -469,7 +469,7 @@ static void SearchForMatchingBlocks(IOStream &rFile, std::map<int64_t, int64_t>
if(pDiffTimer && pDiffTimer->IsManaged())
{
- maximumDiffingTime = Timer(pDiffTimer->GetMaximumDiffingTime(),
+ maximumDiffingTime = Timer(pDiffTimer->GetMaximumDiffingTime() * 1000,
"MaximumDiffingTime");
}
diff --git a/lib/common/BoxTime.h b/lib/common/BoxTime.h
index cc1571cd..dfc40263 100644
--- a/lib/common/BoxTime.h
+++ b/lib/common/BoxTime.h
@@ -27,6 +27,10 @@ inline box_time_t SecondsToBoxTime(time_t Seconds)
{
return ((box_time_t)Seconds * MICRO_SEC_IN_SEC_LL);
}
+// Convert a duration in milliseconds to box_time_t units (multiplies by
+// 1000). Returns box_time_t, like SecondsToBoxTime(), so that all of the
+// *ToBoxTime() converters share one result type.
+inline box_time_t MilliSecondsToBoxTime(int64_t milliseconds)
+{
+	return ((box_time_t)milliseconds * 1000);
+}
inline time_t BoxTimeToSeconds(box_time_t Time)
{
return Time / MICRO_SEC_IN_SEC_LL;
diff --git a/lib/common/Timer.cpp b/lib/common/Timer.cpp
index c0451818..81a4dd80 100644
--- a/lib/common/Timer.cpp
+++ b/lib/common/Timer.cpp
@@ -335,7 +335,7 @@ void Timers::SignalHandler(int unused)
// --------------------------------------------------------------------------
//
// Function
-// Name: Timer::Timer(size_t timeoutSecs,
+// Name: Timer::Timer(size_t timeoutMillis,
// const std::string& rName)
// Purpose: Standard timer constructor, takes a timeout in
// seconds from now, and an optional name for
@@ -344,8 +344,8 @@ void Timers::SignalHandler(int unused)
//
// --------------------------------------------------------------------------
-Timer::Timer(size_t timeoutSecs, const std::string& rName)
-: mExpires(GetCurrentBoxTime() + SecondsToBoxTime(timeoutSecs)),
+Timer::Timer(size_t timeoutMillis, const std::string& rName)
+: mExpires(GetCurrentBoxTime() + MilliSecondsToBoxTime(timeoutMillis)),
mExpired(false),
mName(rName)
#ifdef WIN32
@@ -353,26 +353,26 @@ Timer::Timer(size_t timeoutSecs, const std::string& rName)
#endif
{
#ifndef BOX_RELEASE_BUILD
- if (timeoutSecs == 0)
+ if (timeoutMillis == 0)
{
- BOX_TRACE(TIMER_ID "initialised for " << timeoutSecs <<
- " secs, will not fire");
+ BOX_TRACE(TIMER_ID "initialised for " << timeoutMillis <<
+ " ms, will not fire");
}
else
{
- BOX_TRACE(TIMER_ID "initialised for " << timeoutSecs <<
- " secs, to fire at " << FormatTime(mExpires, false, true));
+ BOX_TRACE(TIMER_ID "initialised for " << timeoutMillis <<
+ " ms, to fire at " << FormatTime(mExpires, false, true));
}
#endif
- if (timeoutSecs == 0)
+ if (timeoutMillis == 0)
{
mExpires = 0;
}
else
{
Timers::Add(*this);
- Start(timeoutSecs * MICRO_SEC_IN_SEC_LL);
+ Start(timeoutMillis * 1000);
}
}
@@ -408,7 +408,7 @@ void Timer::Start()
// --------------------------------------------------------------------------
//
// Function
-// Name: Timer::Start(int64_t delayInMicros)
+// Name: Timer::Start(int64_t timeoutMillis)
// Purpose: This internal function initialises an OS TimerQueue
// timer on Windows, with a specified delay already
// calculated to save us doing it again. Like
@@ -417,23 +417,21 @@ void Timer::Start()
//
// --------------------------------------------------------------------------
-void Timer::Start(int64_t delayInMicros)
+void Timer::Start(int64_t timeoutMillis)
{
#ifdef WIN32
// only call me once!
ASSERT(mTimerHandle == INVALID_HANDLE_VALUE);
- int64_t delayInMillis = delayInMicros / 1000;
-
// Windows XP always seems to fire timers up to 20 ms late,
// at least on my test laptop. Not critical in practice, but our
// tests are precise enough that they will fail if we don't
// correct for it.
- delayInMillis -= 20;
+ timeoutMillis -= 20;
// Set a system timer to call our timer routine
if (CreateTimerQueueTimer(&mTimerHandle, NULL, TimerRoutine,
- (PVOID)this, delayInMillis, 0, WT_EXECUTEINTIMERTHREAD)
+ (PVOID)this, timeoutMillis, 0, WT_EXECUTEINTIMERTHREAD)
== FALSE)
{
BOX_ERROR(TIMER_ID "failed to create timer: " <<
@@ -523,8 +521,8 @@ Timer::Timer(const Timer& rToCopy)
{
BOX_TRACE(TIMER_ID "initialised from timer " << &rToCopy << ", "
"to fire at " <<
- (int)(mExpires / 1000000) << "." <<
- (int)(mExpires % 1000000));
+ (int)(mExpires / MICRO_SEC_IN_SEC_LL) << "." <<
+ (int)(mExpires % MICRO_SEC_IN_SEC_LL));
}
#endif
@@ -564,8 +562,8 @@ Timer& Timer::operator=(const Timer& rToCopy)
{
BOX_TRACE(TIMER_ID "initialised from timer " << &rToCopy << ", "
"to fire at " <<
- (int)(rToCopy.mExpires / 1000000) << "." <<
- (int)(rToCopy.mExpires % 1000000));
+ (int)(rToCopy.mExpires / MICRO_SEC_IN_SEC_LL) << "." <<
+ (int)(rToCopy.mExpires % MICRO_SEC_IN_SEC_LL));
}
#endif
diff --git a/lib/common/Timer.h b/lib/common/Timer.h
index 42b2e00f..bd118a18 100644
--- a/lib/common/Timer.h
+++ b/lib/common/Timer.h
@@ -53,7 +53,7 @@ class Timers
class Timer
{
public:
- Timer(size_t timeoutSecs, const std::string& rName = "");
+ Timer(size_t timeoutMillis, const std::string& rName = "");
virtual ~Timer();
Timer(const Timer &);
Timer &operator=(const Timer &);
@@ -74,7 +74,7 @@ private:
std::string mName;
void Start();
- void Start(int64_t delayInMicros);
+ void Start(int64_t timeoutMillis);
void Stop();
#ifdef WIN32
diff --git a/lib/server/SocketStream.h b/lib/server/SocketStream.h
index 2b582f21..2fb5e391 100644
--- a/lib/server/SocketStream.h
+++ b/lib/server/SocketStream.h
@@ -51,7 +51,6 @@ public:
virtual bool GetPeerCredentials(uid_t &rUidOut, gid_t &rGidOut);
protected:
- tOSSocketHandle GetSocketHandle();
void MarkAsReadClosed() {mReadClosed = true;}
void MarkAsWriteClosed() {mWriteClosed = true;}
@@ -69,6 +68,11 @@ public:
off_t GetBytesWritten() const {return mBytesWritten;}
void ResetCounters() {mBytesRead = mBytesWritten = 0;}
bool IsOpened() { return mSocketHandle != INVALID_SOCKET_VALUE; }
+
+ /**
+ * Only for use by NiceSocketStream!
+ */
+ tOSSocketHandle GetSocketHandle();
};
#endif // SOCKETSTREAM__H
diff --git a/lib/server/TcpNice.cpp b/lib/server/TcpNice.cpp
new file mode 100644
index 00000000..88d08bda
--- /dev/null
+++ b/lib/server/TcpNice.cpp
@@ -0,0 +1,204 @@
+// --------------------------------------------------------------------------
+//
+// File
+// Name: TcpNice.cpp
+// Purpose: Calculator for adaptive TCP window sizing to support
+// low-priority background flows using the stochastic
+// algorithm, as described in
+// http://www.thlab.net/~lmassoul/p18-key.pdf
+// Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+
+#include "Box.h"
+
+#include "TcpNice.h"
+#include "Logging.h"
+#include "BoxTime.h"
+
+#ifdef HAVE_NETINET_TCP_H
+# include <netinet/tcp.h>
+#endif
+
+#include "MemLeakFindOn.h"
+
+// --------------------------------------------------------------------------
+//
+// Function
+//		Name:    TcpNice::TcpNice()
+//		Purpose: Initialise state of the calculator: a starting window
+//			 size of 1, gamma 100%, alpha* 100, delta 10%, and a
+//			 zeroed rate estimate history.
+//		Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+TcpNice::TcpNice()
+: mLastWindowSize(1),
+  mGammaPercent(100),
+  mAlphaStar(100),
+  mDeltaPercent(10)
+{
+	// GetNextWindowSize() reads both history slots on its very first
+	// call, so they must not be left indeterminate.
+	mRateEstimateMovingAverage[0] = 0;
+	mRateEstimateMovingAverage[1] = 0;
+}
+
+// --------------------------------------------------------------------------
+//
+// Function
+//		Name:    int TcpNice::GetNextWindowSize(int bytesChange,
+//			 box_time_t timeElapsed, int rttEstimateMicros)
+//		Purpose: Calculate the next recommended window size, given the
+//			 number of bytes sent since the previous recommendation,
+//			 the time elapsed (in microseconds) and the round trip
+//			 time estimate (in microseconds). The result is also
+//			 remembered as the basis for the next call. If the
+//			 elapsed time is zero, or the RTT estimate is zero or
+//			 negative, no rate can be calculated: the previous
+//			 recommendation is returned and no state is changed.
+//		Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+int TcpNice::GetNextWindowSize(int bytesChange, box_time_t timeElapsed,
+	int rttEstimateMicros)
+{
+	// Both values are used as divisors below, so refuse to calculate
+	// with them rather than dividing by zero.
+	if(timeElapsed == 0 || rttEstimateMicros <= 0)
+	{
+		return mLastWindowSize;
+	}
+
+	// mAlphaStar scaled by the number of round trips per second.
+	int epsilon = (mAlphaStar * 1000000) / rttEstimateMicros;
+
+	// timeElapsed is in microseconds, so this will fail for T > 2000 seconds
+	int rateLastPeriod = ((uint64_t)bytesChange * 1000000 / timeElapsed);
+
+	// Compare the achieved rate against the last-but-one moving average
+	// (slot [0]), as the algorithm requires.
+	int rawAdjustment = epsilon + rateLastPeriod -
+		mRateEstimateMovingAverage[0];
+
+	int gammaAdjustment = (rawAdjustment * mGammaPercent) / 100;
+
+	// NOTE(review): the result is not clamped, so it can reach zero or
+	// go negative if the adjustments are persistently negative.
+	int newWindowSize = mLastWindowSize + gammaAdjustment;
+
+	// Exponentially weighted moving average, seeded from the most
+	// recent estimate (slot [1]).
+	int newRateEstimateMovingAverage =
+		(((100 - mDeltaPercent) * mRateEstimateMovingAverage[1]) / 100) +
+		((mDeltaPercent * rateLastPeriod) / 100);
+
+	BOX_TRACE("TcpNice: "
+		"b=" << bytesChange << ", "
+		"T=" << timeElapsed << ", "
+		"rtt=" << rttEstimateMicros << ", "
+		"e=" << epsilon << ", "
+		"rb=" << rateLastPeriod << ", "
+		"rbhat=" << newRateEstimateMovingAverage << ", "
+		"raw=" << rawAdjustment << ", "
+		"gamma=" << gammaAdjustment << ", "
+		"wb=" << newWindowSize);
+
+	// Age the history: last becomes last-but-one, new becomes last.
+	mRateEstimateMovingAverage[0] = mRateEstimateMovingAverage[1];
+	mRateEstimateMovingAverage[1] = newRateEstimateMovingAverage;
+	mLastWindowSize = newWindowSize;
+
+	return newWindowSize;
+}
+
+// --------------------------------------------------------------------------
+//
+// Constructor
+//		Name:    NiceSocketStream::NiceSocketStream(
+//			 std::auto_ptr<SocketStream> apSocket)
+//		Purpose: Take ownership of the supplied socket and set up the
+//			 wrapper's bookkeeping: no bytes counted yet, the
+//			 period starting now, a 1000 ms control interval, and
+//			 nice mode switched off until SetEnabled(true).
+//		Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+
+NiceSocketStream::NiceSocketStream(std::auto_ptr<SocketStream> apSocket)
+	: mapSocket(apSocket),
+	  // mTcpNice and mapTimer are default-constructed
+	  mBytesWrittenThisPeriod(0),
+	  mPeriodStartTime(GetCurrentBoxTime()),
+	  mTimeIntervalMillis(1000),
+	  mEnabled(false)
+{
+}
+
+// --------------------------------------------------------------------------
+//
+// Function
+//		Name:    NiceSocketStream::Write(const void *pBuffer, int NBytes)
+//		Purpose: Writes bytes to the underlying stream. While nice mode
+//			 is enabled, each time the control interval timer
+//			 expires the send buffer size (SO_SNDBUF) is adjusted
+//			 to the value recommended by the TcpNice calculator.
+//		Created: 2012/02/11
+//
+// --------------------------------------------------------------------------
+void NiceSocketStream::Write(const void *pBuffer, int NBytes)
+{
+	// AC_CHECK_DECLS always defines HAVE_DECL_xxx, to 0 or 1, so it has
+	// to be tested with #if rather than #ifdef.
+#if HAVE_DECL_SO_SNDBUF
+	if(mEnabled && mapTimer.get() && mapTimer->HasExpired())
+	{
+		box_time_t newPeriodStart = GetCurrentBoxTime();
+		box_time_t elapsed = newPeriodStart - mPeriodStartTime;
+		int socket = mapSocket->GetSocketHandle();
+
+		// Fallback round trip time in microseconds (the unit that
+		// GetNextWindowSize() expects), used when the OS cannot tell
+		// us the real value. 50 ms is a wild guess.
+		int rtt = 50000;
+
+#	if HAVE_DECL_SOL_TCP && HAVE_DECL_TCP_INFO && defined HAVE_STRUCT_TCP_INFO_TCPI_RTT
+		// Only declare the structure where we know that it exists.
+		struct tcp_info info;
+		socklen_t optlen = sizeof(info);
+		if(getsockopt(socket, SOL_TCP, TCP_INFO, &info, &optlen) == -1)
+		{
+			BOX_LOG_SYS_WARNING("getsockopt(" << socket << ", SOL_TCP, "
+				"TCP_INFO) failed");
+		}
+		else if(optlen != sizeof(info))
+		{
+			BOX_WARNING("getsockopt(" << socket << ", SOL_TCP, "
+				"TCP_INFO) return structure size " << optlen << ", "
+				"expected " << sizeof(info));
+		}
+		else
+		{
+			rtt = info.tcpi_rtt;
+		}
+#	endif
+
+		int newWindow = mTcpNice.GetNextWindowSize(mBytesWrittenThisPeriod,
+			elapsed, rtt);
+
+		if(setsockopt(socket, SOL_SOCKET, SO_SNDBUF, &newWindow,
+			sizeof(newWindow)) == -1)
+		{
+			BOX_LOG_SYS_WARNING("setsockopt(" << socket << ", SOL_SOCKET, "
+				"SO_SNDBUF, " << newWindow << ") failed");
+		}
+
+		// The block below starts a fresh period immediately.
+		StopTimer();
+	}
+
+	if(mEnabled && !mapTimer.get())
+	{
+		// Don't start the timer until we receive the first data to write,
+		// as diffing might take a long time and we don't want to bias
+		// the TcpNice algorithm by running while we don't have bulk data
+		// to send.
+		StartTimer();
+		mPeriodStartTime = GetCurrentBoxTime();
+		mBytesWrittenThisPeriod = 0;
+	}
+
+	mBytesWrittenThisPeriod += NBytes;
+#endif // HAVE_DECL_SO_SNDBUF
+
+	mapSocket->Write(pBuffer, NBytes);
+}
+
+// --------------------------------------------------------------------------
+//
+// Function
+//		Name:    NiceSocketStream::SetEnabled(bool enabled)
+//		Purpose: Update the enabled status, and if disabling, cancel the
+//			 timer and set a sensible window size (a send buffer
+//			 of 1<<17 bytes, i.e. 128 kB).
+//		Created: 2012/02/12
+//
+// --------------------------------------------------------------------------
+
+void NiceSocketStream::SetEnabled(bool enabled)
+{
+	mEnabled = enabled;
+
+	if(!enabled)
+	{
+		StopTimer();
+		// AC_CHECK_DECLS always defines HAVE_DECL_xxx, to 0 or 1, so
+		// it has to be tested with #if rather than #ifdef.
+#if HAVE_DECL_SO_SNDBUF
+		int socket = mapSocket->GetSocketHandle();
+		int newWindow = 1<<17;
+		if(setsockopt(socket, SOL_SOCKET, SO_SNDBUF, &newWindow,
+			sizeof(newWindow)) == -1)
+		{
+			BOX_LOG_SYS_WARNING("setsockopt(" << socket << ", SOL_SOCKET, "
+				"SO_SNDBUF, " << newWindow << ") failed");
+		}
+#endif
+	}
+}
diff --git a/lib/server/TcpNice.h b/lib/server/TcpNice.h
new file mode 100644
index 00000000..e2027749
--- /dev/null
+++ b/lib/server/TcpNice.h
@@ -0,0 +1,174 @@
+// --------------------------------------------------------------------------
+//
+// File
+// Name: TcpNice.h
+// Purpose: Calculator for adaptive TCP window sizing to support
+// low-priority background flows using the stochastic
+// algorithm, as described in
+// http://www.thlab.net/~lmassoul/p18-key.pdf
+// Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+
+#ifndef TCPNICE__H
+#define TCPNICE__H
+
+#include <memory>
+
+#include "SocketStream.h"
+#include "Timer.h"
+
+// --------------------------------------------------------------------------
+//
+// Class
+// Name: TcpNice
+// Purpose: Calculator for adaptive TCP window sizing. It holds the
+// tuning parameters and the history needed between calls,
+// and performs no socket I/O itself.
+// Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+
+class TcpNice
+{
+public:
+ // Sets the initial window size to 1, gamma to 100%, alpha* to 100
+ // and delta to 10%.
+ TcpNice();
+
+ // Returns the next recommended window size and remembers it as the
+ // basis for the following call. bytesChange is the number of bytes
+ // sent since the previous recommendation; timeElapsed is the length
+ // of that period in microseconds; rttEstimateMicros is the round
+ // trip time estimate in microseconds.
+ int GetNextWindowSize(int bytesChange, box_time_t timeElapsed,
+ int rttEstimateMicros);
+
+private:
+ /**
+ * The previous (last recommended) window size is one of the parameters
+ * used to calculate the next window size.
+ */
+ int mLastWindowSize;
+
+ /**
+ * Controls the speed of adaptation and the variance (random variation)
+ * of the stable state in response to noise. The paper suggests using
+ * 1.0 (100%). Expressed as a percentage.
+ */
+ int mGammaPercent;
+
+ /**
+ * Controls the extent to which background flows are allowed to affect
+ * foreground flows. Its detailed meaning is not explained in the paper,
+ * but its units are bytes, and I think it controls how aggressive we
+ * are at increasing window size, potentially at the expense of other
+ * competing flows.
+ */
+ int mAlphaStar;
+
+ /**
+ * Controls the speed of adaptation of the exponential weighted moving
+ * average (EWMA) estimate of the bandwidth available to this flow.
+ * The paper uses 10%. Expressed as a percentage.
+ */
+ int mDeltaPercent;
+
+ /**
+ * The stochastic algorithm in the paper uses the rate estimate for the
+ * last-but-one period (rbHat[n-2]) to calculate the next window size.
+ * So we keep both the last (in mRateEstimateMovingAverage[1]) and the
+ * last-but-one (in mRateEstimateMovingAverage[0]) values.
+ */
+ int mRateEstimateMovingAverage[2];
+};
+
+// --------------------------------------------------------------------------
+//
+// Class
+//		Name:    NiceSocketStream
+//		Purpose: Wrapper around a SocketStream to limit sending rate to
+//			 avoid interference with higher-priority flows. Owns
+//			 the wrapped socket, and shuts it down and closes it
+//			 when destroyed. Only Write() is modified; all other
+//			 stream operations are passed straight through.
+//		Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+
+class NiceSocketStream : public IOStream
+{
+private:
+	// The wrapped socket, owned by this object.
+	std::auto_ptr<SocketStream> mapSocket;
+
+	// Calculates the recommended window size for each period.
+	TcpNice mTcpNice;
+
+	// Control interval timer; null while no period is being measured.
+	std::auto_ptr<Timer> mapTimer;
+
+	// Bytes passed to Write() since the current period started.
+	int mBytesWrittenThisPeriod;
+
+	// When the current period started.
+	box_time_t mPeriodStartTime;
+
+	/**
+	 * The control interval T from the paper, in milliseconds. The available
+	 * bandwidth is estimated over this period, and the window size is
+	 * recalculated at the end of each period. It should be long enough for
+	 * TCP to adapt to a change in window size; perhaps 10-100 RTTs. One
+	 * second (1000) is probably a good first approximation in many cases.
+	 */
+	int mTimeIntervalMillis;
+
+	/**
+	 * Because our data use is bursty, and tcp nice works on the assumption
+	 * that we've always got data to send, we should only enable nice mode
+	 * when we're doing a bulk upload, and disable it afterwards.
+	 */
+	bool mEnabled;
+
+	// Begin a new control interval.
+	void StartTimer()
+	{
+		mapTimer.reset(new Timer(mTimeIntervalMillis, "NiceSocketStream"));
+	}
+
+	// Abandon the current control interval, if any.
+	void StopTimer()
+	{
+		mapTimer.reset();
+	}
+
+public:
+	NiceSocketStream(std::auto_ptr<SocketStream> apSocket);
+	virtual ~NiceSocketStream()
+	{
+		// Be nice about closing the socket, but never let an exception
+		// escape from a destructor: the socket may already have been
+		// closed through Close(), and we may be running during stack
+		// unwinding, where a second exception would terminate the
+		// process.
+		try
+		{
+			if(mapSocket.get())
+			{
+				mapSocket->Shutdown();
+				mapSocket->Close();
+			}
+		}
+		catch(...)
+		{
+			// Ignore errors, there is nothing useful to do with
+			// them while being destroyed.
+		}
+	}
+
+	// This is the only magic
+	virtual void Write(const void *pBuffer, int NBytes);
+
+	// Everything else is delegated to the sink
+	virtual int Read(void *pBuffer, int NBytes,
+		int Timeout = IOStream::TimeOutInfinite)
+	{
+		return mapSocket->Read(pBuffer, NBytes, Timeout);
+	}
+	virtual pos_type BytesLeftToRead()
+	{
+		return mapSocket->BytesLeftToRead();
+	}
+	virtual pos_type GetPosition() const
+	{
+		return mapSocket->GetPosition();
+	}
+	virtual void Seek(IOStream::pos_type Offset, int SeekType)
+	{
+		mapSocket->Seek(Offset, SeekType);
+	}
+	virtual void Flush(int Timeout = IOStream::TimeOutInfinite)
+	{
+		mapSocket->Flush(Timeout);
+	}
+	virtual void Close()
+	{
+		mapSocket->Close();
+	}
+	virtual bool StreamDataLeft()
+	{
+		return mapSocket->StreamDataLeft();
+	}
+	virtual bool StreamClosed()
+	{
+		return mapSocket->StreamClosed();
+	}
+
+	// Switch nice mode on or off; see mEnabled.
+	virtual void SetEnabled(bool enabled);
+
+private:
+	// Copying is not supported.
+	NiceSocketStream(const NiceSocketStream &rToCopy)
+	{ /* do not call */ }
+};
+
+#endif // TCPNICE__H