BlinkenArea - GitList
Repositories
Blog
Wiki
Blinker
Code
Commits
Branches
Tags
Search
Tree:
164918a
Branches
Tags
master
Blinker
phonesim
phonesim.py
avoid deprecated GObject functions
Stefan Schuermans
committed
164918a
at 2019-08-14 19:15:14
phonesim.py
Blame
History
Raw
#! /usr/bin/env python3
"""phonesim - phone simulator for EBIP protocol

Copyright 2019 Stefan Schuermans <stefan@schuermans.info>
Copyleft: CC-BY-SA http://creativecommons.org/licenses/by-sa/3.0/"""

import argparse
import gi
import os
import socket
import string
import sys
import time

gi.require_version('Gtk', '3.0')
from gi.repository import GLib
from gi.repository import Gtk
from gi.repository import Pango

# directory containing this script; the glade UI files are loaded from here
scriptdir = os.path.dirname(os.path.abspath(__file__))


class Master(object):
    """master window of phone simulator

    Owns the UDP socket, the table of registered clients and the table of
    open calls (one Call window per line number).

    Messages are colon-separated text: "<line_no>:<type>[:<param>...]".
    Line number 0 carries general messages ("register", "heartbeat"),
    other line numbers address an open call.
    """

    class Client(object):
        """EBIP client"""

        # seconds without heartbeat after which a client is expired
        HEARTBEAT_TIMEOUT = 60

        def __init__(self, master, addr):
            """initialize new client

            master: Master instance used for log output
            addr: (host, port) tuple the client sends from
            """
            super().__init__()
            self.master = master
            self.addr = addr
            # initialize state
            # (both timestamps stay None until register() is called;
            #  procMsgRegister() does that right after construction)
            self.last_heartbeat_time = None
            self.last_register_time = None

        def heartbeat(self):
            """client sent heartbeat"""
            self.last_heartbeat_time = time.time()

        def register(self):
            """client registered"""
            now = time.time()
            self.last_register_time = now
            # registering counts as a heartbeat
            self.last_heartbeat_time = now
            self.master.output("client \"{:s}:{:d}\" registered".format(
                self.addr[0], self.addr[1]))

        def tick100ms(self, now):
            """periodic 100ms tick

            now: current time as returned by time.time()
            returns: True to keep the client, False to expire it
            """
            # expire client if no heartbeat is received
            deadline = self.last_heartbeat_time + self.HEARTBEAT_TIMEOUT
            if now >= deadline:
                self.master.output("client \"{:s}:{:d}\" expired".format(
                    self.addr[0], self.addr[1]))
                # expire client
                return False
            # keep client
            return True

    def __init__(self, bind, verbose):
        """construct the master window

        bind: (host, port) tuple to bind the UDP socket to
        verbose: if true, output() prints log lines
        """
        super().__init__()
        self.bind = bind
        self.verbose = verbose
        # state information
        now = time.time()
        self.clients = {}  # dict: addr -> Client
        self.calls = {}  # dict: line number -> Call
        self.max_line_no = 2
        self.last_line_state = now
        # build window
        self.builder = Gtk.Builder()
        self.builder.add_from_file(scriptdir + "/master.glade")
        self.widPhoneNoEntry = self.builder.get_object("PhoneNoEntry")
        self.widClientsTextView = self.builder.get_object("ClientsTextView")
        handlers = {
            "onCall": self.onCall,
            "onDestroy": self.onDestroy,
        }
        self.builder.connect_signals(handlers)
        # set up socket
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(self.bind)
        self.sockInEvId = GLib.io_add_watch(self.sock.makefile(), GLib.IO_IN,
                                            self.onRecv)
        # start background actions
        GLib.timeout_add(100, self.onTimer100ms)
        self.output(
            "started (bind=\"{:s}:{:d}\")".format(self.bind[0], self.bind[1]))

    def __del__(self):
        """cleanup"""
        # __init__ may have failed before the socket or the watch existed
        # (e.g. glade file missing, bind failed), so look both up defensively
        ev_id = getattr(self, "sockInEvId", None)
        if ev_id is not None:
            GLib.source_remove(ev_id)
        sock = getattr(self, "sock", None)
        if sock is not None:
            sock.close()

    def onCall(self, widget):
        """call button clicked"""
        phone_no_str = self.widPhoneNoEntry.get_text()
        if not phone_no_str:
            self.output("not calling, no number")
            return
        if not self.clients:
            self.output("not calling, no clients")
            return
        self.output("calling {:s}".format(phone_no_str))
        # find free line number
        line_no = 1
        while line_no in self.calls:
            line_no += 1
        # remember highest line ever used, so the periodic line state
        # report in onTimer100ms() covers it
        self.max_line_no = max(self.max_line_no, line_no)
        self.calls[line_no] = Call(self, line_no, phone_no_str)

    def onDestroy(self, widget):
        """window will be destroyed"""
        # iterate over a copy: Call.onDestroy() removes entries from
        # self.calls
        for call in list(self.calls.values()):
            call.destroy()
        Gtk.main_quit()

    def onRecv(self, source, condition):
        """input received on socket

        returns: True, to be called again on the next event
        """
        # read message
        data, addr = self.sock.recvfrom(4096)
        # Datagrams come from the network and need not be valid UTF-8.
        # Decode leniently, so a malformed packet cannot raise out of this
        # callback; such a message fails parsing in procMsg() and is ignored.
        # NOTE(review): an exception escaping a GLib callback presumably
        # ends the watch - confirm against PyGObject behaviour
        msg = data.decode("utf-8", errors="replace")
        self.output("received \"{:s}\" from \"{:s}:{:d}\"".format(
            msg, addr[0], addr[1]))
        # process message
        self.procMsg(msg, addr)
        # request to be called on next event
        return True

    def onTimer100ms(self):
        """timer callback, every 100ms

        returns: True, to be called again
        """
        now = time.time()
        # client tick, expire clients
        # (collect first, the dict must not change while iterating it)
        expire = []
        for addr, client in self.clients.items():
            if not client.tick100ms(now):
                expire.append(addr)
        if expire:
            for addr in expire:
                del self.clients[addr]
            self.updateClients()
        # line state, reported to all clients once per second
        if now >= self.last_line_state + 1:
            for line_no in range(1, self.max_line_no + 1):
                call = self.calls.get(line_no)
                if call is not None:
                    line_state = call.get_line_state()
                else:
                    line_state = "onhook"
                self.sendMsgToAll("{:d}:{:s}".format(line_no, line_state))
            self.last_line_state = now
        # request being called again
        return True

    def output(self, line):
        """output a line of text if in verbose mode"""
        if self.verbose:
            print(
                "{:s}: {:s}".format(time.strftime("%Y-%m-%d %H:%M:%S"), line))

    def procMsg(self, msg, addr):
        """process message

        msg: message text, "<line_no>:<type>[:<param>...]"
        addr: (host, port) tuple of the sender

        Malformed messages, unknown message types and line messages from
        unregistered senders are silently ignored.
        """
        fields = msg.strip().split(":")
        if len(fields) < 2:
            return
        try:
            line_no = int(fields[0])
        except ValueError:
            return
        if line_no < 0:
            return
        msgType = fields[1]
        prms = fields[2:]
        # general messages
        if line_no == 0:
            # new client messages
            if msgType == "register" and prms:
                self.procMsgRegister(prms[0], addr)
                return
            # require client to be registered
            if addr not in self.clients:
                return
            # regular client messages
            if msgType == "heartbeat":
                self.clients[addr].heartbeat()
            return
        # require client to be registered
        if addr not in self.clients:
            return
        # require line to be open
        if line_no not in self.calls:
            return
        call = self.calls[line_no]
        # line messages
        if msgType == "accept":
            call.accept()
            return
        if msgType == "hangup":
            call.hangup()
            return
        if msgType in ("play", "playbackground") and prms:
            call.play(prms[0])
            return

    def procMsgRegister(self, port_str, addr):
        """process register message

        port_str: port number the client claims to use (text)
        addr: (host, port) tuple the message was received from
        """
        try:
            port = int(port_str)
        except ValueError:
            return
        # the port announced in the message must be the real source port
        if port != addr[1]:
            self.output(
                "client does not know its port (real {:d} msg {:d})".format(
                    addr[1], port))
            return
        if addr not in self.clients:
            self.clients[addr] = self.Client(self, addr)
            self.updateClients()
        # also refreshes the timestamps of an already known client
        self.clients[addr].register()

    def sendMsg(self, msg, addr):
        """send message

        msg: message text, sent UTF-8 encoded
        addr: (host, port) tuple of the destination
        """
        self.output(
            "sending \"{:s}\" to \"{:s}:{:d}\"".format(msg, addr[0], addr[1]))
        self.sock.sendto(msg.encode("utf-8"), addr)

    def sendMsgToAll(self, msg):
        """send message to all clients"""
        for addr in self.clients:
            self.sendMsg(msg, addr)

    def updateClients(self):
        """update list of clients"""
        txt = "\n".join(
            "{:s}:{:d}".format(addr[0], addr[1])
            for addr in sorted(self.clients))
        self.widClientsTextView.get_buffer().set_text(txt)

    # end of class Master


