summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorChris Wilson <chris+github@qwirx.com>2011-01-12 00:11:53 +0000
committerChris Wilson <chris+github@qwirx.com>2011-01-12 00:11:53 +0000
commit8ee2327add953c8282807c988dcaef7a2d03a7f1 (patch)
treec97d2b6c91b4c7d3c4d4d4c720ae3d0599b505df /lib
parentbc6ccea0d973ce4a0eb370191c3bbcd22d6cb45b (diff)
Add an implementation of a stream wrapper that limits reading rate, to
control bandwidth usage.
Diffstat (limited to 'lib')
-rw-r--r--lib/common/RateLimitingStream.cpp87
-rw-r--r--lib/common/RateLimitingStream.h71
2 files changed, 158 insertions, 0 deletions
diff --git a/lib/common/RateLimitingStream.cpp b/lib/common/RateLimitingStream.cpp
new file mode 100644
index 00000000..67a97e6b
--- /dev/null
+++ b/lib/common/RateLimitingStream.cpp
@@ -0,0 +1,87 @@
+// --------------------------------------------------------------------------
+//
+// File
+// Name: RateLimitingStream.cpp
+// Purpose: Rate-limiting write-only wrapper around IOStreams
+// Created: 2011/01/11
+//
+// --------------------------------------------------------------------------
+
+#include "Box.h"
+#include "RateLimitingStream.h"
+#include "CommonException.h"
+
+#include <string.h>
+
+#include "MemLeakFindOn.h"
+
+// --------------------------------------------------------------------------
+//
+// Function
+// Name: RateLimitingStream::RateLimitingStream(const char *, int, int)
+// Purpose: Constructor, set up buffer
+// Created: 2011/01/11
+//
+// --------------------------------------------------------------------------
+RateLimitingStream::RateLimitingStream(IOStream& rSink, size_t targetBytesPerSecond)
+: mrSink(rSink), mStartTime(GetCurrentBoxTime()), mTotalBytesRead(0),
+ mTargetBytesPerSecond(targetBytesPerSecond)
+{ }
+
+// --------------------------------------------------------------------------
+//
+// Function
+// Name: RateLimitingStream::Read(void *pBuffer, int NBytes,
+// int Timeout)
+// Purpose: Reads bytes to the underlying stream at no more than
+// a fixed rate
+// Created: 2011/01/11
+//
+// --------------------------------------------------------------------------
+int RateLimitingStream::Read(void *pBuffer, int NBytes, int Timeout)
+{
+ if(NBytes > 0 && (size_t)NBytes > mTargetBytesPerSecond)
+ {
+ // Limit to one second's worth of data for performance
+ BOX_TRACE("Reducing read size from " << NBytes << " to " <<
+ mTargetBytesPerSecond << " to smooth upload rate");
+ NBytes = mTargetBytesPerSecond;
+ }
+
+ int bytesReadThisTime = mrSink.Read(pBuffer, NBytes, Timeout);
+
+ // How many bytes we will have written after this write finishes?
+ mTotalBytesRead += bytesReadThisTime;
+
+ // When should it be completed by?
+ box_time_t desiredFinishTime = mStartTime +
+ SecondsToBoxTime(mTotalBytesRead / mTargetBytesPerSecond);
+
+ // How long do we have to wait?
+ box_time_t currentTime = GetCurrentBoxTime();
+ int64_t waitTime = desiredFinishTime - currentTime;
+
+ // How are we doing so far? (for logging only)
+ box_time_t currentDuration = currentTime - mStartTime;
+ uint64_t effectiveRateSoFar = (mTotalBytesRead * MICRO_SEC_IN_SEC_LL)
+ / currentDuration;
+
+ if(waitTime > 0)
+ {
+ BOX_TRACE("Current rate " << effectiveRateSoFar <<
+ " higher than desired rate " << mTargetBytesPerSecond <<
+ ", sleeping for " << BoxTimeToMilliSeconds(waitTime) <<
+ " ms");
+ ShortSleep(waitTime, false);
+ }
+ else
+ {
+ BOX_TRACE("Current rate " << effectiveRateSoFar <<
+ " lower than desired rate " << mTargetBytesPerSecond <<
+ ", sending immediately (would have sent " <<
+ (BoxTimeToMilliSeconds(-waitTime)) << " ms ago)");
+ }
+
+ return bytesReadThisTime;
+}
+
diff --git a/lib/common/RateLimitingStream.h b/lib/common/RateLimitingStream.h
new file mode 100644
index 00000000..a322b99b
--- /dev/null
+++ b/lib/common/RateLimitingStream.h
@@ -0,0 +1,71 @@
+// --------------------------------------------------------------------------
+//
+// File
+// Name: RateLimitingStream.h
+// Purpose: Rate-limiting write-only wrapper around IOStreams
+// Created: 2011/01/11
+//
+// --------------------------------------------------------------------------
+
+#ifndef RATELIMITINGSTREAM__H
+#define RATELIMITINGSTREAM__H
+
+#include "BoxTime.h"
+#include "IOStream.h"
+
+class RateLimitingStream : public IOStream
+{
+private:
+ IOStream& mrSink;
+ box_time_t mStartTime;
+ uint64_t mTotalBytesRead;
+ size_t mTargetBytesPerSecond;
+
+public:
+ RateLimitingStream(IOStream& rSink, size_t targetBytesPerSecond);
+ virtual ~RateLimitingStream() { }
+
+ // This is the only magic
+ virtual int Read(void *pBuffer, int NBytes,
+ int Timeout = IOStream::TimeOutInfinite);
+
+ // Everything else is delegated to the sink
+ virtual void Write(const void *pBuffer, int NBytes)
+ {
+ Write(pBuffer, NBytes);
+ }
+ virtual pos_type BytesLeftToRead()
+ {
+ return mrSink.BytesLeftToRead();
+ }
+ virtual pos_type GetPosition() const
+ {
+ return mrSink.GetPosition();
+ }
+ virtual void Seek(IOStream::pos_type Offset, int SeekType)
+ {
+ mrSink.Seek(Offset, SeekType);
+ }
+ virtual void Flush(int Timeout = IOStream::TimeOutInfinite)
+ {
+ mrSink.Flush(Timeout);
+ }
+ virtual void Close()
+ {
+ mrSink.Close();
+ }
+ virtual bool StreamDataLeft()
+ {
+ return mrSink.StreamDataLeft();
+ }
+ virtual bool StreamClosed()
+ {
+ return mrSink.StreamClosed();
+ }
+
+private:
+ RateLimitingStream(const RateLimitingStream &rToCopy)
+ : mrSink(rToCopy.mrSink) { /* do not call */ }
+};
+
+#endif // RATELIMITINGSTREAM__H