From 15a3e05144785bfcaddcf2d11b6c549edd9f62ee Mon Sep 17 00:00:00 2001 From: Chris Wilson Date: Sun, 12 Feb 2012 12:29:56 +0000 Subject: Add experimental "TCP Nice" mode, disabled by default. --- bin/bbackupd/BackupClientContext.cpp | 118 +++++++-------- bin/bbackupd/BackupClientContext.h | 18 ++- bin/bbackupd/BackupClientDirectoryRecord.cpp | 7 + bin/bbackupd/BackupDaemon.cpp | 4 +- infrastructure/m4/boxbackup_tests.m4 | 6 +- lib/backupclient/BackupDaemonConfigVerify.cpp | 3 + lib/backupstore/BackupStoreFileDiff.cpp | 2 +- lib/common/BoxTime.h | 4 + lib/common/Timer.cpp | 38 +++-- lib/common/Timer.h | 4 +- lib/server/SocketStream.h | 6 +- lib/server/TcpNice.cpp | 204 ++++++++++++++++++++++++++ lib/server/TcpNice.h | 174 ++++++++++++++++++++++ 13 files changed, 498 insertions(+), 90 deletions(-) create mode 100644 lib/server/TcpNice.cpp create mode 100644 lib/server/TcpNice.h diff --git a/bin/bbackupd/BackupClientContext.cpp b/bin/bbackupd/BackupClientContext.cpp index 7602ed29..e1c4349f 100644 --- a/bin/bbackupd/BackupClientContext.cpp +++ b/bin/bbackupd/BackupClientContext.cpp @@ -28,6 +28,7 @@ #include "autogen_BackupProtocol.h" #include "BackupStoreFile.h" #include "Logging.h" +#include "TcpNice.h" #include "MemLeakFindOn.h" @@ -49,15 +50,14 @@ BackupClientContext::BackupClientContext bool ExtendedLogging, bool ExtendedLogToFile, std::string ExtendedLogFile, - ProgressNotifier& rProgressNotifier + ProgressNotifier& rProgressNotifier, + bool TcpNiceMode ) : mrResolver(rResolver), mrTLSContext(rTLSContext), mHostname(rHostname), mPort(Port), mAccountNumber(AccountNumber), - mpSocket(0), - mpConnection(0), mExtendedLogging(ExtendedLogging), mExtendedLogToFile(ExtendedLogToFile), mExtendedLogFile(ExtendedLogFile), @@ -71,7 +71,8 @@ BackupClientContext::BackupClientContext mpExcludeDirs(0), mKeepAliveTimer(0, "KeepAliveTime"), mbIsManaged(false), - mrProgressNotifier(rProgressNotifier) + mrProgressNotifier(rProgressNotifier), + mTcpNiceMode(TcpNiceMode) { } @@ -107,40 +108,42 @@ BackupClientContext::~BackupClientContext() BackupProtocolClient &BackupClientContext::GetConnection() { // Already got it? Just return it. - if(mpConnection != 0) + if(mapConnection.get()) { - return *mpConnection; + return *mapConnection; } - // Get a socket connection - if(mpSocket == 0) - { - mpSocket = new SocketStreamTLS; - ASSERT(mpSocket != 0); // will have exceptioned if this was a problem - } + // there shouldn't be a connection open + ASSERT(mapSocket.get() == 0); + + SocketStreamTLS *pSocket = new SocketStreamTLS; try { // Defensive. - if(mpConnection != 0) - { - delete mpConnection; - mpConnection = 0; - } + mapConnection.reset(); // Log intention BOX_INFO("Opening connection to server '" << mHostname << "'..."); // Connect! - mpSocket->Open(mrTLSContext, Socket::TypeINET, - mHostname.c_str(), mPort); + pSocket->Open(mrTLSContext, Socket::TypeINET, mHostname.c_str(), mPort); + + if(mTcpNiceMode) + { + mapNice.reset(new NiceSocketStream(std::auto_ptr(pSocket))); + mapConnection.reset(new BackupProtocolClient(*mapNice)); + } + else + { + mapConnection.reset(new BackupProtocolClient(*pSocket)); + } - // And create a procotol object - mpConnection = new BackupProtocolClient(*mpSocket); + pSocket = NULL; // Set logging option - mpConnection->SetLogToSysLog(mExtendedLogging); + mapConnection->SetLogToSysLog(mExtendedLogging); if (mExtendedLogToFile) { @@ -156,16 +159,17 @@ BackupProtocolClient &BackupClientContext::GetConnection() } else { - mpConnection->SetLogToFile(mpExtendedLogFileHandle); + mapConnection->SetLogToFile(mpExtendedLogFileHandle); } } // Handshake - mpConnection->Handshake(); + mapConnection->Handshake(); // Check the version of the server { - std::auto_ptr serverVersion(mpConnection->QueryVersion(BACKUP_STORE_SERVER_VERSION)); + std::auto_ptr serverVersion( + mapConnection->QueryVersion(BACKUP_STORE_SERVER_VERSION)); if(serverVersion->GetVersion() != BACKUP_STORE_SERVER_VERSION) { THROW_EXCEPTION(BackupStoreException, WrongServerVersion) @@ -173,7 +177,8 @@ BackupProtocolClient &BackupClientContext::GetConnection() } // Login -- if this fails, the Protocol will exception - std::auto_ptr loginConf(mpConnection->QueryLogin(mAccountNumber, 0 /* read/write */)); + std::auto_ptr loginConf( + mapConnection->QueryLogin(mAccountNumber, 0 /* read/write */)); // Check that the client store marker is the one we expect if(mClientStoreMarker != ClientStoreMarker_NotKnown) @@ -183,9 +188,9 @@ BackupProtocolClient &BackupClientContext::GetConnection() // Not good... finish the connection, abort, etc, ignoring errors try { - mpConnection->QueryFinished(); - mpSocket->Shutdown(); - mpSocket->Close(); + mapConnection->QueryFinished(); + mapSocket.reset(); + mapNice.reset(); } catch(...) { @@ -213,14 +218,13 @@ BackupProtocolClient &BackupClientContext::GetConnection() catch(...) { // Clean up. - delete mpConnection; - mpConnection = 0; - delete mpSocket; - mpSocket = 0; + mapConnection.reset(); + mapSocket.reset(); + mapNice.reset(); throw; } - return *mpConnection; + return *mapConnection; } // -------------------------------------------------------------------------- @@ -233,7 +237,7 @@ BackupProtocolClient &BackupClientContext::GetConnection() // -------------------------------------------------------------------------- void BackupClientContext::CloseAnyOpenConnection() { - if(mpConnection) + if(mapConnection.get()) { try { @@ -244,14 +248,14 @@ void BackupClientContext::CloseAnyOpenConnection() box_time_t marker = GetCurrentBoxTime(); // Set it on the store - mpConnection->QuerySetClientStoreMarker(marker); + mapConnection->QuerySetClientStoreMarker(marker); // Record it so that it can be picked up later. mClientStoreMarker = marker; } // Quit nicely - mpConnection->QueryFinished(); + mapConnection->QueryFinished(); } catch(...) { @@ -259,26 +263,18 @@ void BackupClientContext::CloseAnyOpenConnection() } // Delete it anyway. - delete mpConnection; - mpConnection = 0; + mapConnection.reset(); } - - if(mpSocket) + + try { - try - { - // Be nice about closing the socket - mpSocket->Shutdown(); - mpSocket->Close(); - } - catch(...) - { - // Ignore errors - } - - // Delete object - delete mpSocket; - mpSocket = 0; + // Be nice about closing the socket + mapSocket.reset(); + mapNice.reset(); + } + catch(...) + { + // Ignore errors } // Delete any pending list @@ -307,9 +303,9 @@ void BackupClientContext::CloseAnyOpenConnection() // -------------------------------------------------------------------------- int BackupClientContext::GetTimeout() const { - if(mpConnection) + if(mapConnection.get()) { - return mpConnection->GetTimeout(); + return mapConnection->GetTimeout(); } return (15*60*1000); @@ -509,7 +505,7 @@ void BackupClientContext::SetKeepAliveTime(int iSeconds) { mKeepAliveTime = iSeconds < 0 ? 0 : iSeconds; BOX_TRACE("Set keep-alive time to " << mKeepAliveTime << " seconds"); - mKeepAliveTimer = Timer(mKeepAliveTime, "KeepAliveTime"); + mKeepAliveTimer = Timer(mKeepAliveTime * 1000, "KeepAliveTime"); } // -------------------------------------------------------------------------- @@ -551,7 +547,7 @@ void BackupClientContext::UnManageDiffProcess() // -------------------------------------------------------------------------- void BackupClientContext::DoKeepAlive() { - if (!mpConnection) + if (!mapConnection.get()) { return; } @@ -567,9 +563,9 @@ void BackupClientContext::DoKeepAlive() } BOX_TRACE("KeepAliveTime reached, sending keep-alive message"); - mpConnection->QueryGetIsAlive(); + mapConnection->QueryGetIsAlive(); - mKeepAliveTimer = Timer(mKeepAliveTime, "KeepAliveTime"); + mKeepAliveTimer = Timer(mKeepAliveTime * 1000, "KeepAliveTime"); } int BackupClientContext::GetMaximumDiffingTime() diff --git a/bin/bbackupd/BackupClientContext.h b/bin/bbackupd/BackupClientContext.h index 404d2d77..41d90ff0 100644 --- a/bin/bbackupd/BackupClientContext.h +++ b/bin/bbackupd/BackupClientContext.h @@ -16,6 +16,7 @@ #include "BackupDaemonInterface.h" #include "BackupStoreFile.h" #include "ExcludeList.h" +#include "TcpNice.h" #include "Timer.h" class TLSContext; @@ -49,7 +50,8 @@ public: bool ExtendedLogging, bool ExtendedLogToFile, std::string ExtendedLogFile, - ProgressNotifier &rProgressNotifier + ProgressNotifier &rProgressNotifier, + bool TcpNiceMode ); virtual ~BackupClientContext(); private: @@ -207,6 +209,14 @@ public: { return mrProgressNotifier; } + + void SetNiceMode(bool enabled) + { + if(mTcpNiceMode) + { + mapNice->SetEnabled(enabled); + } + } private: LocationResolver &mrResolver; @@ -214,8 +224,9 @@ private: std::string mHostname; int mPort; uint32_t mAccountNumber; - SocketStreamTLS *mpSocket; - BackupProtocolClient *mpConnection; + std::auto_ptr mapSocket; + std::auto_ptr mapNice; + std::auto_ptr mapConnection; bool mExtendedLogging; bool mExtendedLogToFile; std::string mExtendedLogFile; @@ -232,6 +243,7 @@ private: int mKeepAliveTime; int mMaximumDiffingTime; ProgressNotifier &mrProgressNotifier; + bool mTcpNiceMode; }; #endif // BACKUPCLIENTCONTEXT__H diff --git a/bin/bbackupd/BackupClientDirectoryRecord.cpp b/bin/bbackupd/BackupClientDirectoryRecord.cpp index 86c9688f..3a0ed08b 100644 --- a/bin/bbackupd/BackupClientDirectoryRecord.cpp +++ b/bin/bbackupd/BackupClientDirectoryRecord.cpp @@ -1671,6 +1671,7 @@ int64_t BackupClientDirectoryRecord::UploadFile( &isCompletelyDifferent)); rContext.UnManageDiffProcess(); + rContext.SetNiceMode(true); RateLimitingStream rateLimit(*patchStream, rParams.mMaxUploadRate); @@ -1690,6 +1691,8 @@ int64_t BackupClientDirectoryRecord::UploadFile( // std::auto_ptr stored(connection.QueryStoreFile(mObjectID, ModificationTime, AttributesHash, isCompletelyDifferent?(0):(diffFromID), rStoreFilename, *pStreamToUpload)); + + rContext.SetNiceMode(false); // Get object ID from the result objID = stored->GetObjectID(); @@ -1715,6 +1718,8 @@ int64_t BackupClientDirectoryRecord::UploadFile( &rParams, &(rParams.mrRunStatusProvider))); + rContext.SetNiceMode(true); + RateLimitingStream rateLimit(*upload, rParams.mMaxUploadRate); IOStream* pStreamToUpload; @@ -1735,6 +1740,8 @@ int64_t BackupClientDirectoryRecord::UploadFile( AttributesHash, 0 /* no diff from file ID */, rStoreFilename, *pStreamToUpload)); + + rContext.SetNiceMode(false); // Get object ID from the result objID = stored->GetObjectID(); diff --git a/bin/bbackupd/BackupDaemon.cpp b/bin/bbackupd/BackupDaemon.cpp index 3d0c9533..bca045d4 100644 --- a/bin/bbackupd/BackupDaemon.cpp +++ b/bin/bbackupd/BackupDaemon.cpp @@ -844,7 +844,9 @@ void BackupDaemon::RunSyncNow() conf.GetKeyValueUint32("AccountNumber"), conf.GetKeyValueBool("ExtendedLogging"), conf.KeyExists("ExtendedLogFile"), - extendedLogFile, *mpProgressNotifier + extendedLogFile, + *mpProgressNotifier, + conf.GetKeyValueBool("TcpNice") ); // The minimum age a file needs to be before it will be diff --git a/infrastructure/m4/boxbackup_tests.m4 b/infrastructure/m4/boxbackup_tests.m4 index 98ab7069..dce061af 100644 --- a/infrastructure/m4/boxbackup_tests.m4 +++ b/infrastructure/m4/boxbackup_tests.m4 @@ -131,7 +131,7 @@ AC_HEADER_STDC AC_HEADER_SYS_WAIT AC_CHECK_HEADERS([dlfcn.h fcntl.h getopt.h process.h pwd.h signal.h]) AC_CHECK_HEADERS([syslog.h time.h cxxabi.h]) -AC_CHECK_HEADERS([netinet/in.h]) +AC_CHECK_HEADERS([netinet/in.h netinet/tcp.h]) AC_CHECK_HEADERS([sys/file.h sys/param.h sys/socket.h sys/time.h sys/types.h sys/wait.h]) AC_CHECK_HEADERS([sys/uio.h sys/xattr.h]) AC_CHECK_HEADERS([bsd/unistd.h]) @@ -193,10 +193,14 @@ AC_CHECK_MEMBERS([struct sockaddr_in.sin_len],,, [[ ]]) AC_CHECK_MEMBERS([DIR.d_fd],,, [[#include ]]) AC_CHECK_MEMBERS([DIR.dd_fd],,, [[#include ]]) +AC_CHECK_MEMBERS([struct tcp_info.tcpi_rtt],,, [[#include ]]) AC_CHECK_DECLS([INFTIM],,, [[#include ]]) AC_CHECK_DECLS([SO_PEERCRED],,, [[#include ]]) +AC_CHECK_DECLS([SO_SNDBUF],,, [[#include ]]) AC_CHECK_DECLS([O_BINARY],,,) +AC_CHECK_DECLS([SOL_TCP],,, [[#include ]]) +AC_CHECK_DECLS([TCP_INFO],,, [[#include ]]) # Solaris provides getpeerucred() instead of getpeereid() or SO_PEERCRED AC_CHECK_HEADERS([ucred.h]) diff --git a/lib/backupclient/BackupDaemonConfigVerify.cpp b/lib/backupclient/BackupDaemonConfigVerify.cpp index cdd71425..a3b95335 100644 --- a/lib/backupclient/BackupDaemonConfigVerify.cpp +++ b/lib/backupclient/BackupDaemonConfigVerify.cpp @@ -117,6 +117,9 @@ static const ConfigurationVerifyKey verifyrootkeys[] = ConfigurationVerifyKey("MaxUploadRate", ConfigTest_IsInt), // optional maximum speed of uploads in kbytes per second + ConfigurationVerifyKey("TcpNice", ConfigTest_IsBool, false), + // optional enable of tcp nice/background mode + ConfigurationVerifyKey("CertificateFile", ConfigTest_Exists), ConfigurationVerifyKey("PrivateKeyFile", ConfigTest_Exists), ConfigurationVerifyKey("TrustedCAsFile", ConfigTest_Exists), diff --git a/lib/backupstore/BackupStoreFileDiff.cpp b/lib/backupstore/BackupStoreFileDiff.cpp index 5705c3aa..b39097db 100644 --- a/lib/backupstore/BackupStoreFileDiff.cpp +++ b/lib/backupstore/BackupStoreFileDiff.cpp @@ -469,7 +469,7 @@ static void SearchForMatchingBlocks(IOStream &rFile, std::map if(pDiffTimer && pDiffTimer->IsManaged()) { - maximumDiffingTime = Timer(pDiffTimer->GetMaximumDiffingTime(), + maximumDiffingTime = Timer(pDiffTimer->GetMaximumDiffingTime() * 1000, "MaximumDiffingTime"); } diff --git a/lib/common/BoxTime.h b/lib/common/BoxTime.h index cc1571cd..dfc40263 100644 --- a/lib/common/BoxTime.h +++ b/lib/common/BoxTime.h @@ -27,6 +27,10 @@ inline box_time_t SecondsToBoxTime(time_t Seconds) { return ((box_time_t)Seconds * MICRO_SEC_IN_SEC_LL); } +inline uint64_t MilliSecondsToBoxTime(int64_t milliseconds) +{ + return ((box_time_t)milliseconds * 1000); +} inline time_t BoxTimeToSeconds(box_time_t Time) { return Time / MICRO_SEC_IN_SEC_LL; diff --git a/lib/common/Timer.cpp b/lib/common/Timer.cpp index c0451818..81a4dd80 100644 --- a/lib/common/Timer.cpp +++ b/lib/common/Timer.cpp @@ -335,7 +335,7 @@ void Timers::SignalHandler(int unused) // -------------------------------------------------------------------------- // // Function -// Name: Timer::Timer(size_t timeoutSecs, +// Name: Timer::Timer(size_t timeoutMillis, // const std::string& rName) // Purpose: Standard timer constructor, takes a timeout in // seconds from now, and an optional name for @@ -344,8 +344,8 @@ void Timers::SignalHandler(int unused) // // -------------------------------------------------------------------------- -Timer::Timer(size_t timeoutSecs, const std::string& rName) -: mExpires(GetCurrentBoxTime() + SecondsToBoxTime(timeoutSecs)), +Timer::Timer(size_t timeoutMillis, const std::string& rName) +: mExpires(GetCurrentBoxTime() + MilliSecondsToBoxTime(timeoutMillis)), mExpired(false), mName(rName) #ifdef WIN32 @@ -353,26 +353,26 @@ Timer::Timer(size_t timeoutSecs, const std::string& rName) #endif { #ifndef BOX_RELEASE_BUILD - if (timeoutSecs == 0) + if (timeoutMillis == 0) { - BOX_TRACE(TIMER_ID "initialised for " << timeoutSecs << - " secs, will not fire"); + BOX_TRACE(TIMER_ID "initialised for " << timeoutMillis << + " ms, will not fire"); } else { - BOX_TRACE(TIMER_ID "initialised for " << timeoutSecs << - " secs, to fire at " << FormatTime(mExpires, false, true)); + BOX_TRACE(TIMER_ID "initialised for " << timeoutMillis << + " ms, to fire at " << FormatTime(mExpires, false, true)); } #endif - if (timeoutSecs == 0) + if (timeoutMillis == 0) { mExpires = 0; } else { Timers::Add(*this); - Start(timeoutSecs * MICRO_SEC_IN_SEC_LL); + Start(timeoutMillis * 1000); } } @@ -408,7 +408,7 @@ void Timer::Start() // -------------------------------------------------------------------------- // // Function -// Name: Timer::Start(int64_t delayInMicros) +// Name: Timer::Start(int64_t timeoutMillis) // Purpose: This internal function initialises an OS TimerQueue // timer on Windows, with a specified delay already // calculated to save us doing it again. Like @@ -417,23 +417,21 @@ void Timer::Start() // // -------------------------------------------------------------------------- -void Timer::Start(int64_t delayInMicros) +void Timer::Start(int64_t timeoutMillis) { #ifdef WIN32 // only call me once! ASSERT(mTimerHandle == INVALID_HANDLE_VALUE); - int64_t delayInMillis = delayInMicros / 1000; - // Windows XP always seems to fire timers up to 20 ms late, // at least on my test laptop. Not critical in practice, but our // tests are precise enough that they will fail if we don't // correct for it. - delayInMillis -= 20; + timeoutMillis -= 20; // Set a system timer to call our timer routine if (CreateTimerQueueTimer(&mTimerHandle, NULL, TimerRoutine, - (PVOID)this, delayInMillis, 0, WT_EXECUTEINTIMERTHREAD) + (PVOID)this, timeoutMillis, 0, WT_EXECUTEINTIMERTHREAD) == FALSE) { BOX_ERROR(TIMER_ID "failed to create timer: " << @@ -523,8 +521,8 @@ Timer::Timer(const Timer& rToCopy) { BOX_TRACE(TIMER_ID "initialised from timer " << &rToCopy << ", " "to fire at " << - (int)(mExpires / 1000000) << "." << - (int)(mExpires % 1000000)); + (int)(mExpires / MICRO_SEC_IN_SEC_LL) << "." << + (int)(mExpires % MICRO_SEC_IN_SEC_LL)); } #endif @@ -564,8 +562,8 @@ Timer& Timer::operator=(const Timer& rToCopy) { BOX_TRACE(TIMER_ID "initialised from timer " << &rToCopy << ", " "to fire at " << - (int)(rToCopy.mExpires / 1000000) << "." << - (int)(rToCopy.mExpires % 1000000)); + (int)(rToCopy.mExpires / MICRO_SEC_IN_SEC_LL) << "." << + (int)(rToCopy.mExpires % MICRO_SEC_IN_SEC_LL)); } #endif diff --git a/lib/common/Timer.h b/lib/common/Timer.h index 42b2e00f..bd118a18 100644 --- a/lib/common/Timer.h +++ b/lib/common/Timer.h @@ -53,7 +53,7 @@ class Timers class Timer { public: - Timer(size_t timeoutSecs, const std::string& rName = ""); + Timer(size_t timeoutMillis, const std::string& rName = ""); virtual ~Timer(); Timer(const Timer &); Timer &operator=(const Timer &); @@ -74,7 +74,7 @@ private: std::string mName; void Start(); - void Start(int64_t delayInMicros); + void Start(int64_t timeoutMillis); void Stop(); #ifdef WIN32 diff --git a/lib/server/SocketStream.h b/lib/server/SocketStream.h index 2b582f21..2fb5e391 100644 --- a/lib/server/SocketStream.h +++ b/lib/server/SocketStream.h @@ -51,7 +51,6 @@ public: virtual bool GetPeerCredentials(uid_t &rUidOut, gid_t &rGidOut); protected: - tOSSocketHandle GetSocketHandle(); void MarkAsReadClosed() {mReadClosed = true;} void MarkAsWriteClosed() {mWriteClosed = true;} @@ -69,6 +68,11 @@ public: off_t GetBytesWritten() const {return mBytesWritten;} void ResetCounters() {mBytesRead = mBytesWritten = 0;} bool IsOpened() { return mSocketHandle != INVALID_SOCKET_VALUE; } + + /** + * Only for use by NiceSocketStream! + */ + tOSSocketHandle GetSocketHandle(); }; #endif // SOCKETSTREAM__H diff --git a/lib/server/TcpNice.cpp b/lib/server/TcpNice.cpp new file mode 100644 index 00000000..88d08bda --- /dev/null +++ b/lib/server/TcpNice.cpp @@ -0,0 +1,204 @@ +// -------------------------------------------------------------------------- +// +// File +// Name: TcpNice.cpp +// Purpose: Calculator for adaptive TCP window sizing to support +// low-priority background flows using the stochastic +// algorithm, as described in +// http://www.thlab.net/~lmassoul/p18-key.pdf +// Created: 11/02/2012 +// +// -------------------------------------------------------------------------- + +#include "Box.h" + +#include "TcpNice.h" +#include "Logging.h" +#include "BoxTime.h" + +#ifdef HAVE_NETINET_TCP_H +# include +#endif + +#include "MemLeakFindOn.h" + +// -------------------------------------------------------------------------- +// +// Function +// Name: TcpNice::TcpNice() +// Purpose: Initialise state of the calculator +// Created: 11/02/2012 +// +// -------------------------------------------------------------------------- +TcpNice::TcpNice() +: mLastWindowSize(1), + mGammaPercent(100), + mAlphaStar(100), + mDeltaPercent(10) +{ } + +// -------------------------------------------------------------------------- +// +// Function +// Name: int GetNextWindowSize(int bytesChange, +// box_time_t timeElapsed, int rttEstimateMillis) +// Purpose: Calculate the next recommended window size, given the +// number of bytes sent since the previous recommendation, +// and the time elapsed. +// Created: 11/02/2012 +// +// -------------------------------------------------------------------------- +int TcpNice::GetNextWindowSize(int bytesChange, box_time_t timeElapsed, + int rttEstimateMicros) +{ + int epsilon = (mAlphaStar * 1000000) / rttEstimateMicros; + + // timeElapsed is in microseconds, so this will fail for T > 2000 seconds + int rateLastPeriod = ((uint64_t)bytesChange * 1000000 / timeElapsed); + + int rawAdjustment = epsilon + rateLastPeriod - + mRateEstimateMovingAverage[0]; + + int gammaAdjustment = (rawAdjustment * mGammaPercent) / 100; + + int newWindowSize = mLastWindowSize + gammaAdjustment; + + int newRateEstimateMovingAverage = + (((100 - mDeltaPercent) * mRateEstimateMovingAverage[1]) / 100) + + ((mDeltaPercent * rateLastPeriod) / 100); + + BOX_TRACE("TcpNice: " + "b=" << bytesChange << ", " + "T=" << timeElapsed << ", " + "rtt=" << rttEstimateMicros << ", " + "e=" << epsilon << ", " + "rb=" << rateLastPeriod << ", " + "rbhat=" << newRateEstimateMovingAverage << ", " + "raw=" << rawAdjustment << ", " + "gamma=" << gammaAdjustment << ", " + "wb=" << newWindowSize); + + mRateEstimateMovingAverage[0] = mRateEstimateMovingAverage[1]; + mRateEstimateMovingAverage[1] = newRateEstimateMovingAverage; + mLastWindowSize = newWindowSize; + + return newWindowSize; +} + +// -------------------------------------------------------------------------- +// +// Constructor +// Name: NiceSocketStream::NiceSocketStream( +// std::auto_ptr apSocket) +// Purpose: Initialise state of the socket wrapper +// Created: 11/02/2012 +// +// -------------------------------------------------------------------------- + +NiceSocketStream::NiceSocketStream(std::auto_ptr apSocket) +: mapSocket(apSocket), + mTcpNice(), + mBytesWrittenThisPeriod(0), + mPeriodStartTime(GetCurrentBoxTime()), + mTimeIntervalMillis(1000), + mEnabled(false) +{ } + +// -------------------------------------------------------------------------- +// +// Function +// Name: NiceSocketStream::Write(const void *pBuffer, int NBytes) +// Purpose: Writes bytes to the underlying stream, adjusting window size +// using a TcpNice calculator. +// Created: 2012/02/11 +// +// -------------------------------------------------------------------------- +void NiceSocketStream::Write(const void *pBuffer, int NBytes) +{ +#ifdef HAVE_DECL_SO_SNDBUF + if(mEnabled && mapTimer.get() && mapTimer->HasExpired()) + { + box_time_t newPeriodStart = GetCurrentBoxTime(); + box_time_t elapsed = newPeriodStart - mPeriodStartTime; + struct tcp_info info; + int socket = mapSocket->GetSocketHandle(); + int rtt = 50; // WAG + +# if defined HAVE_DECL_SOL_TCP && defined HAVE_DECL_TCP_INFO && defined HAVE_STRUCT_TCP_INFO_TCPI_RTT + socklen_t optlen = sizeof(info); + if(getsockopt(socket, SOL_TCP, TCP_INFO, &info, &optlen) == -1) + { + BOX_LOG_SYS_WARNING("getsockopt(" << socket << ", SOL_TCP, " + "TCP_INFO) failed"); + } + else if(optlen != sizeof(info)) + { + BOX_WARNING("getsockopt(" << socket << ", SOL_TCP, " + "TCP_INFO) return structure size " << optlen << ", " + "expected " << sizeof(info)); + } + else + { + rtt = info.tcpi_rtt; + } +# endif + + int newWindow = mTcpNice.GetNextWindowSize(mBytesWrittenThisPeriod, + elapsed, rtt); + + if(setsockopt(socket, SOL_SOCKET, SO_SNDBUF, &newWindow, + sizeof(newWindow)) == -1) + { + BOX_LOG_SYS_WARNING("getsockopt(" << socket << ", SOL_SOCKET, " + "SO_SNDBUF, " << newWindow << ") failed"); + } + + StopTimer(); + } + + if(mEnabled && !mapTimer.get()) + { + // Don't start the timer until we receive the first data to write, + // as diffing might take a long time and we don't want to bias + // the TcpNice algorithm by running while we don't have bulk data + // to send. + StartTimer(); + mPeriodStartTime = GetCurrentBoxTime(); + mBytesWrittenThisPeriod = 0; + } + + mBytesWrittenThisPeriod += NBytes; +#endif // HAVE_DECL_SO_SNDBUF + + mapSocket->Write(pBuffer, NBytes); +} + +// -------------------------------------------------------------------------- +// +// Function +// Name: NiceSocketStream::SetEnabled(bool enabled) +// Purpose: Update the enabled status, and if disabling, cancel the +// timer and set a sensible window size. +// Created: 2012/02/12 +// +// -------------------------------------------------------------------------- + +void NiceSocketStream::SetEnabled(bool enabled) +{ + mEnabled = enabled; + + if(!enabled) + { + StopTimer(); +#ifdef HAVE_DECL_SO_SNDBUF + int socket = mapSocket->GetSocketHandle(); + int newWindow = 1<<17; + if(setsockopt(socket, SOL_SOCKET, SO_SNDBUF, &newWindow, + sizeof(newWindow)) == -1) + { + BOX_LOG_SYS_WARNING("getsockopt(" << socket << ", SOL_SOCKET, " + "SO_SNDBUF, " << newWindow << ") failed"); + } +#endif + } +} diff --git a/lib/server/TcpNice.h b/lib/server/TcpNice.h new file mode 100644 index 00000000..e2027749 --- /dev/null +++ b/lib/server/TcpNice.h @@ -0,0 +1,174 @@ +// -------------------------------------------------------------------------- +// +// File +// Name: TcpNice.h +// Purpose: Calculator for adaptive TCP window sizing to support +// low-priority background flows using the stochastic +// algorithm, as described in +// http://www.thlab.net/~lmassoul/p18-key.pdf +// Created: 11/02/2012 +// +// -------------------------------------------------------------------------- + +#ifndef TCPNICE__H +#define TCPNICE__H + +#include + +#include "SocketStream.h" +#include "Timer.h" + +// -------------------------------------------------------------------------- +// +// Class +// Name: TcpNice +// Purpose: Calculator for adaptive TCP window sizing. +// Created: 11/02/2012 +// +// -------------------------------------------------------------------------- + +class TcpNice +{ +public: + TcpNice(); + int GetNextWindowSize(int bytesChange, box_time_t timeElapsed, + int rttEstimateMicros); + +private: + /** + * The previous (last recommended) window size is one of the parameters + * used to calculate the next window size. + */ + int mLastWindowSize; + + /** + * Controls the speed of adaptation and the variance (random variation) + * of the stable state in response to noise. The paper suggests using + * 1.0 (100%). + */ + int mGammaPercent; + + /** + * Controls the extent to which background flows are allowed to affect + * foreground flows. Its detailed meaning is not explained in the paper, + * but its units are bytes, and I think it controls how aggressive we + * are at increasing window size, potentially at the expense of other + * competing flows. + */ + int mAlphaStar; + + /** + * Controls the speed of adaptation of the exponential weighted moving + * average (EWMA) estimate of the bandwidth available to this flow. + * The paper uses 10%. + */ + int mDeltaPercent; + + /** + * The stochastic algorithm in the paper uses the rate estimate for the + * last-but-one period (rbHat[n-2]) to calculate the next window size. + * So we keep both the last (in rateEstimateMovingAverage[1]) and the + * last-but-one (in rateEstimateMovingAverage[0]) values. + */ + int mRateEstimateMovingAverage[2]; +}; + +// -------------------------------------------------------------------------- +// +// Class +// Name: NiceSocketStream +// Purpose: Wrapper around a SocketStream to limit sending rate to +// avoid interference with higher-priority flows. +// Created: 11/02/2012 +// +// -------------------------------------------------------------------------- + +class NiceSocketStream : public IOStream +{ +private: + std::auto_ptr mapSocket; + TcpNice mTcpNice; + std::auto_ptr mapTimer; + int mBytesWrittenThisPeriod; + box_time_t mPeriodStartTime; + + /** + * The control interval T from the paper, in milliseconds. The available + * bandwidth is estimated over this period, and the window size is + * recalculated at the end of each period. It should be long enough for + * TCP to adapt to a change in window size; perhaps 10-100 RTTs. One + * second (1000) is probably a good first approximation in many cases. + */ + int mTimeIntervalMillis; + + /** + * Because our data use is bursty, and tcp nice works on the assumption + * that we've always got data to send, we should only enable nice mode + * when we're doing a bulk upload, and disable it afterwards. + */ + bool mEnabled; + + void StartTimer() + { + mapTimer.reset(new Timer(mTimeIntervalMillis, "NiceSocketStream")); + } + + void StopTimer() + { + mapTimer.reset(); + } + +public: + NiceSocketStream(std::auto_ptr apSocket); + virtual ~NiceSocketStream() + { + // Be nice about closing the socket + mapSocket->Shutdown(); + mapSocket->Close(); + } + + // This is the only magic + virtual void Write(const void *pBuffer, int NBytes); + + // Everything else is delegated to the sink + virtual int Read(void *pBuffer, int NBytes, + int Timeout = IOStream::TimeOutInfinite) + { + return mapSocket->Read(pBuffer, NBytes, Timeout); + } + virtual pos_type BytesLeftToRead() + { + return mapSocket->BytesLeftToRead(); + } + virtual pos_type GetPosition() const + { + return mapSocket->GetPosition(); + } + virtual void Seek(IOStream::pos_type Offset, int SeekType) + { + mapSocket->Seek(Offset, SeekType); + } + virtual void Flush(int Timeout = IOStream::TimeOutInfinite) + { + mapSocket->Flush(Timeout); + } + virtual void Close() + { + mapSocket->Close(); + } + virtual bool StreamDataLeft() + { + return mapSocket->StreamDataLeft(); + } + virtual bool StreamClosed() + { + return mapSocket->StreamClosed(); + } + virtual void SetEnabled(bool enabled); + +private: + NiceSocketStream(const NiceSocketStream &rToCopy) + { /* do not call */ } +}; + +#endif // TCPNICE__H -- cgit v1.2.3