by Andreas Aardal Hanssen
TCP traffic control (also known as "rate control" or "traffic throttling") is about managing the network bandwidth available to your application. This means limiting the number of concurrent connections and the upload and download rates for each connection.
In this article, we will show how you can implement your own rate-controlled socket by subclassing QTcpSocket. Our implementation is a simplified version of the traffic controller from Qt 4.1's Torrent example (located in examples/network/torrent).
Who Needs Traffic Control Anyway?
Three Levels of Traffic Control
There are several places that traffic control can be applied:
Many of the applications we use every day implement some sort of TCP traffic control: games use traffic control for downloading patches, operating systems use it for downloading security updates, and in web browsers with transfer dialogs, traffic control is absolutely necessary to avoid network congestion.
For applications that download large datasets, limiting a connection's download rate is also very important. Without traffic control, your application might end up stealing most of the bandwidth, slowing down everyone else using the same network link. While fetching a huge email attachment, you could annoy coworkers waiting for an important web transaction to complete.
With traffic control, if you download a 4 GB DVD ISO image file during office hours, you can specify a maximum download rate of 20 KB/s. Your image file will arrive at a steady pace while the rest of the bandwidth remains available to everyone else.
There are also benefits to limiting your upload rate. Upload limiting is crucial for connections with a low general upload bandwidth (ADSL connections often have a 1:10 ratio between upload and download rates). While uploading at full speed, you will often see that a concurrent download rate will drop significantly. In that case, limiting your upload rate from 25 KB/s to 15 KB/s will usually allow downloading at full speed again. This happens because a congested TCP upload pipe prevents your host from sending TCP ACK packets for the download.
But that's enough chit-chat. Let's write some code! Our implementation consists of two classes: RcTcpSocket inherits QTcpSocket, and RateController is a lean QObject subclass. One RateController controls the upload and download rates for a group of RcTcpSocket connections. Let's look at RateController first.
Implementing the Rate Controller
What's important in a traffic controller?
    class RateController : public QObject
    {
        Q_OBJECT

    public:
        RateController(QObject *parent = 0)
            : QObject(parent), upLimit(0), downLimit(0),
              transferScheduled(false) { }

        void addSocket(RcTcpSocket *socket);
        void removeSocket(RcTcpSocket *socket);

        int uploadLimit() const { return upLimit; }
        int downloadLimit() const { return downLimit; }
        void setUploadLimit(int bytesPerSecond) { upLimit = bytesPerSecond; }
        void setDownloadLimit(int bytesPerSecond);

    public slots:
        void transfer();
        void scheduleTransfer();

    private:
        QTime stopWatch;  // QTime provides isNull(), start() and elapsed()
        QSet<RcTcpSocket *> sockets;
        int upLimit;
        int downLimit;
        bool transferScheduled;
    };
The RateController class contains a set of RcTcpSockets. Once you add a socket by calling addSocket(), RateController starts managing this socket's transfer rate by calling the transfer() slot repeatedly.
All sockets managed by the same RateController will have an accumulated maximum upload and download rate that you can set by calling setUploadLimit() and setDownloadLimit(). These functions can be called at any time to influence RateController's behavior.
    void RateController::addSocket(RcTcpSocket *socket)
    {
        connect(socket, SIGNAL(readyToTransfer()), this, SLOT(transfer()));
        socket->setReadBufferSize(downLimit * 2);
        sockets.insert(socket);
        scheduleTransfer();
    }
In addSocket(), we connect the RcTcpSocket::readyToTransfer() signal to our RateController::transfer() slot. RcTcpSocket uses this signal to notify the rate controller when it has data ready to transfer.
Next, we limit the socket's read buffer size. This is necessary to prevent QTcpSocket from downloading at maximum capacity; once the read buffer is full, QTcpSocket will pause downloading. By setting this buffer to be twice as large as the allowed download limit, we will help keep the download rate steady.
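To illustrate why a capped read buffer throttles a download, here is a minimal stand-alone sketch in plain C++ (no Qt). The names BoundedBuffer, accept() and consume() are hypothetical and exist only for this illustration; the sketch models the general idea that a full buffer refuses more data until the reader drains it, and does not reproduce QTcpSocket's internals.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical model of a capped read buffer; not a Qt class.
class BoundedBuffer {
public:
    explicit BoundedBuffer(std::size_t capacity) : capacity_(capacity) {}

    // Accepts as many bytes as still fit and returns that count.
    // Whatever is refused stays with the sender (back-pressure).
    std::size_t accept(const std::string &data) {
        const std::size_t room = capacity_ - buffer_.size();
        const std::size_t n = std::min(room, data.size());
        buffer_.append(data, 0, n);
        assert(buffer_.size() <= capacity_);  // the cap is never exceeded
        return n;
    }

    // Removes up to maxLen bytes, making room for more incoming data.
    std::string consume(std::size_t maxLen) {
        const std::size_t n = std::min(maxLen, buffer_.size());
        std::string out = buffer_.substr(0, n);
        buffer_.erase(0, n);
        return out;
    }

    std::size_t size() const { return buffer_.size(); }

private:
    std::size_t capacity_;
    std::string buffer_;
};
```

In this model, the rate at which consume() is called decides how fast the sender can make progress, which is the role the rate controller plays for the real socket.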
Finally, we add the socket to an internal QSet and call scheduleTransfer(); we'll get back to this later.
    void RateController::removeSocket(RcTcpSocket *socket)
    {
        disconnect(socket, SIGNAL(readyToTransfer()), this, SLOT(transfer()));
        socket->setReadBufferSize(0);
        sockets.remove(socket);
    }
The removeSocket() function simply undoes what we did in addSocket(). The special zero value passed to setReadBufferSize() removes any limit on the read buffer size.
    void RateController::setDownloadLimit(int bytesPerSecond)
    {
        downLimit = bytesPerSecond;
        foreach (RcTcpSocket *socket, sockets)
            socket->setReadBufferSize(downLimit * 2);
    }
In setDownloadLimit(), we update all registered sockets' read buffer sizes to allow real-time adjustments to the download limit.
    void RateController::scheduleTransfer()
    {
        if (transferScheduled)
            return;
        transferScheduled = true;
        QTimer::singleShot(50, this, SLOT(transfer()));
    }
The scheduleTransfer() slot ensures that transfer() is called at most once every 50 milliseconds, simplifying our code significantly – if every signal that notifies data transfer activity is connected to this slot, the transfer() slot will eventually be called. For network protocols that require very low round-trip times, a shorter interval can be used, at the expense of some CPU.
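The coalescing idea behind scheduleTransfer() can be shown without an event loop. The following is a minimal sketch in plain C++, assuming a hypothetical CoalescingScheduler class whose fire() method stands in for the single-shot timer expiring; none of these names come from Qt or from the Torrent example.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical helper: many schedule() requests collapse into one call.
class CoalescingScheduler {
public:
    explicit CoalescingScheduler(std::function<void()> task)
        : task_(std::move(task)), pending_(false)
    {
        assert(task_ != nullptr);  // a callable is required
    }

    // Requests a deferred call. Returns false if one is already pending.
    bool schedule() {
        if (pending_)
            return false;
        pending_ = true;
        return true;
    }

    // Stands in for the timer firing: runs the task at most once.
    void fire() {
        if (!pending_)
            return;
        pending_ = false;  // mirrors the reset at the top of transfer()
        task_();
    }

private:
    std::function<void()> task_;
    bool pending_;
};
```

However many times schedule() is called between two timer ticks, the task runs once per tick, which keeps the number of wake-ups bounded by the interval rather than by the amount of socket activity.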
Let's move on to the transfer() slot, which is where all the traffic throttling goes on:
    void RateController::transfer()
    {
        transferScheduled = false;

        int msecs = 1000;
        if (!stopWatch.isNull())
            msecs = qMin(msecs, stopWatch.elapsed());
We start by resetting transferScheduled to false, allowing this slot to be scheduled again (while transferScheduled is true, scheduleTransfer() returns immediately). Then we use the class's internal stopWatch to check how much time has passed since the last transfer, capping the result at 1000 milliseconds.
The first time transfer() is called, stopWatch is still null, so we default to 1000 milliseconds.
        // Widen to qint64 before multiplying so large limits cannot overflow int.
        qint64 bytesToWrite = (qint64(upLimit) * msecs) / 1000;
        qint64 bytesToRead = (qint64(downLimit) * msecs) / 1000;
        if (bytesToWrite == 0 && bytesToRead == 0) {
            scheduleTransfer();
            return;
        }
Each time transfer() is called, we scale the rate controller's full upload and download limits by the elapsed share of the one-second transfer window to find out how many bytes we may write and read. If the slot is called so soon that both budgets round down to zero, we schedule another transfer() call and return.
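The budget arithmetic is easy to check in isolation. Below is a small stand-alone sketch in plain C++; budgetBytes() is a hypothetical helper name, not part of RateController, and it simply repeats the calculation described above using a 64-bit type.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Bytes allowed for a window of elapsedMs milliseconds at the given limit.
// The window is capped at one second, as in transfer().
std::int64_t budgetBytes(std::int64_t limitBytesPerSec, std::int64_t elapsedMs)
{
    assert(limitBytesPerSec >= 0 && elapsedMs >= 0);
    const std::int64_t windowMs = std::min<std::int64_t>(elapsedMs, 1000);
    return limitBytesPerSec * windowMs / 1000;  // integer division rounds down
}
```

With a limit of 2048 bytes per second and 50 milliseconds elapsed, the budget is 102 bytes. With a limit of 10 bytes per second the same 50 milliseconds yield a budget of 0, which is the "called too soon" case in which transfer() reschedules itself and waits for more time to accumulate.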
        QSet<RcTcpSocket *> pendingSockets;
        foreach (RcTcpSocket *client, sockets) {
            if (client->canTransferMore())
                pendingSockets.insert(client);
        }
        if (pendingSockets.isEmpty())
            return;

        stopWatch.start();
Now, we put together a list of all monitored sockets that are ready to transfer data. If this list is empty (i.e., no sockets can transfer anything), we return. Otherwise, we start or restart the stopWatch timer because we are about to start transferring data.
        bool canTransferMore;
        do {
            canTransferMore = false;
            qint64 writeChunk = qMax(qint64(1), bytesToWrite / pendingSockets.size());
            qint64 readChunk = qMax(qint64(1), bytesToRead / pendingSockets.size());

            QSetIterator<RcTcpSocket *> it(pendingSockets);
            while (it.hasNext() && (bytesToWrite > 0 || bytesToRead > 0)) {
                RcTcpSocket *socket = it.next();
Our data transfer step consists of an outer loop that runs as long as there is still data left to transfer on any socket, and an inner loop that runs through all sockets that can transfer data. Before we enter the inner loop, we divide the total bytes to write and to read by the number of pending sockets; the results are the maximum chunks of data each socket can write and read in this round (at least one byte each).
                bool dataTransferred = false;
                qint64 available = qMin(readChunk, socket->networkBytesAvailable());
                if (available > 0) {
                    qint64 readBytes = socket->readFromNetwork(
                            qMin(available, bytesToRead));
                    if (readBytes > 0) {
                        bytesToRead -= readBytes;
                        dataTransferred = true;
                    }
                }
We will handle reading first. The socket is asked to read the minimum of readChunk, bytesToRead, and the number of bytes that are available for reading from the socket. If the socket was able to read anything, we update bytesToRead and set dataTransferred to notify that the socket was able to transfer data.
                if (upLimit * 2 > socket->bytesToWrite()) {
                    qint64 chunkSize = qMin(writeChunk, bytesToWrite);
                    qint64 toWrite = qMin(chunkSize,
                                          upLimit * 2 - socket->bytesToWrite());
                    if (toWrite > 0) {
                        qint64 writtenBytes = socket->writeToNetwork(toWrite);
                        if (writtenBytes > 0) {
                            bytesToWrite -= writtenBytes;
                            dataTransferred = true;
                        }
                    }
                }
We control writing with the same pattern, but in addition we make sure that the socket doesn't have more than twice its upload limit pending already. This is to prevent the socket's outgoing buffer from growing too much when the connection is congested. Again, if we transferred some data, we set dataTransferred.
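The write allowance for a single socket depends on three limits at once, so it can help to see it as one pure function. This is a stand-alone sketch in plain C++; allowedWrite() and its parameter names are hypothetical and only restate the rule described above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// How many bytes one socket may hand to the network in this round.
//   upLimit         - the controller's upload limit in bytes per second
//   pendingInSocket - bytes already queued inside the socket
//   writeChunk      - this socket's fair share for the round
//   bytesToWrite    - what is left of the controller's write budget
std::int64_t allowedWrite(std::int64_t upLimit, std::int64_t pendingInSocket,
                          std::int64_t writeChunk, std::int64_t bytesToWrite)
{
    assert(upLimit >= 0 && pendingInSocket >= 0);
    const std::int64_t cap = upLimit * 2;  // at most twice the limit may queue up
    if (pendingInSocket >= cap)
        return 0;                          // congested: do not add more
    const std::int64_t chunkSize = std::min(writeChunk, bytesToWrite);
    return std::max<std::int64_t>(0, std::min(chunkSize, cap - pendingInSocket));
}
```

For example, with an upload limit of 512 bytes per second the cap is 1024 bytes; a socket that already has 1000 bytes queued may add only 24 more, even if its fair share is 100.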
                if (dataTransferred && socket->canTransferMore())
                    canTransferMore = true;
                else
                    pendingSockets.remove(socket);
            }
        } while (canTransferMore
                 && (bytesToWrite > 0 || bytesToRead > 0)
                 && !pendingSockets.isEmpty());

        if (canTransferMore)
            scheduleTransfer();
    }
Finally, if the socket was unable to transfer any data, it is removed from the list of pending sockets. Otherwise, if more data can be transferred by any socket, the outer loop will restart and continue until there is nothing more to transfer. At the end of transfer(), we schedule another call if any data can be transferred.
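To see how the two nested loops share out a budget, here is a simplified stand-alone simulation in plain C++. It is a sketch under stated assumptions, not the Torrent example's code: shareBudget() is a hypothetical function, it handles a single direction only, sockets are represented by the number of bytes they want to move, and a socket leaves the pending set once it has nothing left.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>
#include <vector>

// Splits `budget` bytes among sockets that have wanted[i] bytes queued.
// Returns how many bytes each socket was allowed to move.
std::vector<std::int64_t> shareBudget(std::int64_t budget,
                                      const std::vector<std::int64_t> &wanted)
{
    assert(budget >= 0);
    std::vector<std::int64_t> moved(wanted.size(), 0);
    std::set<std::size_t> pending;
    for (std::size_t i = 0; i < wanted.size(); ++i) {
        if (wanted[i] > 0)
            pending.insert(i);
    }

    // Outer loop: repeat while budget remains and some socket still has data.
    while (budget > 0 && !pending.empty()) {
        // Fair share for this round, never less than one byte.
        const std::int64_t chunk = std::max<std::int64_t>(
            1, budget / static_cast<std::int64_t>(pending.size()));

        // Inner loop: give every pending socket its chunk in turn.
        for (auto it = pending.begin(); it != pending.end() && budget > 0; ) {
            const std::size_t i = *it;
            const std::int64_t n = std::min({chunk, budget, wanted[i] - moved[i]});
            moved[i] += n;
            budget -= n;
            if (moved[i] == wanted[i])
                it = pending.erase(it);  // nothing left: drop from the round
            else
                ++it;
        }
    }
    return moved;
}
```

With a budget of 100 bytes and three sockets wanting 500, 10 and 500 bytes, the first round hands out 33, 10 and 33 bytes; the second round splits the remaining 24 bytes between the two sockets that still have data, for a total of 45, 10 and 45. The share that the small socket did not need is redistributed rather than wasted.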
Implementing the Rate-Controlled Socket
RcTcpSocket is a subclass of QTcpSocket that stores incoming and outgoing data in two buffers, instead of operating directly on the network. Outgoing data is first stored in a buffer, until RateController calls writeToNetwork() with an appropriate number of bytes. Similarly, incoming data is available to the user of RcTcpSocket only after readFromNetwork() is called.
    class RcTcpSocket : public QTcpSocket
    {
        Q_OBJECT

    public:
        RcTcpSocket(QObject *parent = 0);

        bool canReadLine() const { return incoming.contains('\n'); }

        qint64 writeToNetwork(qint64 maxLen);
        qint64 readFromNetwork(qint64 maxLen);

        bool canTransferMore() const;
        qint64 bytesAvailable() const;
        qint64 networkBytesAvailable() const
            { return QTcpSocket::bytesAvailable(); }

    signals:
        void readyToTransfer();

    protected:
        qint64 readData(char *data, qint64 maxLen);
        qint64 readLineData(char *data, qint64 maxLen);
        qint64 writeData(const char *data, qint64 len);

    private:
        QByteArray outgoing;
        QByteArray incoming;
    };
Here's the constructor:
    RcTcpSocket::RcTcpSocket(QObject *parent)
        : QTcpSocket(parent)
    {
        connect(this, SIGNAL(readyRead()), this, SIGNAL(readyToTransfer()));
        connect(this, SIGNAL(connected()), this, SIGNAL(readyToTransfer()));
    }
We connect the connected() and readyRead() signals to readyToTransfer(). This allows us to notify RateController when we are ready to read from or write to the network.
    qint64 RcTcpSocket::writeToNetwork(qint64 maxLen)
    {
        qint64 bytesWritten = QTcpSocket::writeData(outgoing.data(),
                qMin(maxLen, qint64(outgoing.size())));
        if (bytesWritten <= 0)
            return bytesWritten;
        outgoing.remove(0, bytesWritten);
        return bytesWritten;
    }
The writeToNetwork() function writes as much as it can, up to a maximum of maxLen bytes, from its outgoing buffer and onto the network.
    qint64 RcTcpSocket::readFromNetwork(qint64 maxLen)
    {
        int oldSize = incoming.size();
        incoming.resize(incoming.size() + maxLen);
        qint64 bytesRead = QTcpSocket::readData(
                incoming.data() + oldSize, maxLen);
        incoming.resize(bytesRead <= 0 ? oldSize : oldSize + bytesRead);
        if (bytesRead > 0)
            emit readyRead();
        return bytesRead;
    }
The readFromNetwork() function works similarly to writeToNetwork(). We use QTcpSocket::readData() to read as much as we can, up to a limit of maxLen bytes, from the network. Then we store the result in the socket's incoming buffer. We emit readyRead() to notify that bytes have arrived.
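The two-buffer design can be tried out without Qt. The following is a minimal stand-alone sketch in plain C++, assuming a hypothetical FakeNetwork struct in place of the real connection and a hypothetical BufferedSocket class in place of RcTcpSocket. It only illustrates the split between the application side and the controller side; it is not how QTcpSocket is implemented.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-in for the real network endpoint.
struct FakeNetwork {
    std::string arrived;  // bytes received from the peer, not yet read
    std::string sent;     // bytes actually put on the wire
};

class BufferedSocket {
public:
    explicit BufferedSocket(FakeNetwork &net) : net_(net) {}

    // Application side: these calls only touch the local buffers.
    void write(const std::string &data) { outgoing_ += data; }
    std::string read(std::size_t maxLen) { return take(incoming_, maxLen); }

    // Controller side: moves at most maxLen bytes to or from the network.
    std::size_t writeToNetwork(std::size_t maxLen) {
        const std::string chunk = take(outgoing_, maxLen);
        assert(chunk.size() <= maxLen);  // the allowance is never exceeded
        net_.sent += chunk;
        return chunk.size();
    }
    std::size_t readFromNetwork(std::size_t maxLen) {
        const std::string chunk = take(net_.arrived, maxLen);
        incoming_ += chunk;
        return chunk.size();
    }

    bool canTransferMore() const {
        return !incoming_.empty() || !outgoing_.empty() || !net_.arrived.empty();
    }

private:
    // Removes and returns up to maxLen bytes from the front of buf.
    static std::string take(std::string &buf, std::size_t maxLen) {
        const std::size_t n = std::min(maxLen, buf.size());
        std::string out = buf.substr(0, n);
        buf.erase(0, n);
        return out;
    }

    FakeNetwork &net_;
    std::string incoming_;
    std::string outgoing_;
};
```

In this model, write() never reaches the wire on its own and read() never sees data that the controller has not released, so the controller alone decides the pace in both directions.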
    bool RcTcpSocket::canTransferMore() const
    {
        return !incoming.isEmpty() || !outgoing.isEmpty()
               || QTcpSocket::bytesAvailable() > 0;
    }
The canTransferMore() function returns true if either of the buffers contains data or if there's data available on the socket.
    qint64 RcTcpSocket::bytesAvailable() const
    {
        if (state() != ConnectedState) {
            QByteArray buffer;
            buffer.resize(QTcpSocket::bytesAvailable());
            RcTcpSocket *that = const_cast<RcTcpSocket *>(this);
            that->QTcpSocket::readData(buffer.data(), buffer.size());
            that->incoming += buffer;
        }
        return incoming.size();
    }
The bytesAvailable() function overrides the QTcpSocket version of the function. If called in ConnectedState, it will simply return the number of bytes in the incoming buffer. Otherwise, it will cast away the const in the this pointer and read all pending data from QTcpSocket's buffer into its own buffer and then return the size. We'll get back to this in the next paragraph.
    qint64 RcTcpSocket::readData(char *data, qint64 maxLen)
    {
        int bytesRead = qMin<int>(maxLen, incoming.size());
        memcpy(data, incoming.constData(), bytesRead);
        incoming.remove(0, bytesRead);

        if (state() != ConnectedState) {
            QByteArray buffer;
            buffer.resize(QTcpSocket::bytesAvailable());
            QTcpSocket::readData(buffer.data(), buffer.size());
            incoming += buffer;
        }
        return qint64(bytesRead);
    }
By default, in our readData() implementation, we simply pop data off our incoming buffer. However, if the connection is not open, we apply the same block of code as we used in bytesAvailable(). When QTcpSocket emits disconnected() or error(RemoteHostClosedError), you can normally assume that a single readAll() will give you all the pending data. However, the rate controller is not likely to have transferred all data before the socket connection closed. We have solved this problem by reading all pending data into the incoming buffer if the connection is no longer open.
    qint64 RcTcpSocket::readLineData(char *data, qint64 maxLen)
    {
        return QIODevice::readLineData(data, maxLen);
    }
We must remember to reimplement readLineData() as well; otherwise, QTcpSocket's own implementation will be called, but that function operates directly on the network, bypassing our buffer. Our implementation calls QIODevice::readLineData(), a default implementation of a "read line" algorithm that in turn calls our readData() implementation. As you can see from the class definition, we have also reimplemented RcTcpSocket::canReadLine(), which simply searches for a '\n' in the incoming buffer.
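Line-oriented reading on top of a private buffer can be sketched in a few lines of plain C++. The free functions canReadLine() and readLine() below are hypothetical stand-ins that operate on a std::string instead of the socket's incoming buffer, and they are a simplification: this readLine() returns nothing until a complete line has been buffered, which is not necessarily how QIODevice behaves.

```cpp
#include <cassert>
#include <string>

// True if a complete line is waiting in the buffer.
bool canReadLine(const std::string &incoming)
{
    return incoming.find('\n') != std::string::npos;
}

// Removes and returns the first line, including its '\n'.
// Returns an empty string when no complete line is buffered yet.
std::string readLine(std::string &incoming)
{
    const std::string::size_type pos = incoming.find('\n');
    if (pos == std::string::npos)
        return std::string();
    std::string line = incoming.substr(0, pos + 1);
    assert(line.back() == '\n');  // a returned line always ends in a newline
    incoming.erase(0, pos + 1);
    return line;
}
```

Because both functions look only at the buffer, data that the rate controller has not yet released is invisible to them, which is the property the reimplemented functions need to preserve.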
    qint64 RcTcpSocket::writeData(const char *data, qint64 len)
    {
        int oldSize = outgoing.size();
        outgoing.resize(oldSize + len);
        memcpy(outgoing.data() + oldSize, data, len);
        emit readyToTransfer();
        return len;
    }
Our writeData() reimplementation appends the data to the outgoing buffer, and emits readyToTransfer() so that RateController can make sure the data is transferred to the network.
And that completes this example. We'll finish off with main(), to demonstrate how our new rate-controlled socket is used to fetch Trolltech's home page with a maximum download rate of 2 KB/s:
    int main(int argc, char **argv)
    {
        QCoreApplication app(argc, argv);

        RcTcpSocket socket;
        socket.connectToHost("www.trolltech.com", 80);
        socket.write("GET / HTTP/1.0\r\n\r\n");

        RateController controller;
        controller.setUploadLimit(512);
        controller.setDownloadLimit(2048);
        controller.addSocket(&socket);

        return app.exec();
    }
Copyright © 2006 Trolltech