#!/usr/bin/python2

import sys, socket, errno, os, struct, threading, string, time, mmap, select
import ConfigParser
import blockreader_mmap,blockreader_lseek

defaults = {
    "blockreader":         ("mmap",       str),
    "blocksize":           ("1000",       int),
    "multicastpacketrate": ("128",        int),
    "unicastpacketrate":   ("128",        int),
    "multicastgroup":      ("224.1.2.3",  str),
    "multicastport":       ("1234",       int),
    "multicastttl":        ("1",          int),
    "unicastaddr":         ("",           str),
    "unicastport":         ("4321",       int),
    "retransdelay":        ("3",          int),
    "retransmcast":        ("2",          int)
    }

def sleep(t):
    if (t<0): return
    now = time.time()+t
    while time.time() < now: pass

class stats:
    def __init__(self):
        self.unicasts    = 0   # unicast packets sent
        self.multicasts  = 0   # multicast packets sent
        self.clients     = {}  # retrans requests per (IP,PORT) tuple
        self.refills     = 0   # number of times the flow has started
    def __call__(self, unicast=0, multicast=0, client=None, refill=0):
        self.unicasts   += unicast
        self.multicasts += multicast
        self.refills    += refill
        if client:
            if not self.clients.has_key(client):
                self.clients[client]=0
            self.clients[client] = self.clients[client]+1
    def __repr__(self):
        return "U:%d M:%d R:%d dC:%d tC:%d"%\
               (self.unicasts,self.multicasts,self.refills,len(self.clients),
                reduce(lambda a,b:a+b,self.clients.values(),0))

class requestqueue:
    """
    Queue of packets to be re-sent. Packets can be added using
    push(), specifying the packet number and the client requesting the
    packet. If the packet is not queued yet, its queuing time is
    recorded. In all cases, the client is added to the list of clients
    having requested the packet.
    When calling pop(), the earliest packet whose deadline is anterior
    to specified time is dequeued and returned as a tuple :
    (packetnumber, list of clients).
    If there is no packet (empty queue, or no deadline reached yet),
    pop() returns None.
    """
    class request:
        def __init__(self,number):
            self.number  = number
            self.when    = time.time()
            self.clients = []
        def __repr__(self):
            return "(%d@%f:%s)"%(self.number,self.when,str(self.clients))
    def __init__(self):
        self.queue = []
        self.index = {}
        self.lock = threading.Lock()
    def push(self,blocknumber,client):
        self.lock.acquire()
        if not self.index.has_key(blocknumber):
            self.index[blocknumber] = requestqueue.request(blocknumber)
            self.queue += [self.index[blocknumber]]
        if client not in self.index[blocknumber].clients:
            self.index[blocknumber].clients += [ client ]
        self.lock.release()
    def pop(self,when):
        if len(self.queue)==0: return None
        if self.queue[0].when > when: return None
        else:
            self.lock.acquire()
            p = self.queue[0]
            self.queue = self.queue[1:]
            del self.index[p.number]
            self.lock.release()
            return (p.number, p.clients)

class receiver(threading.Thread):
    """
    Thread handling all incoming packets. It will receive retrans requests,
    and add them to the requestqueue. These requests will be handled later
    by the "sender" thread. The receiver thread also handles "info requests"
    (magic packet number 0xFFFFFFFF).
    """
    def __init__(self,filer):
        threading.Thread.__init__(self)
        self.filer = filer
    def run(self):
        self.running = Ellipsis
        while self.running:
            (a,b,c)=select.select ([self.filer.netsocket], [], [], 0)
            if a==[]:
                time.sleep(0.1)
                continue
            (data,(addr,port)) = self.filer.netsocket.recvfrom(1000)
            self.filer.stats(client=addr)
            data = string.split(data,":")
            if data[0]=="FFFFFFFF":
                #self.filer.announce(addr,port)
                self.filer.announce()
                if self.filer.nextmulticastblock == -1:
                    self.filer.nextmulticastblock = 0
                    self.filer.stats(refill=1)
            else:
                offset = string.atoi ("0x"+data[0],16)
                self.filer.requestqueue.push(offset,(addr,port))
        print "Receiver thread shutting down."

