[r80]: trunk / misc / bw / server.srp Maximize Restore History

Download this file

server.srp    680 lines (618 with data), 26.2 kB

# server -- receive messages over OSC
# rbd
#
# this is a companion to sender.srp

# with remote clients, we want to
#   osc_server_init() to create the first port
#   osc_server_port() to add two more ports
#   use osc_server_reply() to get reply address for each client
#   use osc_delete_address() to remove reply addresses that expire
#
# with localhost clients, we want to
#   osc_server_init() to create the first port
#   osc_server_port() to add two more ports
#   use osc_create_address() to make a reply address for the client
#   use osc_delete_address() to remove reply address when it expires

require "debug"
dbg_stack_print = t
require "clients"
require "wxserpent"
require "config"
require "netperf"
require "prefs"
require "http"
require "display"
require "serverstats"
require "utils"
require "sched"

// index values carried by "L" messages on the ZMQ pull socket; zmq_poll
// compares the parsed index against RIFFS_LAUNCH and RIFFS_FINISHED
RIFFS_LAUNCH = 99
RIFFS_CUE_END = 98
RIFFS_FINISHED = 97

state_disp = nil // State_disp, created on first use in Client.send_message
riffmode = false // set by zmq_poll on RIFFS_LAUNCH, cleared on RIFFS_FINISHED
drum_velocity_boost = 0 // added to note velocity for client index 1 in zmq_poll

prefs = Prefs("GNO")

// Load the global roster from roster.txt in the current directory.
// Each line of the file is parsed as
//     usernum|name|email|ticket
// and yields one roster row [name, voice, ticket, usernum], where voice
// is the zero-based position of the line in the file. Any existing
// roster is replaced. Exits the program if roster.txt cannot be opened.
def make_roster()
    roster = []
    var inf = open("roster.txt", "r")
    if not inf:
        print "ERROR: No roster.txt FILE FOUND"
        exit()
    var txt = inf.readline()
    var voice = 0
    while txt:
        var sp = String_parse(txt)
        // User Number (which is not voice). The field comes back with
        // "|" appended, so remove it
        var usernum = subseq(sp.get_delimited("|"), 0, -1)
        // name has "|" appended too
        var name = subseq(sp.get_delimited("|"), 0, -1)
        sp.get_delimited("|") // skip over email address
        var ticket = int(sp.get_delimited("|")) // works even if no "|"
        roster.append([name, voice, ticket, usernum])
        voice = voice + 1 // next row gets the next voice index
        txt = inf.readline()
    inf.close() // done with the file; do not leave it open

// a roster is an array with one row per client:
// [name, client_index, client_ticket, usernum]. The client_index is
// the small integer number that is their position in the state
// vector, and the client_ticket is their password used for
// authentication in every message

// THIS TEST DATA IS OVERWRITTEN IF YOU CALL make_roster() BELOW
roster = [["client1", 0, 123456, 10],
          ["client2", 1, 234567, 11],
          ["client3", 3, 345678, 12]]

make_roster() // get roster from roster.txt file
display "make_roster", roster

// traffic counters; timer_callback passes them to netperf.set_fields
login_received_count = 0 // incremented by client_login on a successful login
ce_received_count = 0 // incremented by osc_handler for each /ce it processes
sent_count = 0 // incremented by Client for each /ss, /sa and /sc sent
dropnext = false    // used to force drop incoming message

CLOCK_SYNC_ACK = 0x10000 // not referenced elsewhere in this file
PLAY_STATE = 0x20000 // added to the first int32 of every /ss message
DEFAULT_STATUS = "Server" // status-bar text set by run and post_network_info
// round-trip samples per statistics period (see Client.handle_message)
RT10 = 200 / SLOW // how often to report round trip times

