diff options
author | Chris Wilson <chris+github@qwirx.com> | 2012-02-12 12:29:56 +0000 |
---|---|---|
committer | Chris Wilson <chris+github@qwirx.com> | 2012-02-12 12:29:56 +0000 |
commit | 15a3e05144785bfcaddcf2d11b6c549edd9f62ee (patch) | |
tree | 30827463c74a3b24f4b206894580a4511203c014 /lib | |
parent | 03d7cb4d75822b40d263d0737e3115a0aeff1cda (diff) |
Add experimental "TCP Nice" mode, disabled by default.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/backupclient/BackupDaemonConfigVerify.cpp | 3 | ||||
-rw-r--r-- | lib/backupstore/BackupStoreFileDiff.cpp | 2 | ||||
-rw-r--r-- | lib/common/BoxTime.h | 4 | ||||
-rw-r--r-- | lib/common/Timer.cpp | 38 | ||||
-rw-r--r-- | lib/common/Timer.h | 4 | ||||
-rw-r--r-- | lib/server/SocketStream.h | 6 | ||||
-rw-r--r-- | lib/server/TcpNice.cpp | 204 | ||||
-rw-r--r-- | lib/server/TcpNice.h | 174 |
8 files changed, 411 insertions, 24 deletions
diff --git a/lib/backupclient/BackupDaemonConfigVerify.cpp b/lib/backupclient/BackupDaemonConfigVerify.cpp index cdd71425..a3b95335 100644 --- a/lib/backupclient/BackupDaemonConfigVerify.cpp +++ b/lib/backupclient/BackupDaemonConfigVerify.cpp @@ -117,6 +117,9 @@ static const ConfigurationVerifyKey verifyrootkeys[] = ConfigurationVerifyKey("MaxUploadRate", ConfigTest_IsInt), // optional maximum speed of uploads in kbytes per second + ConfigurationVerifyKey("TcpNice", ConfigTest_IsBool, false), + // optional enable of tcp nice/background mode + ConfigurationVerifyKey("CertificateFile", ConfigTest_Exists), ConfigurationVerifyKey("PrivateKeyFile", ConfigTest_Exists), ConfigurationVerifyKey("TrustedCAsFile", ConfigTest_Exists), diff --git a/lib/backupstore/BackupStoreFileDiff.cpp b/lib/backupstore/BackupStoreFileDiff.cpp index 5705c3aa..b39097db 100644 --- a/lib/backupstore/BackupStoreFileDiff.cpp +++ b/lib/backupstore/BackupStoreFileDiff.cpp @@ -469,7 +469,7 @@ static void SearchForMatchingBlocks(IOStream &rFile, std::map<int64_t, int64_t> if(pDiffTimer && pDiffTimer->IsManaged()) { - maximumDiffingTime = Timer(pDiffTimer->GetMaximumDiffingTime(), + maximumDiffingTime = Timer(pDiffTimer->GetMaximumDiffingTime() * 1000, "MaximumDiffingTime"); } diff --git a/lib/common/BoxTime.h b/lib/common/BoxTime.h index cc1571cd..dfc40263 100644 --- a/lib/common/BoxTime.h +++ b/lib/common/BoxTime.h @@ -27,6 +27,10 @@ inline box_time_t SecondsToBoxTime(time_t Seconds) { return ((box_time_t)Seconds * MICRO_SEC_IN_SEC_LL); } +inline uint64_t MilliSecondsToBoxTime(int64_t milliseconds) +{ + return ((box_time_t)milliseconds * 1000); +} inline time_t BoxTimeToSeconds(box_time_t Time) { return Time / MICRO_SEC_IN_SEC_LL; diff --git a/lib/common/Timer.cpp b/lib/common/Timer.cpp index c0451818..81a4dd80 100644 --- a/lib/common/Timer.cpp +++ b/lib/common/Timer.cpp @@ -335,7 +335,7 @@ void Timers::SignalHandler(int unused) // 
--------------------------------------------------------------------------
 //
 // Function
