In locations where you have limited internet resources it's sometimes necessary to implement rate limiting. I was curious exactly how this worked, so I worked out a simple Token Bucket based rate-limiting HTTP downloader.
This Python script does a couple things:
- Limits rate of data consumption in kilobytes per second
- Prints out the instantaneous KB/s and the overall/actual KB/s [this is done by monitoring a running count of bytes written to disk]
Token bucket is a pretty simple algorithm. The basic idea is to create an artificial stream of tokens, which are generated as fast as you want to allow the real stream to go. If tokens are not removed from the "bucket", then tokens are only generated up to a "burst limit", which is the maximum amount over the average limit that's desirable (this could change to help trend a stream toward the average limit).
In the Python implementation, 3 threads are used. Thread one monitors the rate of download. Thread two consumes tokens and downloads real data from an HTTP source. Thread three generates tokens and places them in a bucket, stopping when the burst limit has been reached.
The code is available below or at codepad.org.
# -*- python -*- import os import random import time import httplib import urlparse import sys import threading from pprint import pprint ########################### # Tuning knobs BUFSIZE = 8192 BPS_LIMIT = 50 * 1024 BURST_BPS_LIMIT = 70 * 1024 TICK = 0.01 ########################### g_usage = "%s <download directory> <target url>" g_bucketTokens = 0 g_exit = False def parseArgv(argv): if len(argv) < 3: return None directory = argv[1] if not os.path.isdir(directory): return None url = argv[2] s = urlparse.urlsplit(url) _, filename = os.path.split(s.path) return url, os.path.join(directory, filename) def takeTokens(tokencount): global g_bucketTokens if g_bucketTokens >= tokencount: g_bucketTokens -= tokencount return True return False def printKbs(filename): def _(): start, end = None, None tot, num = 0, 0 while True: if g_exit: print "KB/s monitor exiting..." break end = time.time() size = g_byteCount if start is not None: inst = ((size - old) / 1024.0) / (end - start) tot += inst num += 1 print "I: %.02f kb/s, A: %.02f kb/s" % (inst, tot / num) start = time.time() old = size for x in xrange(int(2 / TICK)): if g_exit: break time.sleep(TICK) return _ def feedBucketTokens(): global g_bucketTokens tokens_per = int(BPS_LIMIT * TICK) print "Tokens per tick: %d" % (tokens_per,) while True: if g_exit: print "Token feeder exiting..." 
break if g_bucketTokens >= BURST_BPS_LIMIT: time.sleep(TICK) continue g_bucketTokens += tokens_per time.sleep(TICK) class BucketReader (object): def __init__(self, fp): self.fp = fp def read(self, bufsize): while True: if takeTokens(bufsize): break time.sleep(TICK) return self.fp.read(bufsize) def prepareFile(filename): fp = open(filename, 'ab+') fp.seek(0, 2) fsize = fp.tell() return fp, fsize def startHttpReq(url, fsize): headers = { "Range" : ("bytes=%d-" % fsize), } pprint(headers) h = httplib.HTTPConnection("aproxyserver", 8080) h.request("GET", url, headers=headers) r = h.getresponse() pprint(r.getheaders()) return r g_byteCount = 0 def readLoop(fpIn, fpOut): global g_byteCount while True: d = fpIn.read(BUFSIZE) fpOut.write(d) fpOut.flush() g_byteCount += len(d) if len(d) < BUFSIZE: break def main(): global g_exit try: params = parseArgv(sys.argv) if params is None: print g_usage % (sys.argv[0],) raise SystemExit(1) url, filename = params threading.Thread(target=feedBucketTokens).start() threading.Thread(target=printKbs(filename)).start() fpOut, fsize = prepareFile(filename) fpInput = startHttpReq(url, fsize) readLoop(BucketReader(fpInput), fpOut) except KeyboardInterrupt: pass finally: g_exit = True if __name__ == '__main__': main() # vim: et:sts=4:ts=4:sw=4:
No comments:
Post a Comment