summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
author: Chris Wilson <chris+github@qwirx.com> 2012-02-12 12:29:56 +0000
committer: Chris Wilson <chris+github@qwirx.com> 2012-02-12 12:29:56 +0000
commit: 15a3e05144785bfcaddcf2d11b6c549edd9f62ee (patch)
tree: 30827463c74a3b24f4b206894580a4511203c014 /lib
parent: 03d7cb4d75822b40d263d0737e3115a0aeff1cda (diff)
Add experimental "TCP Nice" mode, disabled by default.
Diffstat (limited to 'lib')
-rw-r--r-- lib/backupclient/BackupDaemonConfigVerify.cpp 3
-rw-r--r-- lib/backupstore/BackupStoreFileDiff.cpp 2
-rw-r--r-- lib/common/BoxTime.h 4
-rw-r--r-- lib/common/Timer.cpp 38
-rw-r--r-- lib/common/Timer.h 4
-rw-r--r-- lib/server/SocketStream.h 6
-rw-r--r-- lib/server/TcpNice.cpp 204
-rw-r--r-- lib/server/TcpNice.h 174
8 files changed, 411 insertions, 24 deletions
diff --git a/lib/backupclient/BackupDaemonConfigVerify.cpp b/lib/backupclient/BackupDaemonConfigVerify.cpp
index cdd71425..a3b95335 100644
--- a/lib/backupclient/BackupDaemonConfigVerify.cpp
+++ b/lib/backupclient/BackupDaemonConfigVerify.cpp
@@ -117,6 +117,9 @@ static const ConfigurationVerifyKey verifyrootkeys[] =
ConfigurationVerifyKey("MaxUploadRate", ConfigTest_IsInt),
// optional maximum speed of uploads in kbytes per second
+ ConfigurationVerifyKey("TcpNice", ConfigTest_IsBool, false),
+ // optional enable of tcp nice/background mode
+
ConfigurationVerifyKey("CertificateFile", ConfigTest_Exists),
ConfigurationVerifyKey("PrivateKeyFile", ConfigTest_Exists),
ConfigurationVerifyKey("TrustedCAsFile", ConfigTest_Exists),
diff --git a/lib/backupstore/BackupStoreFileDiff.cpp b/lib/backupstore/BackupStoreFileDiff.cpp
index 5705c3aa..b39097db 100644
--- a/lib/backupstore/BackupStoreFileDiff.cpp
+++ b/lib/backupstore/BackupStoreFileDiff.cpp
@@ -469,7 +469,7 @@ static void SearchForMatchingBlocks(IOStream &rFile, std::map<int64_t, int64_t>
if(pDiffTimer && pDiffTimer->IsManaged())
{
- maximumDiffingTime = Timer(pDiffTimer->GetMaximumDiffingTime(),
+ maximumDiffingTime = Timer(pDiffTimer->GetMaximumDiffingTime() * 1000,
"MaximumDiffingTime");
}
diff --git a/lib/common/BoxTime.h b/lib/common/BoxTime.h
index cc1571cd..dfc40263 100644
--- a/lib/common/BoxTime.h
+++ b/lib/common/BoxTime.h
@@ -27,6 +27,10 @@ inline box_time_t SecondsToBoxTime(time_t Seconds)
{
return ((box_time_t)Seconds * MICRO_SEC_IN_SEC_LL);
}
+// Convert an interval in milliseconds to box_time_t, which counts
+// microseconds (compare SecondsToBoxTime above). Returns box_time_t, like
+// the other conversion helpers, rather than a bare uint64_t.
+inline box_time_t MilliSecondsToBoxTime(int64_t milliseconds)
+{
+	return ((box_time_t)milliseconds * 1000);
+}
inline time_t BoxTimeToSeconds(box_time_t Time)
{
return Time / MICRO_SEC_IN_SEC_LL;
diff --git a/lib/common/Timer.cpp b/lib/common/Timer.cpp
index c0451818..81a4dd80 100644
--- a/lib/common/Timer.cpp
+++ b/lib/common/Timer.cpp
@@ -335,7 +335,7 @@ void Timers::SignalHandler(int unused)
// --------------------------------------------------------------------------
//
// Function
-// Name: Timer::Timer(size_t timeoutSecs,
+// Name: Timer::Timer(size_t timeoutMillis,
// const std::string& rName)
// Purpose: Standard timer constructor, takes a timeout in
// milliseconds from now, and an optional name for
@@ -344,8 +344,8 @@ void Timers::SignalHandler(int unused)
//
// --------------------------------------------------------------------------
-Timer::Timer(size_t timeoutSecs, const std::string& rName)
-: mExpires(GetCurrentBoxTime() + SecondsToBoxTime(timeoutSecs)),
+Timer::Timer(size_t timeoutMillis, const std::string& rName)
+: mExpires(GetCurrentBoxTime() + MilliSecondsToBoxTime(timeoutMillis)),
mExpired(false),
mName(rName)
#ifdef WIN32
@@ -353,26 +353,26 @@ Timer::Timer(size_t timeoutSecs, const std::string& rName)
#endif
{
#ifndef BOX_RELEASE_BUILD
- if (timeoutSecs == 0)
+ if (timeoutMillis == 0)
{
- BOX_TRACE(TIMER_ID "initialised for " << timeoutSecs <<
- " secs, will not fire");
+ BOX_TRACE(TIMER_ID "initialised for " << timeoutMillis <<
+ " ms, will not fire");
}
else
{
- BOX_TRACE(TIMER_ID "initialised for " << timeoutSecs <<
- " secs, to fire at " << FormatTime(mExpires, false, true));
+ BOX_TRACE(TIMER_ID "initialised for " << timeoutMillis <<
+ " ms, to fire at " << FormatTime(mExpires, false, true));
}
#endif
- if (timeoutSecs == 0)
+ if (timeoutMillis == 0)
{
mExpires = 0;
}
else
{
Timers::Add(*this);
- Start(timeoutSecs * MICRO_SEC_IN_SEC_LL);
+ Start(timeoutMillis * 1000);
}
}
@@ -408,7 +408,7 @@ void Timer::Start()
// --------------------------------------------------------------------------
//
// Function
-// Name: Timer::Start(int64_t delayInMicros)
+// Name: Timer::Start(int64_t timeoutMillis)
// Purpose: This internal function initialises an OS TimerQueue
// timer on Windows, with a specified delay already
// calculated to save us doing it again. Like
@@ -417,23 +417,21 @@ void Timer::Start()
//
// --------------------------------------------------------------------------
-void Timer::Start(int64_t delayInMicros)
+void Timer::Start(int64_t timeoutMillis)
{
#ifdef WIN32
// only call me once!
ASSERT(mTimerHandle == INVALID_HANDLE_VALUE);
- int64_t delayInMillis = delayInMicros / 1000;
-
// Windows XP always seems to fire timers up to 20 ms late,
// at least on my test laptop. Not critical in practice, but our
// tests are precise enough that they will fail if we don't
// correct for it.
- delayInMillis -= 20;
+ timeoutMillis -= 20;
// Set a system timer to call our timer routine
if (CreateTimerQueueTimer(&mTimerHandle, NULL, TimerRoutine,
- (PVOID)this, delayInMillis, 0, WT_EXECUTEINTIMERTHREAD)
+ (PVOID)this, timeoutMillis, 0, WT_EXECUTEINTIMERTHREAD)
== FALSE)
{
BOX_ERROR(TIMER_ID "failed to create timer: " <<
@@ -523,8 +521,8 @@ Timer::Timer(const Timer& rToCopy)
{
BOX_TRACE(TIMER_ID "initialised from timer " << &rToCopy << ", "
"to fire at " <<
- (int)(mExpires / 1000000) << "." <<
- (int)(mExpires % 1000000));
+ (int)(mExpires / MICRO_SEC_IN_SEC_LL) << "." <<
+ (int)(mExpires % MICRO_SEC_IN_SEC_LL));
}
#endif
@@ -564,8 +562,8 @@ Timer& Timer::operator=(const Timer& rToCopy)
{
BOX_TRACE(TIMER_ID "initialised from timer " << &rToCopy << ", "
"to fire at " <<
- (int)(rToCopy.mExpires / 1000000) << "." <<
- (int)(rToCopy.mExpires % 1000000));
+ (int)(rToCopy.mExpires / MICRO_SEC_IN_SEC_LL) << "." <<
+ (int)(rToCopy.mExpires % MICRO_SEC_IN_SEC_LL));
}
#endif
diff --git a/lib/common/Timer.h b/lib/common/Timer.h
index 42b2e00f..bd118a18 100644
--- a/lib/common/Timer.h
+++ b/lib/common/Timer.h
@@ -53,7 +53,7 @@ class Timers
class Timer
{
public:
- Timer(size_t timeoutSecs, const std::string& rName = "");
+ Timer(size_t timeoutMillis, const std::string& rName = "");
virtual ~Timer();
Timer(const Timer &);
Timer &operator=(const Timer &);
@@ -74,7 +74,7 @@ private:
std::string mName;
void Start();
- void Start(int64_t delayInMicros);
+ void Start(int64_t timeoutMillis);
void Stop();
#ifdef WIN32
diff --git a/lib/server/SocketStream.h b/lib/server/SocketStream.h
index 2b582f21..2fb5e391 100644
--- a/lib/server/SocketStream.h
+++ b/lib/server/SocketStream.h
@@ -51,7 +51,6 @@ public:
virtual bool GetPeerCredentials(uid_t &rUidOut, gid_t &rGidOut);
protected:
- tOSSocketHandle GetSocketHandle();
void MarkAsReadClosed() {mReadClosed = true;}
void MarkAsWriteClosed() {mWriteClosed = true;}
@@ -69,6 +68,11 @@ public:
off_t GetBytesWritten() const {return mBytesWritten;}
void ResetCounters() {mBytesRead = mBytesWritten = 0;}
bool IsOpened() { return mSocketHandle != INVALID_SOCKET_VALUE; }
+
+ /**
+ * Only for use by NiceSocketStream!
+ */
+ tOSSocketHandle GetSocketHandle();
};
#endif // SOCKETSTREAM__H
diff --git a/lib/server/TcpNice.cpp b/lib/server/TcpNice.cpp
new file mode 100644
index 00000000..88d08bda
--- /dev/null
+++ b/lib/server/TcpNice.cpp
@@ -0,0 +1,204 @@
+// --------------------------------------------------------------------------
+//
+// File
+// Name: TcpNice.cpp
+// Purpose: Calculator for adaptive TCP window sizing to support
+// low-priority background flows using the stochastic
+// algorithm, as described in
+// http://www.thlab.net/~lmassoul/p18-key.pdf
+// Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+
+#include "Box.h"
+
+#include "TcpNice.h"
+#include "Logging.h"
+#include "BoxTime.h"
+
+#ifdef HAVE_NETINET_TCP_H
+# include <netinet/tcp.h>
+#endif
+
+#include "MemLeakFindOn.h"
+
+// --------------------------------------------------------------------------
+//
+// Function
+//		Name:    TcpNice::TcpNice()
+//		Purpose: Initialise state of the calculator: a starting window
+//			 of 1 byte, gamma = 100%, alpha* = 100 bytes,
+//			 delta = 10%, and both rate estimates set to zero.
+//		Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+TcpNice::TcpNice()
+: mLastWindowSize(1),
+	mGammaPercent(100),
+	mAlphaStar(100),
+	mDeltaPercent(10)
+{
+	// GetNextWindowSize() reads both slots on its very first call, so they
+	// must not be left indeterminate. No traffic has been observed yet,
+	// so the estimated rate starts at zero.
+	mRateEstimateMovingAverage[0] = 0;
+	mRateEstimateMovingAverage[1] = 0;
+}
+
+// --------------------------------------------------------------------------
+//
+// Function
+//		Name:    int TcpNice::GetNextWindowSize(int bytesChange,
+//			 box_time_t timeElapsed, int rttEstimateMicros)
+//		Purpose: Calculate the next recommended window size, given the
+//			 number of bytes sent since the previous recommendation,
+//			 the time elapsed (in microseconds) and the current
+//			 round trip time estimate (in microseconds). The new
+//			 window size and rate estimate are remembered for the
+//			 next call. If either time is zero (or the RTT is
+//			 negative) no rate can be calculated, so the previous
+//			 recommendation is returned and no state is changed.
+//			 The returned window size is never less than 1.
+//		Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+int TcpNice::GetNextWindowSize(int bytesChange, box_time_t timeElapsed,
+	int rttEstimateMicros)
+{
+	// Both values are used as divisors below, so refuse to divide by zero
+	// (e.g. an RTT that the kernel has not measured yet).
+	if(timeElapsed == 0 || rttEstimateMicros <= 0)
+	{
+		return mLastWindowSize;
+	}
+
+	int epsilon = (mAlphaStar * 1000000) / rttEstimateMicros;
+
+	// timeElapsed is in microseconds. The multiplication is done in 64 bits
+	// so that bytesChange * 1000000 cannot overflow.
+	int rateLastPeriod = ((uint64_t)bytesChange * 1000000 / timeElapsed);
+
+	int rawAdjustment = epsilon + rateLastPeriod -
+		mRateEstimateMovingAverage[0];
+
+	// The percentage products below can exceed the range of an int at
+	// rates of a few tens of megabytes per second, so compute them in
+	// 64 bits before scaling back down.
+	int gammaAdjustment = (int)(((int64_t)rawAdjustment * mGammaPercent) / 100);
+
+	int newWindowSize = mLastWindowSize + gammaAdjustment;
+
+	// The result is used as a socket send buffer size, where zero or a
+	// negative number is meaningless, so never recommend less than the
+	// initial window size of 1.
+	if(newWindowSize < 1)
+	{
+		newWindowSize = 1;
+	}
+
+	int newRateEstimateMovingAverage = (int)(
+		(((int64_t)(100 - mDeltaPercent) * mRateEstimateMovingAverage[1]) / 100) +
+		(((int64_t)mDeltaPercent * rateLastPeriod) / 100));
+
+	BOX_TRACE("TcpNice: "
+		"b=" << bytesChange << ", "
+		"T=" << timeElapsed << ", "
+		"rtt=" << rttEstimateMicros << ", "
+		"e=" << epsilon << ", "
+		"rb=" << rateLastPeriod << ", "
+		"rbhat=" << newRateEstimateMovingAverage << ", "
+		"raw=" << rawAdjustment << ", "
+		"gamma=" << gammaAdjustment << ", "
+		"wb=" << newWindowSize);
+
+	// Age the estimates: the last one becomes the last-but-one
+	mRateEstimateMovingAverage[0] = mRateEstimateMovingAverage[1];
+	mRateEstimateMovingAverage[1] = newRateEstimateMovingAverage;
+	mLastWindowSize = newWindowSize;
+
+	return newWindowSize;
+}
+
+// --------------------------------------------------------------------------
+//
+// Constructor
+//		Name:    NiceSocketStream::NiceSocketStream(
+//			 std::auto_ptr<SocketStream> apSocket)
+//		Purpose: Take ownership of the socket to be wrapped and set up
+//			 the wrapper's initial state: nice mode disabled, no
+//			 timer running, a control interval of 1000 ms and an
+//			 empty byte count for the period starting now.
+//		Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+
+NiceSocketStream::NiceSocketStream(std::auto_ptr<SocketStream> apSocket)
+	: mapSocket(apSocket)
+	, mTcpNice()
+	, mapTimer()
+	, mBytesWrittenThisPeriod(0)
+	, mPeriodStartTime(GetCurrentBoxTime())
+	, mTimeIntervalMillis(1000)
+	, mEnabled(false)
+{
+	// All of the work is done by the initialisers above.
+}
+
+// --------------------------------------------------------------------------
+//
+// Function
+//		Name:    NiceSocketStream::Write(const void *pBuffer, int NBytes)
+//		Purpose: Writes bytes to the underlying stream. When nice mode
+//			 is enabled and the control period timer has expired,
+//			 first asks the TcpNice calculator for a new window
+//			 size and applies it to the socket as its send buffer
+//			 size (SO_SNDBUF), then starts a new period.
+//		Created: 2012/02/11
+//
+// --------------------------------------------------------------------------
+void NiceSocketStream::Write(const void *pBuffer, int NBytes)
+{
+#ifdef HAVE_DECL_SO_SNDBUF
+	if(mEnabled && mapTimer.get() && mapTimer->HasExpired())
+	{
+		box_time_t newPeriodStart = GetCurrentBoxTime();
+		box_time_t elapsed = newPeriodStart - mPeriodStartTime;
+		int socket = mapSocket->GetSocketHandle();
+
+		// Fallback used when the RTT cannot be read from the socket.
+		// NOTE(review): GetNextWindowSize() takes this value in
+		// microseconds, so 50 means 50 us; if 50 ms was intended this
+		// should be 50000 - confirm.
+		int rtt = 50; // WAG
+
+# if defined HAVE_DECL_SOL_TCP && defined HAVE_DECL_TCP_INFO && defined HAVE_STRUCT_TCP_INFO_TCPI_RTT
+		// Only declared here, where the configure checks guarantee that
+		// struct tcp_info exists.
+		struct tcp_info info;
+		socklen_t optlen = sizeof(info);
+		if(getsockopt(socket, SOL_TCP, TCP_INFO, &info, &optlen) == -1)
+		{
+			BOX_LOG_SYS_WARNING("getsockopt(" << socket << ", SOL_TCP, "
+				"TCP_INFO) failed");
+		}
+		else if(optlen != sizeof(info))
+		{
+			BOX_WARNING("getsockopt(" << socket << ", SOL_TCP, "
+				"TCP_INFO) return structure size " << optlen << ", "
+				"expected " << sizeof(info));
+		}
+		else
+		{
+			rtt = info.tcpi_rtt;
+		}
+# endif
+
+		int newWindow = mTcpNice.GetNextWindowSize(mBytesWrittenThisPeriod,
+			elapsed, rtt);
+
+		if(setsockopt(socket, SOL_SOCKET, SO_SNDBUF, &newWindow,
+			sizeof(newWindow)) == -1)
+		{
+			BOX_LOG_SYS_WARNING("setsockopt(" << socket << ", SOL_SOCKET, "
+				"SO_SNDBUF, " << newWindow << ") failed");
+		}
+
+		// Discard the expired timer; the block below starts the next
+		// period immediately.
+		StopTimer();
+	}
+
+	if(mEnabled && !mapTimer.get())
+	{
+		// Don't start the timer until we receive the first data to write,
+		// as diffing might take a long time and we don't want to bias
+		// the TcpNice algorithm by running while we don't have bulk data
+		// to send.
+		StartTimer();
+		mPeriodStartTime = GetCurrentBoxTime();
+		mBytesWrittenThisPeriod = 0;
+	}
+
+	mBytesWrittenThisPeriod += NBytes;
+#endif // HAVE_DECL_SO_SNDBUF
+
+	mapSocket->Write(pBuffer, NBytes);
+}
+
+// --------------------------------------------------------------------------
+//
+// Function
+//		Name:    NiceSocketStream::SetEnabled(bool enabled)
+//		Purpose: Update the enabled status, and if disabling, cancel the
+//			 timer and set a sensible window size (a send buffer
+//			 of 128 kB).
+//		Created: 2012/02/12
+//
+// --------------------------------------------------------------------------
+
+void NiceSocketStream::SetEnabled(bool enabled)
+{
+	mEnabled = enabled;
+
+	if(!enabled)
+	{
+		StopTimer();
+#ifdef HAVE_DECL_SO_SNDBUF
+		int socket = mapSocket->GetSocketHandle();
+		int newWindow = 1<<17;
+		if(setsockopt(socket, SOL_SOCKET, SO_SNDBUF, &newWindow,
+			sizeof(newWindow)) == -1)
+		{
+			// Report the call that actually failed
+			BOX_LOG_SYS_WARNING("setsockopt(" << socket << ", SOL_SOCKET, "
+				"SO_SNDBUF, " << newWindow << ") failed");
+		}
+#endif
+	}
+}
diff --git a/lib/server/TcpNice.h b/lib/server/TcpNice.h
new file mode 100644
index 00000000..e2027749
--- /dev/null
+++ b/lib/server/TcpNice.h
@@ -0,0 +1,174 @@
+// --------------------------------------------------------------------------
+//
+// File
+// Name: TcpNice.h
+// Purpose: Calculator for adaptive TCP window sizing to support
+// low-priority background flows using the stochastic
+// algorithm, as described in
+// http://www.thlab.net/~lmassoul/p18-key.pdf
+// Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+
+#ifndef TCPNICE__H
+#define TCPNICE__H
+
+#include <memory>
+
+#include "SocketStream.h"
+#include "Timer.h"
+
+// --------------------------------------------------------------------------
+//
+// Class
+// Name: TcpNice
+// Purpose: Calculator for adaptive TCP window sizing. Keeps the
+// tuning parameters and the state carried from one
+// control period to the next, and recommends a new
+// window size each time GetNextWindowSize() is called.
+// Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+
+class TcpNice
+{
+public:
+ /**
+ * Sets the initial window size to 1, gamma to 100%, alpha* to 100 and
+ * delta to 10%.
+ */
+ TcpNice();
+
+ /**
+ * Calculates, remembers and returns the next recommended window size.
+ * bytesChange is the number of bytes sent since the previous
+ * recommendation, timeElapsed is the length of that period in
+ * microseconds, and rttEstimateMicros is the estimated round trip
+ * time in microseconds.
+ */
+ int GetNextWindowSize(int bytesChange, box_time_t timeElapsed,
+ int rttEstimateMicros);
+
+private:
+ /**
+ * The previous (last recommended) window size is one of the parameters
+ * used to calculate the next window size.
+ */
+ int mLastWindowSize;
+
+ /**
+ * Controls the speed of adaptation and the variance (random variation)
+ * of the stable state in response to noise. The paper suggests using
+ * 1.0 (100%).
+ */
+ int mGammaPercent;
+
+ /**
+ * Controls the extent to which background flows are allowed to affect
+ * foreground flows. Its detailed meaning is not explained in the paper,
+ * but its units are bytes, and I think it controls how aggressive we
+ * are at increasing window size, potentially at the expense of other
+ * competing flows.
+ */
+ int mAlphaStar;
+
+ /**
+ * Controls the speed of adaptation of the exponential weighted moving
+ * average (EWMA) estimate of the bandwidth available to this flow.
+ * The paper uses 10%.
+ */
+ int mDeltaPercent;
+
+ /**
+ * The stochastic algorithm in the paper uses the rate estimate for the
+ * last-but-one period (rbHat[n-2]) to calculate the next window size.
+ * So we keep both the last (in mRateEstimateMovingAverage[1]) and the
+ * last-but-one (in mRateEstimateMovingAverage[0]) values.
+ */
+ int mRateEstimateMovingAverage[2];
+};
+
+// --------------------------------------------------------------------------
+//
+// Class
+//		Name:    NiceSocketStream
+//		Purpose: Wrapper around a SocketStream to limit sending rate to
+//			 avoid interference with higher-priority flows. Owns
+//			 the wrapped socket; only Write() adds behaviour, every
+//			 other IOStream call is passed straight through.
+//		Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+
+class NiceSocketStream : public IOStream
+{
+private:
+	/** The wrapped socket, owned by this object. */
+	std::auto_ptr<SocketStream> mapSocket;
+
+	/** Calculates the window size to use for each control period. */
+	TcpNice mTcpNice;
+
+	/**
+	 * Times the current control period. Null whenever no period is
+	 * running (nice mode disabled, or no data written yet).
+	 */
+	std::auto_ptr<Timer> mapTimer;
+
+	/** Bytes passed to Write() since the current period started. */
+	int mBytesWrittenThisPeriod;
+
+	/** When the current period started. */
+	box_time_t mPeriodStartTime;
+
+	/**
+	 * The control interval T from the paper, in milliseconds. The available
+	 * bandwidth is estimated over this period, and the window size is
+	 * recalculated at the end of each period. It should be long enough for
+	 * TCP to adapt to a change in window size; perhaps 10-100 RTTs. One
+	 * second (1000) is probably a good first approximation in many cases.
+	 */
+	int mTimeIntervalMillis;
+
+	/**
+	 * Because our data use is bursty, and tcp nice works on the assumption
+	 * that we've always got data to send, we should only enable nice mode
+	 * when we're doing a bulk upload, and disable it afterwards.
+	 */
+	bool mEnabled;
+
+	/** Start timing a new control period of mTimeIntervalMillis. */
+	void StartTimer()
+	{
+		mapTimer.reset(new Timer(mTimeIntervalMillis, "NiceSocketStream"));
+	}
+
+	/** Destroy the period timer, if there is one. */
+	void StopTimer()
+	{
+		mapTimer.reset();
+	}
+
+public:
+	NiceSocketStream(std::auto_ptr<SocketStream> apSocket);
+	virtual ~NiceSocketStream()
+	{
+		// Be nice about closing the socket, but only if there is an open
+		// socket to close. mapSocket is null if this object was built
+		// from an empty auto_ptr or by the private copy constructor, and
+		// the socket is no longer open if Close() has already been called.
+		if(mapSocket.get() && mapSocket->IsOpened())
+		{
+			mapSocket->Shutdown();
+			mapSocket->Close();
+		}
+	}
+
+	// This is the only magic
+	virtual void Write(const void *pBuffer, int NBytes);
+
+	// Everything else is delegated to the sink
+	virtual int Read(void *pBuffer, int NBytes,
+		int Timeout = IOStream::TimeOutInfinite)
+	{
+		return mapSocket->Read(pBuffer, NBytes, Timeout);
+	}
+	virtual pos_type BytesLeftToRead()
+	{
+		return mapSocket->BytesLeftToRead();
+	}
+	virtual pos_type GetPosition() const
+	{
+		return mapSocket->GetPosition();
+	}
+	virtual void Seek(IOStream::pos_type Offset, int SeekType)
+	{
+		mapSocket->Seek(Offset, SeekType);
+	}
+	virtual void Flush(int Timeout = IOStream::TimeOutInfinite)
+	{
+		mapSocket->Flush(Timeout);
+	}
+	virtual void Close()
+	{
+		mapSocket->Close();
+	}
+	virtual bool StreamDataLeft()
+	{
+		return mapSocket->StreamDataLeft();
+	}
+	virtual bool StreamClosed()
+	{
+		return mapSocket->StreamClosed();
+	}
+
+	/**
+	 * Turn nice mode on or off. Turning it off also cancels the period
+	 * timer and resets the socket's send buffer size (see TcpNice.cpp).
+	 */
+	virtual void SetEnabled(bool enabled);
+
+private:
+	// Copying is not supported: the wrapped socket has a single owner.
+	NiceSocketStream(const NiceSocketStream &rToCopy)
+	{ /* do not call */ }
+};
+
+#endif // TCPNICE__H