-//		Name:    Timer::Timer(size_t timeoutSecs,
+//		Name:    Timer::Timer(size_t timeoutMillis,
 //			 const std::string& rName)
 //		Purpose: Standard timer constructor, takes a timeout in
-//			 seconds from now, and an optional name for
+//			 milliseconds from now, and an optional name for
@@ -344,8 +344,8 @@ void Timers::SignalHandler(int unused)
 //
 // --------------------------------------------------------------------------
 
-Timer::Timer(size_t timeoutSecs, const std::string& rName)
-: mExpires(GetCurrentBoxTime() + SecondsToBoxTime(timeoutSecs)),
+Timer::Timer(size_t timeoutMillis, const std::string& rName)
+: mExpires(GetCurrentBoxTime() + MilliSecondsToBoxTime(timeoutMillis)),
   mExpired(false),
   mName(rName)
 #ifdef WIN32
@@ -353,26 +353,26 @@ Timer::Timer(size_t timeoutSecs, const std::string& rName)
 #endif
 {
 	#ifndef BOX_RELEASE_BUILD
-	if (timeoutSecs == 0)
+	if (timeoutMillis == 0)
 	{
-		BOX_TRACE(TIMER_ID "initialised for " << timeoutSecs <<
-			" secs, will not fire");
+		BOX_TRACE(TIMER_ID "initialised for " << timeoutMillis <<
+			" ms, will not fire");
 	}
 	else
 	{
-		BOX_TRACE(TIMER_ID "initialised for " << timeoutSecs <<
-			" secs, to fire at " << FormatTime(mExpires, false, true));
+		BOX_TRACE(TIMER_ID "initialised for " << timeoutMillis <<
+			" ms, to fire at " << FormatTime(mExpires, false, true));
 	}
 	#endif
 
-	if (timeoutSecs == 0)
+	if (timeoutMillis == 0)
 	{
 		mExpires = 0;
 	}
 	else
 	{
 		Timers::Add(*this);
-		Start(timeoutSecs * MICRO_SEC_IN_SEC_LL);
+		Start(timeoutMillis);
 	}
 }
 
@@ -408,7 +408,7 @@ void Timer::Start()
 // --------------------------------------------------------------------------
 //
 // Function
-//		Name:    Timer::Start(int64_t delayInMicros)
+//		Name:    Timer::Start(int64_t timeoutMillis)
 //		Purpose: This internal function initialises an OS TimerQueue
 //			 timer on Windows, with a specified delay already
 //			 calculated to save us doing it again.