class Call(object):
    """call windows of phone simulator"""

    # DTMF key -> name of the button object in call.glade
    BUTTON_IDS = {
        "0": "Button0",
        "1": "Button1",
        "2": "Button2",
        "3": "Button3",
        "4": "Button4",
        "5": "Button5",
        "6": "Button6",
        "7": "Button7",
        "8": "Button8",
        "9": "Button9",
        "*": "ButtonStar",
        "#": "ButtonHash",
    }

    def __init__(self, master, line_no, phone_no_str):
        """construct the call window

        master: Master instance owning this call
        line_no: line number of the call
        phone_no_str: phone number being called (text)
        """
        super().__init__()
        self.master = master
        self.line_no = line_no
        self.phone_no_str = phone_no_str
        # initialize state
        self.connected = False
        # build window
        self.builder = Gtk.Builder()
        self.builder.add_from_file(scriptdir + "/call.glade")
        self.widCallWindow = self.builder.get_object("CallWindow")
        self.widPhoneNoEntry = self.builder.get_object("PhoneNoEntry")
        self.widButtons = {
            key: self.builder.get_object(name)
            for key, name in self.BUTTON_IDS.items()
        }
        self.widSoundEntry = self.builder.get_object("SoundEntry")
        handlers = {
            "onDestroy": self.onDestroy,
            "onKey": self.onKey,
        }
        self.builder.connect_signals(handlers)
        self.widPhoneNoEntry.set_text(self.phone_no_str)
        # send connection request message
        self.master.sendMsgToAll("{:d}:setup:{:d}:{:s}".format(
            self.line_no, self.line_no, self.phone_no_str))

    def accept(self):
        """call has been accepted"""
        self.master.output("call to \"{:s}\" accepted on line {:d}".format(
            self.phone_no_str, self.line_no))
        self.master.sendMsgToAll("{:d}:connected".format(self.line_no))
        self.enableKeys(True)

    def destroy(self):
        """destroy window"""
        self.widCallWindow.destroy()

    def get_line_state(self):
        """get state of the line, as reported to the clients"""
        return "offhook"

    def hangup(self):
        """call has been ended"""
        self.master.output("call to \"{:s}\" hangup on line {:d}".format(
            self.phone_no_str, self.line_no))
        self.destroy()

    def enableKeys(self, enabled):
        """enable or disable phone keys"""
        for key in self.widButtons.values():
            key.set_sensitive(enabled)

    def onDestroy(self, widget):
        """window will be destroyed"""
        self.master.output("call to \"{:s}\" ended on line {:d}".format(
            self.phone_no_str, self.line_no))
        self.master.sendMsgToAll("{:d}:onhook".format(self.line_no))
        # free the line number
        del self.master.calls[self.line_no]

    def onKey(self, widget):
        """phone key button clicked"""
        # map the clicked widget back to its DTMF key
        for dtmf, button in self.widButtons.items():
            if widget is button:
                break
        else:
            return  # button not found
        self.master.output("phone key \"{:s}\" pressed on line {:d}".format(
            dtmf, self.line_no))
        # send DTMF message
        self.master.sendMsgToAll("{:d}:dtmf:{:s}".format(self.line_no, dtmf))

    def play(self, sound):
        """play sound request

        sound: name of the sound, shown in the sound entry of the window
        """
        self.master.output(
            "request to play sound \"{:s}\" on line {:d}".format(
                sound, self.line_no))
        self.widSoundEntry.set_text(sound)

    # end of class Call