class sender(threading.Thread):
    """
    Thread handling main multicast flow and retransmissions. As this thread
    controls all outgoing traffic, it is able to limit unicast+multicast
    bandwidth if necessary.
    """
    def __init__(self,filer):
        self.filer = filer
        threading.Thread.__init__(self)
    def run(self):
        self.running = Ellipsis
        #lastannounce = time.time()
        lastannounce = 2000000000
        while self.running:
            if time.time()-lastannounce>1:
                self.filer.announce()
                lastannounce = time.time()
            while 1:
                pop = self.filer.requestqueue.pop(time.time()-
                                                  self.filer.retransdelay)
                if pop==None: break
                (blocknumber,clients) = pop
                if len(clients) < self.filer.retransmcast:
                    for (addr,port) in clients:
                        self.filer.sendblock(blocknumber,addr,port)
                else:
                    self.filer.sendblock(blocknumber)
            if self.filer.nextmulticastblock != -1:
                self.filer.sendblock(self.filer.nextmulticastblock)
                self.filer.nextmulticastblock += 1
                if self.filer.nextmulticastblock == \
                   self.filer.blockreader.blockcount:
                    self.filer.nextmulticastblock = -1
            else:
                time.sleep(0.01) # yield scheduler
            sleep (self.filer.interval)
        print "Sender thread shutting down."

class filer:
    def __init__(self,blockreader,blocksize,
                 unicastaddr,unicastport,
                 multicastgroup,multicastport,multicastttl,
                 multicastpacketrate,unicastpacketrate,
                 retransdelay,retransmcast,
                 filename):
        self.blockreader         = blockreader
        self.unicastaddr         = unicastaddr
        self.unicastport         = unicastport
        self.multicastgroup      = multicastgroup
        self.multicastport       = multicastport
        self.multicastttl        = multicastttl
        self.multicastpacketrate = multicastpacketrate
        self.unicastpacketrate   = unicastpacketrate
        self.retransdelay        = retransdelay
        self.retransmcast        = retransmcast
        self.netsocket   = socket.socket (socket.AF_INET, socket.SOCK_DGRAM)
        self.netsocket.bind ((unicastaddr, unicastport))
        #ttl = struct.pack('b',1)
        #netsocket.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)

        self.interval = 1.0/multicastpacketrate

        self.sender      = sender (self)
        self.receiver    = receiver (self)
        self.stats       = stats()
        self.nextmulticastblock = -1
        self.requestqueue = requestqueue()

    def announce(self,addr=None,port=None):
        if addr==None: addr=self.multicastgroup
        if port==None: port=self.multicastport
        self.netsocket.sendto (
            ("FFFFFFFF:filename=%s:blocksize=%d:wholeblocks=%d:lastblock=%d"%\
             (self.blockreader.filename,self.blockreader.blocksize,
              self.blockreader.wholeblocks,self.blockreader.lastblock)),
            (addr,port))

    def sendblock(self,blocknumber,addr=None,port=None):
        if addr==None:
            addr=self.multicastgroup
            port=self.multicastport
            self.stats(multicast=1)
        else: self.stats(unicast=1)
        blockdata = self.blockreader.getblock(blocknumber)
        self.netsocket.sendto("%08X:%s"%(blocknumber,blockdata),(addr,port))

    def start(self):
        self.sender.start ()
        self.receiver.start ()

    def stop(self):
        self.sender.running = None
        self.receiver.running = None

class wrapper(filer):
    def __init__(self,configfile="deplika.conf",**extraopts):
        global defaults
        cp = ConfigParser.ConfigParser ()
        cp.read(configfile)
        options = {}
        for o in defaults.keys(): options[o]=defaults[o][0]
        for o in cp.options('bmcast'): options[o]=cp.get('bmcast',o)
        for o in extraopts.keys(): options[o]=extraopts[o]
        for o in defaults.keys(): options[o]=defaults[o][1](options[o])
        options['blockreader'] = globals()\
                                 ['blockreader_%s'%options['blockreader']].\
                                 __dict__\
                                 ['blockreader_%s'%options['blockreader']]\
                                 (options['filename'],options['blocksize'])
        filer.__init__(self,**options)
        
if sys.argv[0]!='':
    try:
        f = wrapper(filename=sys.argv[1])
        f.start()
        while 1:
            time.sleep(1)
            print f.stats
    except IndexError: print "Specify filename"
    except KeyboardInterrupt: f.stop()