Like @@ -417,23 +417,21 @@ void Timer::Start() // // -------------------------------------------------------------------------- -void Timer::Start(int64_t delayInMicros) +void Timer::Start(int64_t timeoutMillis) { #ifdef WIN32 // only call me once! ASSERT(mTimerHandle == INVALID_HANDLE_VALUE); - int64_t delayInMillis = delayInMicros / 1000; - // Windows XP always seems to fire timers up to 20 ms late, // at least on my test laptop. Not critical in practice, but our // tests are precise enough that they will fail if we don't // correct for it. - delayInMillis -= 20; + timeoutMillis -= 20; // Set a system timer to call our timer routine if (CreateTimerQueueTimer(&mTimerHandle, NULL, TimerRoutine, - (PVOID)this, delayInMillis, 0, WT_EXECUTEINTIMERTHREAD) + (PVOID)this, timeoutMillis, 0, WT_EXECUTEINTIMERTHREAD) == FALSE) { BOX_ERROR(TIMER_ID "failed to create timer: " << @@ -523,8 +521,8 @@ Timer::Timer(const Timer& rToCopy) { BOX_TRACE(TIMER_ID "initialised from timer " << &rToCopy << ", " "to fire at " << - (int)(mExpires / 1000000) << "." << - (int)(mExpires % 1000000)); + (int)(mExpires / MICRO_SEC_IN_SEC_LL) << "." << + (int)(mExpires % MICRO_SEC_IN_SEC_LL)); } #endif @@ -564,8 +562,8 @@ Timer& Timer::operator=(const Timer& rToCopy) { BOX_TRACE(TIMER_ID "initialised from timer " << &rToCopy << ", " "to fire at " << - (int)(rToCopy.mExpires / 1000000) << "." << - (int)(rToCopy.mExpires % 1000000)); + (int)(rToCopy.mExpires / MICRO_SEC_IN_SEC_LL) << "." 
<< + (int)(rToCopy.mExpires % MICRO_SEC_IN_SEC_LL)); } #endif diff --git a/lib/common/Timer.h b/lib/common/Timer.h index 42b2e00f..bd118a18 100644 --- a/lib/common/Timer.h +++ b/lib/common/Timer.h @@ -53,7 +53,7 @@ class Timers class Timer { public: - Timer(size_t timeoutSecs, const std::string& rName = ""); + Timer(size_t timeoutMillis, const std::string& rName = ""); virtual ~Timer(); Timer(const Timer &); Timer &operator=(const Timer &); @@ -74,7 +74,7 @@ private: std::string mName; void Start(); - void Start(int64_t delayInMicros); + void Start(int64_t timeoutMillis); void Stop(); #ifdef WIN32 diff --git a/lib/server/SocketStream.h b/lib/server/SocketStream.h index 2b582f21..2fb5e391 100644 --- a/lib/server/SocketStream.h +++ b/lib/server/SocketStream.h @@ -51,7 +51,6 @@ public: virtual bool GetPeerCredentials(uid_t &rUidOut, gid_t &rGidOut); protected: - tOSSocketHandle GetSocketHandle(); void MarkAsReadClosed() {mReadClosed = true;} void MarkAsWriteClosed() {mWriteClosed = true;} @@ -69,6 +68,11 @@ public: off_t GetBytesWritten() const {return mBytesWritten;} void ResetCounters() {mBytesRead = mBytesWritten = 0;} bool IsOpened() { return mSocketHandle != INVALID_SOCKET_VALUE; } + + /** + * Only for use by NiceSocketStream! 
+	 */
+	tOSSocketHandle GetSocketHandle();
 };
 
 #endif // SOCKETSTREAM__H