def parse_addr(addr_str):
    """parse network address

    addr_str: address as text, "<host>:<port>"
    returns: (host, port) tuple, port as int
    raises: ValueError if the text is not of the form "<host>:<port>" or
            the port is not an integer in range 1..65535
    """
    fields = addr_str.split(":")
    if len(fields) != 2:
        raise ValueError("invalid address \"{:s}\"".format(addr_str))
    host, port_str = fields
    try:
        port = int(port_str)
        if port < 1 or port > 65535:
            # caught right below, to get the same message prefix as for a
            # non-numeric port
            raise ValueError("out of range 1..65535")
    except ValueError as ve:
        raise ValueError(
            "invalid port \"{:s}\": {:s}".format(port_str, str(ve))) from ve
    return (host, port)


def parse_args():
    """parse command line arguments

    returns: argparse namespace; "bind" is a (host, port) tuple,
             "verbose" a bool

    An invalid bind address is reported as a usage error (exit status 2).
    """
    # the text is the description of the program; passed positionally it
    # would be taken as the program name (first parameter is "prog")
    parser = argparse.ArgumentParser(
        description="phonesim - phone simulator for EBIP protocol")
    parser.add_argument(
        "-b",
        "--bind",
        action="store",
        dest="bind",
        default="127.0.0.1:1234",
        help="bind address")
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        dest="verbose",
        default=False,
        help="verbose mode")
    args = parser.parse_args()
    try:
        args.bind = parse_addr(args.bind)
    except ValueError as ve:
        # usage error instead of a traceback
        parser.error(str(ve))
    return args


def main():
    """main program

    returns: exit code of the program
    """
    # parse arguments
    args = parse_args()
    # create master window
    # (the name keeps a reference to the window while the main loop runs)
    master = Master(args.bind, args.verbose)
    # run application
    Gtk.main()
    # done
    return 0


# main application entry point
if __name__ == "__main__":
    sys.exit(main())