// Server-side record of one roster entry. Holds the OSC reply address
// (nil while not connected), message sequence numbers, the latest
// pitch/velocity reported by the client, and drop-rate and round-trip
// statistics.
class Client
    var addr // socket addressing this client, nil means not-connected
    var connected_since
    var last_server_seqno
    var last_client_seqno
    var last_recv_time
    var drop_base_seqno // client_seqno when we started sampling period
    var drop_base_time  // time when we started sampling period
    var drop_base_count // received count at start of period
    var droprate_to_client // latest droprate from client
    var received_count // counts messages received since drop_base_seqno
    var droprate_to_server
    var pitch
    var velocity
    var needs_clock_sync
    var name
    var usernum
    var ticket
    var index // the voice (sample) index of the client, also state index
    var yes_or_no // clients can answer questions posed to them
    var recent_round_trip_max, recent_round_trip_min, round_trip_sum, round_trip_count
    var round_trip_max, round_trip_min, round_trip_mean

    // Create a not-yet-connected client.
    //   name_    -- roster name, compared against the name sent at login
    //   ticket_  -- ticket stored for this client
    //   index_   -- position of this client in clients_play_state
    //   usernum_ -- user number, sent back to the client in /sa
    def init(name_, ticket_, index_, usernum_)
        name = name_
        ticket = ticket_
        usernum = usernum_
        index = index_
        addr = nil // not connected
        pitch = 0
        received_count = 0
        velocity = 0
        droprate_to_client = 0
        droprate_to_server = 0
        last_server_seqno = 0
        recent_round_trip_max = 0
        recent_round_trip_min = 100000
        round_trip_sum = 0
        round_trip_count = 0
        round_trip_max = 0
        round_trip_min = 100000
        round_trip_mean = 0

    // Handle a login: obtain a reply address if there is none yet and
    // restart the sequence-number and drop-rate bookkeeping.
    //   client_port -- "" to reply to the sender of the current OSC
    //                  message; otherwise a port on localhost
    def start(client_port)
        // invariant: addr <==> client is in clients.all_clients
        if not addr
            if client_port != "": // don't need this
                addr = osc_create_address("localhost", client_port, false)
                if li_debug:
                    display "client::start osc_create_address localhost", client_port, addr
            else
                addr = osc_server_reply()
            clients.activate_client(this)
        connected_since = real_now
        last_client_seqno = 0
        drop_base_seqno = last_client_seqno
        drop_base_count = received_count
        drop_base_time = real_now
        last_recv_time = real_now
        if to_debug:
            display "start", real_now, last_recv_time

    // Drop the connection: delete the reply address and take the client
    // out of the active set. Called from timer_callback on timeout.
    // NOTE(review): clients_play_state[index] is left at its last value,
    // so a note held at disconnect stays in the broadcast state --
    // confirm whether it should be reset to NO_CLIENT_STATE here.
    def terminate()
        display "No message for 20s, client", real_now, name
        osc_delete_address(addr)
        addr = nil
        // invariant: addr <==> client is in clients.all_clients
        clients.remove_client(this)

    // Client::
    // Send the current global play_state to this client as an /ss
    // message. Does nothing if the client is not connected.
    def send_message():
        if not addr
            return // connection not made yet or closed
        osc_send_start()
        last_server_seqno = last_server_seqno + 1
        // first int32 is seqno (low order), then msg type, then droprate
        osc_add_int32(last_server_seqno & 0xffff + PLAY_STATE +
                      (droprate_to_server << 24))
        osc_add_int32(now_ms)
        osc_add_int32(last_client_seqno)
        osc_add_blob(play_state)
        osc_send(addr, "/ss")
        sent_count = sent_count + 1
        if send_debug:
            display "send /ss", real_now, last_server_seqno, now_ms, last_client_seqno, name
        if show_state:
            print "**** SENDING STATE ****"
            // the loop variable is deliberately not called "name", so it
            // cannot be confused with (or overwrite) this client's name
            for row at v in roster
                print v, "p"; ord(play_state[v * 2]), "v"; ord(play_state[v * 2 + 1]), row[0]
        if paint_state:
            if not state_disp:
                state_disp = State_disp(0, 2, 95, SD_COL_WIDTH * 2, SD_GRID_SPACING * 61)
            state_disp.refresh(t)

    // Acknowledge a login with an /sa message.
    //   index -- index of this client in the state, sent to the client
    def send_ack(index)
        if not addr
            if li_debug:
                display "WARNING send_ack FAILS", addr
            return // connection not made yet or closed
        osc_send_start()
        last_server_seqno = last_server_seqno + 1
        osc_add_int32(last_server_seqno) // server msgs will start here
        osc_add_int32(index) // index of client in samples
        // permanent client id used to determine which part to play:
        osc_add_int32(int(usernum))
        osc_add_int32(now_ms) // server time for rough initial synchronization
        osc_send(addr, "/sa")
        sent_count = sent_count + 1
        if send_debug:
            display "send /sa", real_now, index, name, addr

    // Client::
    // Process one /ce message from this client: record pitch and
    // velocity, update drop-rate and round-trip statistics, and answer
    // with an /sc clock-sync message when one is pending.
    //   seqno       -- client sequence number; messages that do not
    //                  advance it are dropped as stale
    //   ctime       -- client time, echoed back in /sc
    //   droprate    -- drop rate reported by the client
    //   state       -- pitch in the low 8 bits, velocity in the next 8
    //   latencybase -- subtracted from now_ms to get the round trip
    def handle_message(seqno, ctime, droprate, state, latencybase):
        if not addr or last_client_seqno >= seqno:
            if recv_debug:
                display "    -> stale message dropped", real_now, last_client_seqno
            return // this message is stale
        last_client_seqno = seqno
        // store dropcount for statistics
        droprate_to_client = droprate
        pitch = state & 0xff
        velocity = (state >> 8) & 0xff
        if recv_debug:
            display "    ", real_now, pitch, velocity
        // update global state data for conversion to message xstate vector
        clients_play_state[index] = chr(pitch) + chr(velocity)
        // statistics - droprate
        last_recv_time = real_now
        if to_debug:
            display "handle_message", real_now, index, last_recv_time
        received_count = received_count + 1
        if real_now - drop_base_time >= 10:
            var expected = seqno - drop_base_seqno
            var received = received_count - drop_base_count
            droprate_to_server = 100 - round(received * 100 / expected)
            drop_base_time = real_now
            drop_base_seqno = seqno
            drop_base_count = received_count
            if recv_debug:
                // report the rate just computed, not the client's report
                print "    droprate to server:", droprate_to_server; "%"
        if needs_clock_sync:
            needs_clock_sync = false
            // addr is known to be set here: it was tested on entry
            osc_send_start()
            last_server_seqno = last_server_seqno + 1
            // first int32 is seqno (low order), then client seqno << 16
            osc_add_int32(last_server_seqno & 0xffff + (seqno << 16))
            osc_add_int32(now_ms)
            osc_add_int32(ctime)
            osc_send(addr, "/sc")
            sent_count = sent_count + 1
            if cs_debug:
                display "send /sc", real_now, last_server_seqno, seqno, now_ms, ctime, name
        var round_trip = now_ms - latencybase
        recent_round_trip_max = max(recent_round_trip_max, round_trip)
        recent_round_trip_min = min(recent_round_trip_min, round_trip)
        round_trip_sum = round_trip_sum + round_trip
        round_trip_count = round_trip_count + 1
        if round_trip_count > RT10: // every 10s
            // publish the period's statistics and start a new period
            round_trip_mean = round_trip_sum / round_trip_count
            round_trip_count = 0
            round_trip_sum = 0
            round_trip_min = recent_round_trip_min
            recent_round_trip_min = 100000
            round_trip_max = recent_round_trip_max
            recent_round_trip_max = 0


// build the client table and the per-client play state from the roster
clients = Clients()
clients_play_state = array(len(roster))
NO_CLIENT_STATE = chr(0) + chr(0) // same layout as chr(pitch) + chr(velocity)
for row at i in roster:
    // row is [name, voice, ticket, usernum]; pass ticket, name,
    // the roster position i, and usernum
    clients.insert_client(row[2], row[0], i, row[3])
    clients_play_state[i] = NO_CLIENT_STATE

clients.sanity_check()

// timestamped note-on/off messages are appended here and sent to clients
drum_circle_msg = ["N   "]
// 200 is used as a non-existent player index (see select_riffs_and_soloist)
drum_circle_solo_next = 200
drum_circle_solo_index = 200
drum_circle_solo_requests = [] // client indices waiting to be picked as soloist
drum_circle_solo_did_solo = [] // client indices already picked; refills the pool


// scheduling state used by timer_callback; times are in seconds
// relative to start_time
send_state_interval = 0.05 * SLOW
send_state_next_time = 1
http_poll_time = 1
// a client should get a clock sync message every 6 seconds so there
// are about 10 per minute. Interval is 6s / max number of clients.
// max(1, ...) avoids a division by zero when the roster is empty.
clock_sync_interval = (6 / max(1, len(roster)))
clock_sync_next_time = 0
clock_sync_next_index = 0
timeout_next_time = 1
timeout_interval = 0.2 // scan all 100 every 20s, if no activity, take action
timeout_next_index = 0

// debug switches: replace nil with t to enable one category of output.
// Setting msg_debug turns on every category that is or'ed with it.
osc_debug = nil // passed to osc_server_init in startup
msg_debug = nil
li_debug = nil or msg_debug // login debug
cs_debug = nil or msg_debug // clock sync
recv_debug = nil or msg_debug // incoming /ce messages
send_debug = nil or msg_debug // outgoing /ss and /sa messages
to_debug = nil or msg_debug // timeout debugging

// message format:
//  header
//   int32 ticket (identifies client)
//   int32 seqno
//   int32 ctime
//   int32 replytoseqno
//   int32 latencybase
//   int32 dropcount
//  payload
//   int32 timestamp
//   int32 state (pitch in low order 8 bits + velocity << 8)

// messages:
//   /cl -- client login
//   /ce -- client event
//   /ss -- server state
//   /sc -- server clock sync
//   /sa -- server ack (to login)

// Tell the user when creating an OSC server port failed.
//   err   -- result of osc_server_init()/osc_server_port(); 0 is success
//   which -- port identifier, concatenated into the message text
def check_port_error(err, which):
    if err == 0:
        return // port was created, nothing to report
    var text = "osc server port creation failed on port " + which +
               " (error " + str(err) + ")"
    wxs_message_box(text, "Error", WXS_STYLE_EXCLAMATION, 0)

// Open the three OSC server ports, register the /ce and /cl handlers,
// and create and bind the ZMQ pull and publish sockets. Port creation
// failures are reported through check_port_error; the zmq_bind results
// are only displayed, not checked.
def startup()
    check_port_error(osc_server_init(server_port1, osc_debug), server_port1)
    check_port_error(osc_server_port(server_port2), server_port2)
    check_port_error(osc_server_port(server_port3), server_port3)
    osc_server_poll() // flush any pending messages at startup
    // /ce carries 8 int32 values, /cl an int32 and two strings
    osc_server_method("/ce", "iiiiiiii", nil, 'osc_handler')
    osc_server_method("/cl", "iss", nil, 'client_login')
    // client_addr = osc_create_address(client_ip, client_port, false)
    zmq_init()
    // make a pull socket to receive audio
    pull_audio_sock = zmq_open_pull()
    display "startup 1", pull_audio_sock, zmq_bind(pull_audio_sock, "tcp", "*", int(server_pull_audio_port))
    // make a publish socket to distribute audio
    publish_sock = zmq_open_publish()
    display "startup 2", publish_sock, zmq_bind(publish_sock, "tcp", "*", int(server_publish_port))
    

start_time = time_get() // needed to initialize Client
real_now = 0 // time_get() - start_time, updated by timer_callback
now_ms = 0 // real_now as integer milliseconds, updated by timer_callback
rtsched = Scheduler()
// normally, this is initialized to time_get() and we call poll with time_get()
// instead, we initialize with 0 and call poll with time_get() - start_time
rtsched.time_offset = 0

// Rebuild the global play_state by flattening clients_play_state, whose
// elements are the two-character chr(pitch) + chr(velocity) strings.
// play_state is what Client.send_message sends as the /ss blob.
def compute_play_state():
    play_state = flatten(clients_play_state)


// OSC handler for /cl (client login).
//   path        -- OSC address of the message (unused)
//   ticket      -- ticket used to look the client up
//   name        -- must equal the name stored for that ticket
//   client_port -- passed on to Client.start
// A failed login is only displayed; nothing is sent back.
def client_login(path, ticket, name, client_port):
    var client = clients.find_client(ticket)
    if li_debug:
        display "recv /cl", real_now, ticket, name, client, client_port
        if client:
            display "recv /cl", client.addr
    if client and name == client.name:
        // known ticket and matching name: (re)start the connection,
        // retaining the reply address on first contact
        client.start(client_port)
        // the ack tells the client its index in the state
        client.send_ack(client.index)
        login_received_count = login_received_count + 1
    else:
        display "LOGIN FAILURE", name, ticket


messages_in = 0 // /ce messages processed; displayed by quit

// OSC handler for /ce (client event). The eight int32 values of the
// message arrive as ticket .. state. Messages are ignored in riff mode,
// and one message is discarded after the "Drop" button set dropnext.
// Otherwise the message is counted and handed to the client that owns
// the ticket.
def osc_handler(path, ticket, seqno, ctime, replytoseqno, latencybase,
                droprate, timestamp, state):
    if riffmode:
        // ignore all incoming /ce messages, use "N" on zmq port instead
        return
    if dropnext:
        print "dropped incoming /ce message"
        dropnext = false
        return
    messages_in = messages_in + 1
    ce_received_count = ce_received_count + 1
    var client = clients.find_client(ticket)
    if client:
        if recv_debug:
            display "recv /ce", real_now, ticket, seqno, ctime, replytoseqno, latencybase, droprate, timestamp
        client.handle_message(seqno, ctime, droprate, state, latencybase)
    else:
        display "bad client ticket in message", real_now, ticket


netperf_update_time = 0 // next time timer_callback refreshes netperf

// Compute the status string passed to netperf.set_fields:
// the number of active clients followed by " clients".
def get_status():
    // return explicitly instead of ending on a bare expression
    return str(clients.num_clients) + " clients"

zmq_active = nil // set to t by zmq_poll once it starts accepting messages

sras_index = 0 // id of the current solo-selection process
riff_cycles = 0 // number of selections made so far
// Scheduled callback that runs the solo-selection process in riff mode.
// Makes the previously announced soloist current, picks the riffs and
// the next soloist, publishes them in a "D" message, and reschedules
// itself 9.6s later.
//   id -- value of sras_index when this process was started; zmq_poll
//         increments sras_index to cancel a running process
def select_riffs_and_soloist(id)
    display "select_riffs_and_soloist", the_sched.time, real_now
    // a cancelled process must not touch any state, so test the id
    // before promoting the next soloist
    if id != sras_index:
        return
    drum_circle_solo_index = drum_circle_solo_next
    // select riffs and soloist index
    // for now, there are just 2 riffs. Start with 0 and change
    // every 4 cycles
    var r = riff_cycles % 8
    r = 0 if r < 4 else 1
    display "select_riffs_and_soloist", r
    var msg = "D   " + chr(r) + chr(r) + chr(r) + chr(r)
    riff_cycles = riff_cycles + 1

    // select solo index from the list
    if len(drum_circle_solo_requests) == 0:
        // refill the pool
        drum_circle_solo_requests = drum_circle_solo_did_solo
        drum_circle_solo_did_solo = []
    drum_circle_solo_next = 200 // a non-existent player index
    if len(drum_circle_solo_requests) > 0:
        // pick at random from the pool
        var i = irandom(len(drum_circle_solo_requests))
        var index = drum_circle_solo_requests[i]
        // NOTE(review): drum_circle_solo_next was just set to 200, so
        // this test only excludes index 200 -- confirm whether it was
        // meant to compare against drum_circle_solo_index instead
        if drum_circle_solo_next != index:
            drum_circle_solo_requests.remove(index)
            drum_circle_solo_did_solo.append(index)
            drum_circle_solo_next = index
    msg = msg + chr(drum_circle_solo_next)
    zmq_send(publish_sock, msg)
    display "select_riffs_and_soloists, sent", drum_circle_solo_next
    // next selection is in 16 beats at 100bpm = 9.6s
    the_sched.cause(9.6, nil, 'select_riffs_and_soloist', id)



// Receive at most one message from the ZMQ pull socket and act on it.
// As read by this function, a message is laid out as
//     byte 0      message type character
//     bytes 4-7   ticket, least significant byte first
//     bytes 8...  payload
// Message types handled here:
//     "N"  note: bytes 8-11 timestamp, byte 12 pitch, byte 13 velocity.
//          Queued in drum_circle_msg (riff mode only); not forwarded.
//     "Q"  solo request: byte 8 is "Y", "N" or "X" (velocity boost
//          in the text from byte 9 on)
//     "L"  launch: text index, and for RIFFS_LAUNCH a space and a
//          start time in ms
//     "Y"  yes answer from the client; not forwarded
//     "R"  resets every client's yes_or_no
// Every message that is not consumed is forwarded on the publish socket.
def zmq_poll():
    var gno_msg = zmq_recv_noblock(pull_audio_sock)
    // since ZMQ will not drop these messages, drop them
    // here if the server started recently (30s) because
    // they are probably stale. Client must let server run
    // 30s before any audio will go through. This is a feature :-)
    if (not gno_msg) or real_now < 30 or len(gno_msg) < 8:
        return
    elif not zmq_active:
        print "ZMQ IS ACTIVELY LISTENING NOW"
        zmq_active = t
    var ticket = (ord(gno_msg[7]) << 24) + (ord(gno_msg[6]) << 16) +
                 (ord(gno_msg[5]) << 8) + ord(gno_msg[4])
    var client = clients.find_client(ticket)
    if client:
        client.last_recv_time = real_now
        if gno_msg[0] == "N":
            if not riffmode:
                return
            // pitch and velocity are at bytes 12 and 13; reject a
            // truncated note instead of indexing past the end
            if len(gno_msg) < 14:
                display "zmq_poll - bad message", gno_msg[0], len(gno_msg)
                return
            // send note and timestamp to clients. timestamp is advanced
            // at clients so we can send the original timestamp
            // We're building a string by creating an array of strings
            // that we will flatten later (just before sending the message)
            var timestamp = subseq(gno_msg, 8, 12)
            drum_circle_msg.append(timestamp)  // message gets timestamp
            drum_circle_msg.append(chr(client.index)) //       voice
            drum_circle_msg.append(gno_msg[12])       //       pitch
            // velocity is 127 for soloist, actual + boost for Janelle,
            //    and 80 for others
            var vel = ord(gno_msg[13])
            if vel > 0:
                if client.index == drum_circle_solo_index:
                    vel = 127
                elif client.index != 1:
                    vel = 80
                else: // adjust drums by boost
                    // drum_velocity_boost could be negative, so
                    // make sure adjusted vel > 0; a large boost must
                    // not exceed 127, the value used for the soloist
                    vel = min(127, max(1, vel + drum_velocity_boost))
            drum_circle_msg.append(chr(vel))          //       vel
            // this should be very rare, but just in case there's a
            // flood of notes, let's break things up into separate
            // messages. 800 elements -> 200 notes -> 1400 bytes
            if len(drum_circle_msg) > 800:
                zmq_send(publish_sock, flatten(drum_circle_msg))
                drum_circle_msg.set_len(1) // back to ["N   "]
            return
        elif gno_msg[0] == "Q":
            // the request character is at byte 8
            if len(gno_msg) < 9:
                display "zmq_poll - bad message", gno_msg[0], len(gno_msg)
                return
            display "server got solo request", client.index, gno_msg[8]
            var req = gno_msg[8]
            if req == "Y":
                if client.index not in drum_circle_solo_requests and
                   client.index not in drum_circle_solo_did_solo:
                    drum_circle_solo_requests.append(client.index)
                display "Y", drum_circle_solo_requests, drum_circle_solo_did_solo
            elif req == "N":
                drum_circle_solo_requests.remove(client.index)
                drum_circle_solo_did_solo.remove(client.index)
            elif req == "X" and len(gno_msg) > 9:
                drum_velocity_boost = int(subseq(gno_msg, 9))
        elif gno_msg[0] == "L":
            var sub = subseq(gno_msg, 8)
            var index = int(sub) // get the index
            if index == RIFFS_LAUNCH:
                // validate before changing any state: the start time
                // follows the index after a space
                var pos = find(sub, " ")
                if pos == -1:
                    display "zmq_poll - bad message", gno_msg, sub
                    return
                riffmode = true
                drum_circle_solo_requests = []
                drum_circle_solo_did_solo = []
                sub = subseq(sub, pos + 1)
                var start = int(sub) / 1000
                // first solo will be 40 beats after start (24 + 16), or
                // 24 seconds at 100bpm
                sras_index = sras_index + 1
                // to map start+24 to scheduler absolute time,
                //     subtract rtsched.time to get delay
                drum_circle_solo_next = 200
                drum_circle_solo_index = 200
                rtsched.cause(start + 24 - rtsched.time, nil,
                              'select_riffs_and_soloist', sras_index)
                display "cause sras", start + 24, rtsched.time, start + 24 - rtsched.time, start, start_time
                // BUG: we should turn off all notes and send an /ss msg
                mode_text.set_string("Riff Mode")
            elif index == RIFFS_FINISHED:
                sras_index = sras_index + 1 // kill solo selection process
                riffmode = false
                mode_text.set_string("Normal Mode")
            // index == RIFFS_CUE_END indicates time to end riffmode
        elif gno_msg[0] == "Y":
            client.yes_or_no = t
            return
        elif gno_msg[0] == "R":
            for cl in clients.ticket_to_client.values():
                cl.yes_or_no = false
        // everything not consumed above is passed on to all subscribers
        zmq_send(publish_sock, gno_msg)
        display "zmq_poll forwarding", gno_msg[0], len(gno_msg)
    else:
        display "zmq_poll bad ticket", ticket, hex(ticket), len(gno_msg)


// Periodic callback started by run() with wxs_timer_start. Updates
// real_now and now_ms, polls OSC and ZMQ, and then runs whichever of
// these jobs are due:
//   - send /ss state to every active client (not in riff mode)
//   - mark the next active client as needing a clock sync
//   - check the next active client for a timeout, flush queued "N"
//     notes, and poll rtsched
//   - poll the http publisher (every 0.5s)
//   - refresh netperf and publish server stats (every 10s)
def timer_callback():
    var client
    real_now = time_get() - start_time
    now_ms = int(real_now * 1000)
    osc_server_poll()
    // check for incoming audio
    zmq_poll()
    
    compute_play_state() // this need not be done for every xmit
    if not riffmode and
       send_state_next_time < real_now and clients.num_clients > 0:
        for client in clients.all_clients
            client.send_message()
        // use max so if we get behind by a whole period, we won't
        // try to catch up
        send_state_next_time = max(real_now,
                                   send_state_next_time + send_state_interval)
    if real_now >= clock_sync_next_time and clients.num_clients > 0:
        // round-robin over the active clients; the /sc reply is sent by
        // Client.handle_message when the client's next /ce arrives
        clock_sync_next_index = (clock_sync_next_index + 1) % clients.num_clients
        client = clients.all_clients[clock_sync_next_index]
        clock_sync_next_time = real_now + clock_sync_interval
        client.needs_clock_sync = true
    if real_now >= timeout_next_time and clients.num_clients > 0:
        // round-robin over the active clients, one per timeout_interval
        timeout_next_index = (timeout_next_index + 1) % clients.num_clients
        client = clients.all_clients[timeout_next_index]
        timeout_next_time = real_now + timeout_interval
        if client.addr and client.last_recv_time + 20 < real_now:
            if to_debug:
                display "timeout detected", real_now, timeout_next_index
                display "                ", client.last_recv_time, client.index
            // no message for 20s, but that's possible in drum mode
            if riffmode:
                client.last_recv_time = real_now
            else:
                client.terminate()
        // here's a handy place that gets run every 0.2s
        // (only while at least one client is active, see the test above)
        if len(drum_circle_msg) > 1:
            zmq_send(publish_sock, flatten(drum_circle_msg))
            # display "sent N msg", len(drum_circle_msg)
            drum_circle_msg.set_len(1) // back to ["N   "]
        rtsched.poll(real_now)
    if real_now > http_poll_time:
        publish_server_poll()
        http_poll_time = real_now + 0.5
    if real_now >= netperf_update_time:
        netperf.set_fields(get_status(), login_received_count + ce_received_count, sent_count)
        display "counts", login_received_count, ce_received_count
        netperf_update_time = real_now + 10
        // send stats to website every 10s too and write to file
        var start = time_get() // to time the publish call
        publish_server_stats()
        print "publish_server_stats time", time_get() - start

        
// Handler for the "Quit" button (see make_gui): finish the stats
// publisher and the OSC server, display the message count, and exit.
// The callback arguments are collected in ignore and not used.
def quit(rest ignore):
    publish_server_finish()
    osc_server_finish()
    display "done", real_now, messages_in
    exit()


// Handler for the "Drop" button (see make_gui): sets the dropnext flag
// so that osc_handler discards the next incoming /ce message.
def dropnext(rest ignore)
    dropnext = true

// when true, Client.send_message prints the play state on every send
show_state = false
// Handler for the "Print State" checkbox (see make_gui); x is stored
// as the new value of the show_state flag.
def show_state(obj, event, x, y)
    show_state = x

// when true, Client.send_message refreshes state_disp on every send
paint_state = false
// Handler for the "Display State" checkbox (see make_gui); x is stored
// as the new value of the paint_state flag.
def paint_state(obj, event, x, y)
    paint_state = x


// Handler for the "Post network info..." menu item. Asks for the
// website code (once; it is saved in prefs under 'webauth') and for the
// server IP address, then posts the address and the five server ports
// to the website. Waits for the request in periods of 15s
// (150 polls, 0.1s apart), asking after each period whether to keep
// waiting, and reports the outcome in a message box.
def post_network_info(rest ignore):
    if not prefs.get('webauth'):
        prefs.set('webauth',
                  wxs_get_text("Enter code for website",
                               "Post network info...", "", 0))
        prefs.save()
    var ins = wxs_get_text("Enter server IP address",
                           "Post network info...", "", 0)
    display "post_network_info", ins
    // NOTE(review): req is assigned without var, so it is a global;
    // left that way in case other code reads it -- confirm
    req = Http_post()
    req.add_field("username", "server")
    req.add_field("password", prefs.get('webauth'))
    req.add_field("request", "setserverinfo")
    var sep = " "
    req.add_field("data", ins + sep + server_port1 + sep +
                  server_port2 + sep + server_port3 + sep +
                  server_pull_audio_port + sep + server_publish_port)
    req.post("http://globalnetorchestra.org/pnoinfo.php",
             "globalnetorchestra.org")
    var done = false
    while not done
        var count = 0
        while count < 150 and not done
            time_sleep(0.1)
            req.poll()
            done = req.state == 'fail' or req.state == 'done'
            count = count + 1
            // status bar counts down the seconds left in this period
            default_window.set_status("Post network info waiting " +
                                      str(round((150 - count) / 10)))
        if not done:
            if wxs_message_box("Post network info has not completed. " +
                               "Keep trying? (YES). Press NO to stop.",
                               "Post network info timeout",
                               WXS_STYLE_YES_NO, 0) != WXS_MSG_YES:
                done = true
    if req.state == 'fail':
        wxs_message_box("Post network info encountered an error: " +
                        req.error, "Post network info error",
                        WXS_STYLE_ERROR, 0)
    elif req.state == 'done':
        wxs_message_box("Post network info completed and returned: " +
                        req.data, "Post network info completed",
                        WXS_STYLE_OK,0)
    // any other state means the user stopped waiting: the request did
    // not complete, so there is no result to report
    default_window.set_status(DEFAULT_STATUS)


// Handler installed on the File menu (see make_gui). Displays its
// arguments and exits the program when x is 0; any other x is ignored.
def file_menu_handler(obj, event, x, y):
    display "file_menu_handler", obj, event, x, y
    if x != 0:
        return
    exit()

// Create the controls and the File menu. Each control's method is the
// symbol naming the global function that handles it. The objects are
// stored in globals; mode_text and netperf are used by zmq_poll and
// timer_callback respectively.
def make_gui()
    quit_button = Button(0, "Quit", 5, 5, 100, 20)
    quit_button.method = 'quit'
    dropnext_button = Button(0, "Drop", 110, 5, 100, 20)
    dropnext_button.method = 'dropnext'
    show_state_cbox = Checkbox(0, "Print State", 215, 5, 100, 20)
    show_state_cbox.method = 'show_state'
    paint_state_cbox = Checkbox(0, "Display State", 320, 5, 120, 20)
    paint_state_cbox.method = 'paint_state'
    // shows "Normal Mode" or "Riff Mode"
    mode_text = Statictext(0, "Normal Mode", 215, 30, 70, 20)
    netperf = Netperf(0, 5, 30, nil)
    file_menu = Menu(0, "File")
    var i = file_menu.item("Post network info...",
                   "Put IP address and ports on website for clients",
                   nil, nil, 'post_network_info')
    display "file_menu.item for Post network info", i
    file_menu.method = 'file_menu_handler'

// Program entry point: show the status bar, open the network ports,
// build the GUI, and start the timer that drives timer_callback.
def run()
    default_window.show_status(t)
    default_window.set_status(DEFAULT_STATUS)
    startup()
    make_gui()
    // first argument is the timer period -- presumably ms, TODO confirm
    wxs_timer_start(2, 'timer_callback')

run()