diff --git a/lib/server/TcpNice.cpp b/lib/server/TcpNice.cpp
new file mode 100644
index 00000000..88d08bda
--- /dev/null
+++ b/lib/server/TcpNice.cpp
@@ -0,0 +1,204 @@
+// --------------------------------------------------------------------------
+//
+// File
+//		Name:    TcpNice.cpp
+//		Purpose: Calculator for adaptive TCP window sizing to support
+//			 low-priority background flows using the stochastic
+//			 algorithm, as described in
+//			 http://www.thlab.net/~lmassoul/p18-key.pdf
+//		Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+
+#include "Box.h"
+
+#include "TcpNice.h"
+#include "Logging.h"
+#include "BoxTime.h"
+
+#ifdef HAVE_NETINET_TCP_H
+#	include <netinet/tcp.h>
+#endif
+
+#include "MemLeakFindOn.h"
+
+// --------------------------------------------------------------------------
+//
+// Function
+//		Name:    TcpNice::TcpNice()
+//		Purpose: Initialise state of the calculator: a one-byte starting
+//			 window, gamma = 100%, alpha* = 100, delta = 10%, and an
+//			 empty (zero) rate estimate history.
+//		Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+TcpNice::TcpNice()
+: mLastWindowSize(1),
+  mGammaPercent(100),
+  mAlphaStar(100),
+  mDeltaPercent(10)
+{
+	// GetNextWindowSize() reads both history slots on its very first
+	// call, so they must hold a defined value before any rate has been
+	// observed. Assigned in the body rather than the initialiser list
+	// to stay portable to older compilers.
+	mRateEstimateMovingAverage[0] = 0;
+	mRateEstimateMovingAverage[1] = 0;
+}
+
+// --------------------------------------------------------------------------
+//
+// Function
+//		Name:    int TcpNice::GetNextWindowSize(int bytesChange,
+//			 box_time_t timeElapsed, int rttEstimateMicros)
+//		Purpose: Calculate the next recommended window size, given the
+//			 number of bytes sent since the previous recommendation,
+//			 the time elapsed (in microseconds) and the current
+//			 round-trip time estimate (in microseconds).
+//		Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+int TcpNice::GetNextWindowSize(int bytesChange, box_time_t timeElapsed,
+	int rttEstimateMicros)
+{
+	// Both values are used as divisors below. A zero RTT estimate or a
+	// zero-length period would otherwise be a division by zero, so
+	// clamp each of them to the smallest usable value.
+	if(rttEstimateMicros < 1)
+	{
+		rttEstimateMicros = 1;
+	}
+
+	if(timeElapsed < 1)
+	{
+		timeElapsed = 1;
+	}
+
+	// alpha* bytes per round trip, expressed as bytes per second
+	int epsilon = (mAlphaStar * 1000000) / rttEstimateMicros;
+
+	// timeElapsed is in microseconds, so this will fail for T > 2000 seconds
+	int rateLastPeriod = ((uint64_t)bytesChange * 1000000 / timeElapsed);
+
+	// The paper compares against the estimate from the last-but-one
+	// period, which is kept in slot [0].
+	int rawAdjustment = epsilon + rateLastPeriod -
+		mRateEstimateMovingAverage[0];
+
+	// The percent-scaled products are computed in 64 bits: at rates of
+	// a few tens of megabytes per second "value * percent" no longer
+	// fits in a 32-bit int, although the final quotient does.
+	int gammaAdjustment = (int)
+		(((int64_t)rawAdjustment * mGammaPercent) / 100);
+
+	int newWindowSize = mLastWindowSize + gammaAdjustment;
+
+	// A window of zero or fewer bytes is not a meaningful socket buffer
+	// size, so never recommend (or remember) less than one byte, which
+	// is also the size the calculator starts from.
+	if(newWindowSize < 1)
+	{
+		newWindowSize = 1;
+	}
+
+	// Exponentially weighted moving average of the achieved rate
+	int newRateEstimateMovingAverage = (int)(
+		(((int64_t)(100 - mDeltaPercent) *
+			mRateEstimateMovingAverage[1]) / 100) +
+		(((int64_t)mDeltaPercent * rateLastPeriod) / 100));
+
+	BOX_TRACE("TcpNice: "
+		"b=" << bytesChange << ", "
+		"T=" << timeElapsed << ", "
+		"rtt=" << rttEstimateMicros << ", "
+		"e=" << epsilon << ", "
+		"rb=" << rateLastPeriod << ", "
+		"rbhat=" << newRateEstimateMovingAverage << ", "
+		"raw=" << rawAdjustment << ", "
+		"gamma=" << gammaAdjustment << ", "
+		"wb=" << newWindowSize);
+
+	// Age the history: last becomes last-but-one, new becomes last
+	mRateEstimateMovingAverage[0] = mRateEstimateMovingAverage[1];
+	mRateEstimateMovingAverage[1] = newRateEstimateMovingAverage;
+	mLastWindowSize = newWindowSize;
+
+	return newWindowSize;
+}
+
+// --------------------------------------------------------------------------
+//
+// Constructor
+//		Name:    NiceSocketStream::NiceSocketStream(
+//			 std::auto_ptr<SocketStream> apSocket)
+//		Purpose: Initialise state of the socket wrapper. Takes ownership
+//			 of the socket; nice mode starts disabled, with a control
+//			 interval of 1000 ms and no timer running.
+//		Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+
+NiceSocketStream::NiceSocketStream(std::auto_ptr<SocketStream> apSocket)
+: mapSocket(apSocket),
+  mTcpNice(),
+  mBytesWrittenThisPeriod(0),
+  mPeriodStartTime(GetCurrentBoxTime()),
+  mTimeIntervalMillis(1000),
+  mEnabled(false)
+{ }
+
+// --------------------------------------------------------------------------
+//
+// Function
+//		Name:    
NiceSocketStream::Write(const void *pBuffer, int NBytes)
+//		Purpose: Writes bytes to the underlying stream, adjusting window size
+//			 using a TcpNice calculator. When nice mode is enabled and
+//			 the control period has expired, the send buffer size is
+//			 recalculated and applied before the data is written.
+//		Created: 2012/02/11
+//
+// --------------------------------------------------------------------------
+void NiceSocketStream::Write(const void *pBuffer, int NBytes)
+{
+#ifdef HAVE_DECL_SO_SNDBUF
+	// End of a control period: hand the bytes written and the time taken
+	// to the calculator, and apply the window size that it recommends.
+	if(mEnabled && mapTimer.get() && mapTimer->HasExpired())
+	{
+		box_time_t newPeriodStart = GetCurrentBoxTime();
+		box_time_t elapsed = newPeriodStart - mPeriodStartTime;
+		int socket = mapSocket->GetSocketHandle();
+
+		// Fallback used when the kernel cannot report an RTT. It is
+		// passed as rttEstimateMicros, so this is 50 microseconds.
+		// NOTE(review): confirm that 50 milliseconds was not intended.
+		int rtt = 50; // WAG
+
+#	if defined HAVE_DECL_SOL_TCP && defined HAVE_DECL_TCP_INFO && defined HAVE_STRUCT_TCP_INFO_TCPI_RTT
+		// Declared inside the guard, because only these feature tests
+		// establish that struct tcp_info is available on this platform.
+		struct tcp_info info;
+		socklen_t optlen = sizeof(info);
+		if(getsockopt(socket, SOL_TCP, TCP_INFO, &info, &optlen) == -1)
+		{
+			BOX_LOG_SYS_WARNING("getsockopt(" << socket << ", SOL_TCP, "
+				"TCP_INFO) failed");
+		}
+		else if(optlen != sizeof(info))
+		{
+			BOX_WARNING("getsockopt(" << socket << ", SOL_TCP, "
+				"TCP_INFO) return structure size " << optlen << ", "
+				"expected " << sizeof(info));
+		}
+		else
+		{
+			rtt = info.tcpi_rtt;
+		}
+#	endif
+
+		int newWindow = mTcpNice.GetNextWindowSize(mBytesWrittenThisPeriod,
+			elapsed, rtt);
+
+		// A failure here is only logged: the data is still written,
+		// just without the adjusted send buffer size.
+		if(setsockopt(socket, SOL_SOCKET, SO_SNDBUF, &newWindow,
+			sizeof(newWindow)) == -1)
+		{
+			BOX_LOG_SYS_WARNING("setsockopt(" << socket << ", SOL_SOCKET, "
+				"SO_SNDBUF, " << newWindow << ") failed");
+		}
+
+		// Dropping the timer makes the block below start a new period
+		StopTimer();
+	}
+
+	if(mEnabled && !mapTimer.get())
+	{
+		// Don't start the timer until we receive the first data to write,
+		// as diffing might take a long time and we don't want to bias
+		// the TcpNice algorithm by running while we don't have bulk data
+		// to send.
+		StartTimer();
+		mPeriodStartTime = GetCurrentBoxTime();
+		mBytesWrittenThisPeriod = 0;
+	}
+
+	mBytesWrittenThisPeriod += NBytes;
+#endif // HAVE_DECL_SO_SNDBUF
+
+	mapSocket->Write(pBuffer, NBytes);
+}
+
+// --------------------------------------------------------------------------
+//
+// Function
+//		Name:    NiceSocketStream::SetEnabled(bool enabled)
+//		Purpose: Update the enabled status, and if disabling, cancel the
+//			 timer and set a sensible window size.
+//		Created: 2012/02/12
+//
+// --------------------------------------------------------------------------
+
+void NiceSocketStream::SetEnabled(bool enabled)
+{
+	mEnabled = enabled;
+
+	if(!enabled)
+	{
+		StopTimer();
+#ifdef HAVE_DECL_SO_SNDBUF
+		int socket = mapSocket->GetSocketHandle();
+		// Put back a fixed 128 kB (1 << 17) send buffer, replacing
+		// whatever size nice mode last applied.
+		int newWindow = 1<<17;
+		if(setsockopt(socket, SOL_SOCKET, SO_SNDBUF, &newWindow,
+			sizeof(newWindow)) == -1)
+		{
+			BOX_LOG_SYS_WARNING("setsockopt(" << socket << ", SOL_SOCKET, "
+				"SO_SNDBUF, " << newWindow << ") failed");
+		}
+#endif
+	}
+}
diff --git a/lib/server/TcpNice.h b/lib/server/TcpNice.h
new file mode 100644
index 00000000..e2027749
--- /dev/null
+++ b/lib/server/TcpNice.h
@@ -0,0 +1,174 @@
+// --------------------------------------------------------------------------
+//
+// File
+//		Name:    TcpNice.h
+//		Purpose: Calculator for adaptive TCP window sizing to support
+//			 low-priority background flows using the stochastic
+//			 algorithm, as described in
+//			 http://www.thlab.net/~lmassoul/p18-key.pdf
+//		Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+
+#ifndef TCPNICE__H
+#define TCPNICE__H
+
+#include <memory>
+
+#include "SocketStream.h"
+#include "Timer.h"
+
+// --------------------------------------------------------------------------
+//
+// Class
+//		Name:    TcpNice
+//		Purpose: Calculator for adaptive TCP window sizing.
+//		Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+
+class TcpNice
+{
+public:
+	TcpNice();
+
+	/**
+	 * Returns the next recommended window size and records it, together
+	 * with an updated rate estimate, for use by the following call.
+	 *
+	 * @param bytesChange       bytes sent since the previous call
+	 * @param timeElapsed       length of that period, in microseconds
+	 * @param rttEstimateMicros current round-trip time estimate, in
+	 *                          microseconds
+	 */
+	int GetNextWindowSize(int bytesChange, box_time_t timeElapsed,
+		int rttEstimateMicros);
+
+private:
+	/**
+	 * The previous (last recommended) window size is one of the parameters
+	 * used to calculate the next window size.
+	 */
+	int mLastWindowSize;
+
+	/**
+	 * Controls the speed of adaptation and the variance (random variation)
+	 * of the stable state in response to noise. The paper suggests using
+	 * 1.0 (100%).
+	 */
+	int mGammaPercent;
+
+	/**
+	 * Controls the extent to which background flows are allowed to affect
+	 * foreground flows. Its detailed meaning is not explained in the paper,
+	 * but its units are bytes, and I think it controls how aggressive we
+	 * are at increasing window size, potentially at the expense of other
+	 * competing flows.
+	 */
+	int mAlphaStar;
+
+	/**
+	 * Controls the speed of adaptation of the exponential weighted moving
+	 * average (EWMA) estimate of the bandwidth available to this flow.
+	 * The paper uses 10%.
+	 */
+	int mDeltaPercent;
+
+	/**
+	 * The stochastic algorithm in the paper uses the rate estimate for the
+	 * last-but-one period (rbHat[n-2]) to calculate the next window size.
+	 * So we keep both the last (in mRateEstimateMovingAverage[1]) and the
+	 * last-but-one (in mRateEstimateMovingAverage[0]) values.
+	 */
+	int mRateEstimateMovingAverage[2];
+};
+
+// --------------------------------------------------------------------------
+//
+// Class
+//		Name:    NiceSocketStream
+//		Purpose: Wrapper around a SocketStream to limit sending rate to
+//			 avoid interference with higher-priority flows.
+//		Created: 11/02/2012
+//
+// --------------------------------------------------------------------------
+
+class NiceSocketStream : public IOStream
+{
+private:
+	/** The wrapped socket, owned by this object. */
+	std::auto_ptr<SocketStream> mapSocket;
+
+	/** Calculator that recommends the next send window size. */
+	TcpNice mTcpNice;
+
+	/**
+	 * Timer for the current control period. It is empty until the first
+	 * write made while nice mode is enabled.
+	 */
+	std::auto_ptr<Timer> mapTimer;
+
+	/** Bytes passed to Write() since the current period started. */
+	int mBytesWrittenThisPeriod;
+
+	/** Time at which the current control period started. */
+	box_time_t mPeriodStartTime;
+
+	/**
+	 * The control interval T from the paper, in milliseconds. The available
+	 * bandwidth is estimated over this period, and the window size is
+	 * recalculated at the end of each period. It should be long enough for
+	 * TCP to adapt to a change in window size; perhaps 10-100 RTTs. One
+	 * second (1000) is probably a good first approximation in many cases.
+	 */
+	int mTimeIntervalMillis;
+
+	/**
+	 * Because our data use is bursty, and tcp nice works on the assumption
+	 * that we've always got data to send, we should only enable nice mode
+	 * when we're doing a bulk upload, and disable it afterwards.
+	 */
+	bool mEnabled;
+
+	/** Begin a new control period of mTimeIntervalMillis milliseconds. */
+	void StartTimer()
+	{
+		mapTimer.reset(new Timer(mTimeIntervalMillis, "NiceSocketStream"));
+	}
+
+	/** Discard the current control period timer, if there is one. */
+	void StopTimer()
+	{
+		mapTimer.reset();
+	}
+
+public:
+	NiceSocketStream(std::auto_ptr<SocketStream> apSocket);
+	virtual ~NiceSocketStream()
+	{
+		// Be nice about closing the socket, but only when there is an
+		// open socket to close. Close() on this wrapper may already
+		// have closed it, and the wrapper may hold no socket at all.
+		// NOTE(review): SocketStream::Shutdown() and Close() are
+		// assumed to fail on a socket that is not open -- confirm.
+		if(mapSocket.get() && mapSocket->IsOpened())
+		{
+			mapSocket->Shutdown();
+			mapSocket->Close();
+		}
+	}
+
+	// This is the only magic
+	virtual void Write(const void *pBuffer, int NBytes);
+
+	// Everything else is delegated to the sink
+	virtual int Read(void *pBuffer, int NBytes,
+		int Timeout = IOStream::TimeOutInfinite)
+	{
+		return mapSocket->Read(pBuffer, NBytes, Timeout);
+	}
+	virtual pos_type BytesLeftToRead()
+	{
+		return mapSocket->BytesLeftToRead();
+	}
+	virtual pos_type GetPosition() const
+	{
+		return mapSocket->GetPosition();
+	}
+	virtual void Seek(IOStream::pos_type Offset, int SeekType)
+	{
+		mapSocket->Seek(Offset, SeekType);
+	}
+	virtual void Flush(int Timeout = IOStream::TimeOutInfinite)
+	{
+		mapSocket->Flush(Timeout);
+	}
+	virtual void Close()
+	{
+		mapSocket->Close();
+	}
+	virtual bool StreamDataLeft()
+	{
+		return mapSocket->StreamDataLeft();
+	}
+	virtual bool StreamClosed()
+	{
+		return mapSocket->StreamClosed();
+	}
+
+	// Turns nice mode on or off; see the implementation in TcpNice.cpp
+	virtual void SetEnabled(bool enabled);
+
+private:
+	// Copying is not supported: the wrapper owns its socket and timer
+	NiceSocketStream(const NiceSocketStream &rToCopy)
+	{ /* do not call */ }
+};
+
+#endif // TCPNICE__H