Friday, January 15, 2010

Updated: Token Bucket Downloader

I've updated the rate limiting downloader (the one that uses the token bucket algorithm) to be more user friendly out of the box. It attempts to use urllib2 by default, so it can rate limit pretty much any URL.

Previously the script required an HTTP proxy, and the only way to adjust operating parameters was to edit global variables in the script. A proxy server is no longer required, and all operating parameters can be set via script options.

If an HTTP proxy is selected, the Python HTTP library (httplib) is used instead; it is a more rudimentary library, so fewer situations are handled. It's possible to install a proxy handler in urllib2, but I didn't do that.
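For reference, installing a proxy handler in urllib2 would look something like the sketch below. The proxy address is the same placeholder used in the code, not a real server:

```python
try:
    import urllib2                    # Python 2, as used in this post
except ImportError:
    import urllib.request as urllib2  # the same API lives here in Python 3

# Route all http:// requests through a proxy (address is a placeholder).
proxy = urllib2.ProxyHandler({"http": "http://aproxyserver:8080"})
opener = urllib2.build_opener(proxy)

# opener.open(url) would then fetch through the proxy;
# urllib2.install_opener(opener) makes it the default for urlopen().
```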

This is more or less a "complete" rate-limiting download manager. Option output:

   Usage: rlfetch.py [options] url

   Options:
     -h, --help            show this help message and exit
     -f FILE, --file=FILE  output filename
     -d DIR, --dest=DIR    destination directory
     -p SERVER:PORT, --proxy=SERVER:PORT
                           http proxy server
     -z BYTES, --buffer=BYTES
                           buffer size
     -l KBS, --limit=KBS   kbs limit
     -b KBS, --burst=KBS   burst limit
     -t SECONDS, --tick=SECONDS
                           tick interval

Current state of the code.

Monday, January 11, 2010

Basic Token Bucket Rate Limiter

In locations where you have limited internet resources, it's sometimes necessary to implement rate limiting. I was curious exactly how this worked, so I worked out a simple token-bucket-based rate-limiting HTTP downloader.

This Python script does a couple things:

  • Limits the rate of data consumption in kilobytes per second
  • Prints the instantaneous KB/s and the overall/actual KB/s [this is done by monitoring the file size on disk]
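The rate bookkeeping is just two byte-count samples and their timestamps. As a sketch of the arithmetic (the helper names here are mine, not from the script):

```python
def kb_per_sec(bytes_then, bytes_now, t_then, t_now):
    """Instantaneous KB/s over one sampling interval."""
    return ((bytes_now - bytes_then) / 1024.0) / (t_now - t_then)

def average(samples):
    """The running average is just the mean of the instantaneous samples."""
    return sum(samples) / len(samples)
```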

Token bucket is a pretty simple algorithm. The idea is to generate an artificial stream of tokens at the rate you want to allow the real stream to go. When tokens are not being removed from the "bucket", they accumulate only up to a "burst limit", which is the maximum amount over the average limit that's desirable (this could be tuned to help trend a stream toward the average limit).
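As a standalone sketch of the idea (the class name and rates here are illustrative, not the ones used in the downloader below), the core of the algorithm fits in a few lines:

```python
import time

class TokenBucket(object):
    """Minimal token bucket: refill() adds tokens at a fixed rate,
    take() removes them once enough have accumulated."""

    def __init__(self, rate, burst):
        self.rate = rate      # tokens added per second (the average limit)
        self.burst = burst    # bucket capacity (the burst limit)
        self.tokens = 0.0
        self.last = time.time()

    def refill(self):
        # Credit tokens for the elapsed time, capped at the burst limit.
        now = time.time()
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now

    def take(self, n):
        # Consume n tokens if available; the caller sleeps and retries otherwise.
        self.refill()
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False
```

A consumer loop would call take(bufsize) before each read and sleep briefly when it returns False, which is exactly the shape of the downloader's read path below.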

In the Python implementation, three threads are used. The first monitors the download rate. The second consumes tokens and downloads the real data from an HTTP source. The third generates tokens and places them in the bucket, pausing while the burst limit is reached.

The code is available below or at codepad.org.

# -*- python -*-

import os
import random
import time
import httplib
import urlparse
import sys

import threading

from pprint import pprint

###########################
# Tuning knobs
BUFSIZE = 8192
BPS_LIMIT = 50 * 1024
BURST_BPS_LIMIT = 70 * 1024
TICK = 0.01
###########################

g_usage = "%s <download directory> <target url>"

g_bucketTokens = 0
g_exit = False

def parseArgv(argv):
    # Expect: <download directory> <target url>; the output file is
    # named after the last component of the URL path.
    if len(argv) < 3:
        return None
    directory = argv[1]
    if not os.path.isdir(directory):
        return None
    url = argv[2]
    s = urlparse.urlsplit(url)
    _, filename = os.path.split(s.path)
    return url, os.path.join(directory, filename)
     
def takeTokens(tokencount):
    # Remove tokencount tokens from the bucket if enough are available.
    # No lock is used; CPython's GIL is leaned on for this simple demo.
    global g_bucketTokens
    if g_bucketTokens >= tokencount:
        g_bucketTokens -= tokencount
        return True
    return False

def printKbs(filename):
    # Returns a closure that periodically reports the instantaneous
    # and running-average download rate (sampled from g_byteCount).
    def _():
        start, end = None, None
        tot, num = 0, 0
        while True:
            if g_exit:
                print "KB/s monitor exiting..."
                break
            end = time.time()
            size = g_byteCount
            if start is not None:
                inst = ((size - old) / 1024.0) / (end - start)
                tot += inst
                num += 1
                print "I: %.02f kb/s, A: %.02f kb/s" % (inst, tot / num)
            start = time.time()
            old = size
            for x in xrange(int(2 / TICK)):
                if g_exit: break 
                time.sleep(TICK)
    return _

def feedBucketTokens():
    # Add BPS_LIMIT * TICK tokens (bytes) to the bucket every tick,
    # pausing while the burst limit is reached.
    global g_bucketTokens
    tokens_per = int(BPS_LIMIT * TICK)
    print "Tokens per tick: %d" % (tokens_per,)
    while True:
        if g_exit:
            print "Token feeder exiting..."
            break
        if g_bucketTokens >= BURST_BPS_LIMIT:
            time.sleep(TICK)
            continue
        g_bucketTokens += tokens_per
        time.sleep(TICK)

class BucketReader (object):
    # Wraps a file-like object; read() blocks until enough tokens
    # are available to cover the requested buffer size.
    def __init__(self, fp):
        self.fp = fp
    def read(self, bufsize):
        while True:
            if takeTokens(bufsize):
                break
            time.sleep(TICK)
        return self.fp.read(bufsize)
        
def prepareFile(filename):
    # Open (or create) the output file for appending and return its
    # current size, which becomes the HTTP Range resume offset.
    fp = open(filename, 'ab+')
    fp.seek(0, 2)
    fsize = fp.tell()
    return fp, fsize

def startHttpReq(url, fsize):
    # Issue a ranged GET through the (hardcoded) HTTP proxy so a
    # partial download resumes from byte offset fsize.
    headers = { "Range" : ("bytes=%d-" % fsize), }
    pprint(headers)
    h = httplib.HTTPConnection("aproxyserver", 8080)
    h.request("GET", url, headers=headers)
    r = h.getresponse()
    pprint(r.getheaders())
    return r

g_byteCount = 0

def readLoop(fpIn, fpOut):
    # Copy rate-limited data to disk; a short read signals EOF.
    global g_byteCount
    while True:
        d = fpIn.read(BUFSIZE)
        fpOut.write(d)
        fpOut.flush()
        g_byteCount += len(d)
        if len(d) < BUFSIZE:
            break

def main():
    global g_exit
    try:
        params = parseArgv(sys.argv)
        if params is None:
            print g_usage % (sys.argv[0],)
            raise SystemExit(1)
        url, filename = params
        threading.Thread(target=feedBucketTokens).start()
        threading.Thread(target=printKbs(filename)).start()
        fpOut, fsize = prepareFile(filename) 
        fpInput = startHttpReq(url, fsize)
        readLoop(BucketReader(fpInput), fpOut)
    except KeyboardInterrupt:
        pass
    finally:
        g_exit = True

if __name__ == '__main__':
    main()

# vim: et:sts=4:ts=4:sw=4: