[tuxdroid-svn] r246 - svnlook: warning: cannot set LC_CTYPE locale svnlook: warning: environment va
Status: Beta
Brought to you by:
ks156
From: svnlook:warning@affinitic.be:cannot s. L. l. <c2m...@c2...> - 2007-04-13 10:15:39
|
Author: svnlook: warning: cannot set LC_CTYPE locale Date: svnlook: warning: environment variable LANG is EN New Revision: 246 Added: api/python/trunk/tux.py api/python/trunk/tuxapi_class.py api/python/trunk/tuxapi_const.py Modified: Log: remi 2007-04-13 12:15:35 +0200 (Fri, 13 Apr 2007) 514 UPD Start of the version 0.2.0 of the api. The api was completely remade. I Kept the changes made by Neimad lately. Sorry for this brutal update. I have implemented the 'join()' suggestion from Damien for the threads too. Consequently, this api is not fully compatible with the last one. A documentation system based on the docstring has been implemented and you can build this with the function 'tux.misc.build_documentation'. See 'tux.misc.doc(tux.misc.build_documentation)'. See 'tux.misc.doc(tux)' too svnlook: warning: cannot set LC_CTYPE locale svnlook: warning: environment variable LANG is EN svnlook: warning: please check that your locale name is correct Added: api/python/trunk/tux.py =================================================================== --- api/python/trunk/tux.py (rev 0) +++ api/python/trunk/tux.py 2007-04-13 10:15:35 UTC (rev 246) @@ -0,0 +1,26 @@ +#!/usr/bin/python +# -*- coding: latin-1 -*- + +import sys +from tuxapi_const import * +import tuxapi_class +import tuxapi_wav_merger +import signal + +global tux +tux=tuxapi_class.TUXTCPCommunicator() +tux.misc.print_api_version() +wavs=tuxapi_wav_merger.WavMerger(tux) +tux.daemon.connect() +tux.tts.connect() +tux.daemon.set_my_client_name("Py Client") +tux.daemon.auto_connect(True) +tux.tts.auto_connect(True) +tux.print_warnings=True + +def exit(signum,frame): + tux.destroy() + sys.exit(signum) + +signal.signal(signal.SIGTERM, exit) +signal.signal(signal.SIGINT, exit) Added: api/python/trunk/tuxapi_class.py =================================================================== --- api/python/trunk/tuxapi_class.py (rev 0) +++ api/python/trunk/tuxapi_class.py 2007-04-13 10:15:35 UTC (rev 246) @@ -0,0 +1,4291 @@ 
#!/usr/bin/python

# -----------------------------------------------------------------------------
# Tux Droid - API Class
# Copyright (C) 2007 C2ME Sa <rem...@c2...>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
# 02111-1307, USA.
# -----------------------------------------------------------------------------
# $Id: tuxapi_class.py 192 2007-03-22 14:25:37Z remi $
# -----------------------------------------------------------------------------

import sys
import os
import time
import socket
import thread
import threading
import string
from tuxapi_const import *

#==============================================================================
# Constants
#==============================================================================

api_version ="0.2.0 (SVN/UNRELEASED)"

#==============================================================================
# TUXTCPCommunicator class
#==============================================================================
class TUXTCPCommunicator(object):
    """
    Main class of tux object

    Sub class of this class:
        "cmd" as class      : Class which manages the tux commands
        "daemon" as class   : Class which manages the daemon commands
        "event" as class    : Class which manages the events
        "hw" as class       : Class which manages the tux hardware
        "misc" as class     : Class which manages the miscellaneous
                              functions
        "status" as class   : Class which manages the request of a
                              status
        "sys" as class      : Class which manages the system functions
        "tts" as class      : Class which manages the text to speech

    Global variables of this class:
        "my_name" as string         : Name of the api instance
        "print_status" as boolean   : Allow to print the raw statuses
        "print_warnings" as boolean : Allow to print the warnings

    Comments:
        Call the destroying function at the end of your script for closing
        the api correctly.
        >>> tux.destroy()
    """
    #--------------------------------------------------------------------------
    # Constructor of class
    #--------------------------------------------------------------------------
    def __init__(self,deprecated_p=5000,deprecated_a='localhost'):
        """
        Constructor of class

        Parameters:
            "deprecated_p" : Not used by this constructor
            "deprecated_a" : Not used by this constructor

        Comments:
            Two threads are started immediately, running
            'daemon._loop_auto_connect' and 'tts._loop_auto_connect'.
            They are recorded in "main_thread_list" and joined by
            'destroy'.

        Example:
            >>> tux=TUXTCPCommunicator()
        """
        self.main_thread_list=[]
        self.exit_flag=False
        self.tcp_mutex=threading.Lock()
        self.tcp_data=(' ')*6
        self.datas_threated=True
        self.tcp_data_for_event=(' ')*6
        self.my_name="Tux client"
        self.print_status=False
        self.print_warnings=False
        self.print_debug_thread=False
        self.event=TUXevent(self)
        self.cmd=TUXcmd(self)
        self.sys=TUXsys(self)
        self.daemon=TUXdaemon(self)
        self.sdaemon=self.daemon
        self.status=TUXStatus(self)
        self.hw=TUXhw(self)
        self.tts=TUXtts(self)
        self.misc=TUXmisc(self)
        self.connected=False #deprecated
        self.last_threated_data_id=0
        self.current_data_id=0
        self._start_main_thread(self.daemon._loop_auto_connect,
            'daemon._loop_auto_connect')
        self._start_main_thread(self.tts._loop_auto_connect,
            'tts._loop_auto_connect')

    #--------------------------------------------------------------------------
    # SYSTEM function
    #--------------------------------------------------------------------------
    def _start_main_thread(self,target,name):
        """
        Not a user function

        Start "target" in a new thread called "name" and record the thread
        in "main_thread_list".
        """
        t=threading.Thread(target=target)
        t.setName(name)
        t.start()
        self.main_thread_list.append(t)

    #--------------------------------------------------------------------------
    # Destructor of class
    #--------------------------------------------------------------------------
    def destroy(self):
        """
        Close the api : stop the auto connections, disconnect from the
        daemon and from the tts, then wait for the end of the threads
        recorded in "main_thread_list".

        Example:
            >>> tux.destroy()
        """
        self.daemon.auto_connect(False)
        self.tts.auto_connect(False)
        self.daemon.disconnect()
        self.tts.disconnect()
        self.daemon._tcp_threads_join()
        self.tts._tcp_threads_join()
        # The looping threads of this api poll this flag to stop
        self.exit_flag=True
        for main_thread in self.main_thread_list:
            if self.print_debug_thread:
                print("'%s' has been released"%main_thread.getName())
            if main_thread.isAlive():
                main_thread.join()
        print("Tux object has been destroyed")

    #--------------------------------------------------------------------------
    # SYSTEM function
    #--------------------------------------------------------------------------
    def _loop_tux_tcp_msg(self):
        """
        Not a user function

        While the daemon is connected, read the frames from the socket
        (one attempt every 10 msec). The mutex is always released, even
        if the reading fails on an unexpected error, in order to never
        block the other threads which are waiting for it.
        """
        while self.daemon.connected:
            self.tcp_mutex.acquire()
            try:
                self._receive_tcp_frame()
            finally:
                self.tcp_mutex.release()
            time.sleep(0.01)

    #--------------------------------------------------------------------------
    # SYSTEM function
    #--------------------------------------------------------------------------
    def _receive_tcp_frame(self):
        """
        Not a user function

        Try to read one frame of 16 bytes and publish it in "tcp_data".
        Must be called with "tcp_mutex" held.
        """
        try:
            frame=self.sock.recv(16)
        except socket.timeout:
            # Nothing received in time : just retry later
            return
        except socket.error:
            self.sock.close()
            self.daemon.connected=False
            return
        # Frames shorter than 16 bytes are dropped
        if len(frame)<16:
            return
        self.current_data_id=(self.current_data_id+1)&0xFFFFFF
        self.tcp_data=frame
        self.datas_threated=False
        if self.print_status:
            print(" ".join(["%.2x" % ord(char) for char in frame]))

    #--------------------------------------------------------------------------
    # SYSTEM function
    #--------------------------------------------------------------------------
    def _loop_tux_data_dispatching(self):
        """
        Not a user function

        While the daemon is connected, wait for a frame which has not been
        dispatched yet ("datas_threated" is False), dispatch it and mark it
        as treated.
        """
        while self.daemon.connected:
            while self.datas_threated:
                if not self.daemon.connected:
                    return
                time.sleep(0.003)
            self.tcp_mutex.acquire()
            data=self.tcp_data
            self.tcp_mutex.release()
            self._dispatch_data_main(data)
            self.datas_threated=True

    #--------------------------------------------------------------------------
    # SYSTEM function
#-------------------------------------------------------------------------- + def _dispatch_data_main(self,data): + """ + Not a user function + """ + # from daemon + if ord(data[0])==SOURCE_SUB_DAEMON: + self._dispatch_data_from_daemon(data) + # from tux droid + if ord(data[0])==SOURCE_TUX: + self._dispatch_data_from_tux(data) + + #-------------------------------------------------------------------------- + # SYSTEM function + #-------------------------------------------------------------------------- + def _dispatch_data_from_daemon(self,data): + """ + Not a user function + """ + # data type command + if ord(data[2])==DATA_TP_CMD: + if ord(data[3])==SUBDATA_TP_STRUCT: + # daemon close + if ord(data[4])==SUB_D_CMD_STRUC_DISCONNECT_CLIENT: + self.daemon.disconnect() + + #-------------------------------------------------------------------------- + # SYSTEM function + #-------------------------------------------------------------------------- + def _dispatch_data_from_tux(self,data): + """ + Not a user function + """ + # data type response + if ord(data[2])==DATA_TP_RSP: + if ord(data[3])==SUBDATA_TP_STATUS: + if self.event.on_status!=None: + self.event.on_status() + #Head button + if ord(data[4])==DATAS_STATUS_HEAD_PUSH_SWITCH: + if ord(data[5])==1: + if self.event.on_head_bt_pushed!=None: + self.event.on_head_bt_pushed() + if self.event.on_bt_pushed!=None: + self.event.on_bt_pushed() + else: + if self.event.on_head_bt_released!=None: + self.event.on_head_bt_released() + if self.event.on_bt_released!=None: + self.event.on_bt_released() + #Left wing button + if ord(data[4])==DATAS_STATUS_LEFT_WING_PUSH: + if ord(data[5])==1: + if self.event.on_lwing_bt_pushed!=None: + self.event.on_lwing_bt_pushed() + if self.event.on_bt_pushed!=None: + self.event.on_bt_pushed() + else: + if self.event.on_lwing_bt_released!=None: + self.event.on_lwing_bt_released() + if self.event.on_bt_released!=None: + self.event.on_bt_released() + #Right wing button + if 
ord(data[4])==DATAS_STATUS_RIGHT_WING_PUSH: + if ord(data[5])==1: + if self.event.on_rwing_bt_pushed!=None: + self.event.on_rwing_bt_pushed() + if self.event.on_bt_pushed!=None: + self.event.on_bt_pushed() + else: + if self.event.on_rwing_bt_released!=None: + self.event.on_rwing_bt_released() + if self.event.on_bt_released!=None: + self.event.on_bt_released() + #Remote button + if ord(data[4])==DATAS_STATUS_IR_CODE: + if self.event.on_remote_bt[ord(data[5])]!=None: + self.event.on_remote_bt[ord(data[5])]() + if self.event.on_remote!=None: + self.event.on_remote(ord(data[5])) + #Mouth open + if ord(data[4])==DATAS_STATUS_MOUTH_OPEN_POSITION: + if ord(data[5])==0: + if self.event.on_mouth_open!=None: + self.event.on_mouth_open() + #Mouth close + if ord(data[4])==DATAS_STATUS_MOUTH_CLOSED_POSITION: + if ord(data[5])==0: + if self.event.on_mouth_close!=None: + self.event.on_mouth_close() + #Ledl + if ord(data[4])==DATAS_STATUS_LEFT_BLUE_LED: + if ord(data[5])==1: + if self.event.on_left_blue_led_on!=None: + self.event.on_left_blue_led_on() + else: + if self.event.on_left_blue_led_off!=None: + self.event.on_left_blue_led_off() + #Ledr + if ord(data[4])==DATAS_STATUS_RIGHT_BLUE_LED: + if ord(data[5])==1: + if self.event.on_right_blue_led_on!=None: + self.event.on_right_blue_led_on() + else: + if self.event.on_right_blue_led_off!=None: + self.event.on_right_blue_led_off() + #Eyes open/close + if ord(data[4])==DATAS_STATUS_EYES_CLOSED_POSITION_SWITCH: + if ord(data[5])==0: + if self.event.on_eyes_close!=None: + self.event.on_eyes_close() + else: + if self.event.on_eyes_open!=None: + self.event.on_eyes_open() + #Power plug switch + if ord(data[4])==DATAS_STATUS_POWER_PLUG_SWITCH: + if ord(data[5])==1: + if self.event.on_power_plugged!=None: + self.event.on_power_plugged() + else: + if self.event.on_power_unplugged!=None: + self.event.on_power_unplugged() + #RF status + if ord(data[4])==DATAS_STATUS_RF_CONNECTED: + if ord(data[5])==1: + self.status.rf_connected=True + if 
self.event.on_rf_connected!=None: + self.event.on_rf_connected() + else: + self.status.rf_connected=False + if self.event.on_rf_disconnected!=None: + self.event.on_rf_disconnected() + #PONG + if ord(data[4])==DATAS_STATUS_PONG: + if self.event.on_pong_received!=None: + rcv=ord(data[6]) + rem=ord(data[5]) + avg=int((100*rcv)/(200-rem)) + self.event.on_pong_received(rcv,rem,avg) + #mouth stop + if ord(data[4])==DATAS_STATUS_MOUTH_POSITION_COUNTER: + if ord(data[5])==0: + if self.event.on_mouth_stop!=None: + self.event.on_mouth_stop() + #eyes stop + if ord(data[4])==DATAS_STATUS_EYES_POSITION_COUNTER: + if ord(data[5])==0: + if self.event.on_eyes_stop!=None: + self.event.on_eyes_stop() + + #wings stop + if ord(data[4])==DATAS_STATUS_WINGS_POSITION_COUNTER: + if ord(data[5])==0: + if self.event.on_wings_stop!=None: + self.event.on_wings_stop() + #spin stop + if ord(data[4])==DATAS_STATUS_SPIN_POSITION_COUNTER: + if ord(data[5])==0: + if self.event.on_spin_stop!=None: + self.event.on_spin_stop() + + + #-------------------------------------------------------------------------- + # Deprecated functions + #-------------------------------------------------------------------------- + def explicit_status(self): + """ + Deprecated : see 'tux.status.to_string' + """ + return self.status.to_string() + + def connect_to_daemon(self): + """ + Deprecated : see 'tux.daemon.connect' + """ + return self.daemon.connect() + + def disconnect_from_daemon(self): + """ + Deprecated : see 'tux.daemon.disconnect' + """ + self.daemon.disconnect() + + +#============================================================================== +# TUXTCPCommunicator - sys - class +#============================================================================== +class TUXsys(object): + """ + Class which manages the system functions + + Functions list for the users: + tux.sys.add_time_event + tux.sys.clear_time_events + tux.sys.delayed_function + tux.sys.looped_function + tux.sys.shell + tux.sys.shell_free + 
tux.sys.time + tux.sys.wait + """ + + #-------------------------------------------------------------------------- + # Constructor of class + #-------------------------------------------------------------------------- + def __init__(self,parent): + """ + Constructor of class + """ + self.parent=parent + self.events_list=[[0,'NONE',9999,99,99,99,99,99]] + + #-------------------------------------------------------------------------- + # Get the current time in seconds + #-------------------------------------------------------------------------- + def time(self): + """ + Get the current time in seconds + + Return an integer + + Example: + >>> var=tux.sys.time() + """ + return (time.localtime()[3]*3600)+(time.localtime()[4]*60)+time.localtime()[5] + + #-------------------------------------------------------------------------- + # Wait a time in seconds + #-------------------------------------------------------------------------- + def wait(self,seconds): + """ + Wait a time in seconds + + Parameters: + "seconds" as float : Time to wait in seconds + + Example: + >>> tux.sys.wait(2.4) + """ + time.sleep(seconds) + + #-------------------------------------------------------------------------- + # Execute a shell command + #-------------------------------------------------------------------------- + def shell(self,command): + """ + Execute a shell command + + Parameters: + "command" as string : Shell command + + Example: + >>> tux.sys.shell('ls -al') + """ + os.system(command) + + #-------------------------------------------------------------------------- + # Execute a shell command in free mode + #-------------------------------------------------------------------------- + def shell_free(self,command): + """ + Execute a shell command in free mode + + Parameters: + "command" as string : Shell command + + Example: + >>> tux.sys.shell_free('ls -al') + """ + thread.start_new_thread(self.shell,(command,)) + #t=threading.Thread(target=self.shell,args=(command,)) + 
#t.setName('sys.shell') + #t.start() + #self.parent.daemon.free_thread_list.append(t) + + #-------------------------------------------------------------------------- + # Add a time event in the time event handler + #-------------------------------------------------------------------------- + def add_time_event(self,cmd_type,cmd,year,month,day,hour,minute,second): + """ + Add a time event in the time event handler + + Parameters: + "cmd_type" as number : Command type (CT_SHELL|CT_FUNCTION) + "cmd" as string : Command to execute + "year" as integer : (ex : 2006) (9999 : parameter ignored) + "month" as integer : (ex : 12) (99 : parameter ignored) + "day" as integer : (ex : 23) (99 : parameter ignored) + "hour" as integer : (ex : 08) (99 : parameter ignored) + "minute" as integer : (ex : 55) (99 : parameter ignored) + "second" as integer : (ex : 30) (99 : parameter ignored) + + Example: + >>> tux.sys.add_time_event(CT_SHELL,'xmms',9999,99,99,8,5,0) + """ + self.events_list.append([cmd_type,cmd,year,month,day,hour,minute,second]) + + #-------------------------------------------------------------------------- + # Clear the time events of the time event handler + #-------------------------------------------------------------------------- + def clear_time_events(self): + """ + Clear the time events of the time event handle + + Example: + >>> tux.sys.clear_time_events() + """ + self.events_list=[[0,'NONE',9999,99,99,99,99,99]] + + #-------------------------------------------------------------------------- + # Not a user function + #-------------------------------------------------------------------------- + def _loop_time_events(self): + """ + Not a user function + """ + def event_due(event): + now = time.localtime() + return event[2] in [now[0], 9999]\ + and event[3] in [now[1], 99]\ + and event[4] in [now[2], 99]\ + and event[5] in [now[3], 99]\ + and event[6] in [now[4], 99]\ + and event[7] in [now[5], 99] + + while self.parent.daemon.connected: + for event in 
self.events_list: + if event[0]!=0 and event_due(event): + if event[0]==CT_SHELL: + self.shell_free(event[1]) + elif event[0]==CT_FUNCTION: + event[1]() + time.sleep(1) + + #-------------------------------------------------------------------------- + # To execute a function with a delay + #-------------------------------------------------------------------------- + def delayed_function(self,function,delay): + """ + To execute a function with a delay + + Parameters: + "function" as pointer of function : function to execute + "delay" as float : time to wait before executing + the function. In seconds + + Example: + >>> def test(): + ... print "hello world" + ... + >>> tux.sys.delayed_function(test,10) + """ + def _funct(function,delay): + self.wait(delay) + function() + + t=threading.Thread(target=_funct,args=(function,delay,)) + t.setName('sys.delayed_function') + t.start() + self.parent.daemon.free_thread_list.append(t) + + #-------------------------------------------------------------------------- + # Looping on a function with a delay + #-------------------------------------------------------------------------- + def looped_function(self,function,delay): + """ + Looping on a function with a delay + + Parameters: + "function" as pointer of function: function to execute + "delay" as float : time to wait between 2 executions + of the function. In seconds + + Example: + >>> def test(): + ... print "hello world" + ... return True + ... 
+ >>> tux.sys.looped_function(test,10) + + Comment: + While the return of the function is true, the loop remains + active + """ + def _funct(function,delay): + while not self.parent.exit_flag: + self.wait(delay) + if not function(): + return + t=threading.Thread(target=_funct,args=(function,delay,)) + t.setName('sys.temporized_function') + t.start() + self.parent.main_thread_list.append(t) + +#============================================================================== +# TUXTCPCommunicator - event - class +#============================================================================== +class TUXevent(object): + """ + Class which manages the events + + Global variables of this class: + "on_bt_pushed" as pof : On tux button pushed + "on_head_bt_pushed" as pof : On tux head button pushed + "on_lwing_bt_pushed" as pof : On tux left wing button pushed + "on_rwing_bt_pushed" as pof : On tux right wing button pushed + "on_bt_released" as pof : On tux button released + "on_head_bt_released" as pof : On tux head button released + "on_lwing_bt_released" as pof : On tux left wing button released + "on_rwing_bt_released" as pof : On tux right wing button released + "on_remote_bt" as tuple of pof : On remote controller button pressed + param 1 : Key as integer + "on_status" as pof : On status arrival + "on_remote" as pof : On remote controller event + "on_light_level" as pof : On light level event + param 1 : light value as integer + "on_connected" as pof : On api connected to tuxdaemon + "on_disconnected" as pof : On api disconnect from tuxdaemon + "on_mouth_open" as pof : On mouth open event + "on_mouth_close" as pof : On mouth close event + "on_power_plugged" as pof : On power plugged event + "on_power_unplugged" as pof : On power unplugged event + "on_left_blue_led_on" as pof : On left blue led changed to on + "on_left_blue_led_off" as pof : On left blue led changed to off + "on_right_blue_led_on" as pof : On right blue led changed to on + "on_right_blue_led_off" as pof 
: On right blue led changed to off + "on_eyes_open" as pof : On eyes open event + "on_eyes_close" as pof : On eyes close event + "on_rf_connected" as pof : On RF is connected + "on_rf_disconnected" as pof : On RF is disconnected + "on_pong_received" as pof : On pong status received + "on_mouth_stop" as pof : On mouth stop event + "on_eyes_stop" as pof : On eyes stop event + "on_wings_stop" as pof : On wings stop event + "on_spin_stop" as pof : On spin stop event + (pof = pointer of function) + + Example of associating a function to an event: + >>> def my_function(): + >>> print "hello" + >>> tux.event.on_bt_pushed=my_function + + Example of associating a function to a remote event: + >>> def my_function(key): + >>> print "Button %s is pressed"%remote_bt_name[key] + >>> tux.event.on_remote_bt=my_function + + Key constants of the remote controller: + (K_0,K_1,K_2,K_3,K_4,K_5,K_6,K_7,K_8,K_9,K_STANDBY, + K_MUTE,K_VOLUMEPLUS,K_VOLUMEMINUS,K_ESCAPE,K_YES, + K_NO,K_BACKSPACE,K_STARTVOIP,K_RECEIVECALL,K_HANGUP, + K_STAR,K_SHARP,K_RED,K_GREEN,K_BLUE,K_YELLOW, + K_CHANNELPLUS,K_CHANNELMINUS,K_UP,K_DOWN,K_LEFT, + K_RIGHT,K_OK,K_FASTREWIND,K_FASTFORWARD,K_PLAYPAUSE, + K_STOP,K_RECORDING,K_PREVIOUS,K_NEXT,K_MENU,K_MOUSE, + K_ALT) + + Functions list: + tux.event.clear + tux.event.remote_key_to_string + tux.event.restore + tux.event.store + tux.event.wait_bt_pushed + tux.event.wait_head_bt_pushed + tux.event.wait_head_bt_released + tux.event.wait_lwing_bt_pushed + tux.event.wait_lwing_bt_released + tux.event.wait_remote_bt + tux.event.wait_rwing_bt_pushed + tux.event.wait_rwing_bt_released + tux.event.wait_stable_status + tux.event.wait_status + """ + #-------------------------------------------------------------------------- + # Constructor of class + #-------------------------------------------------------------------------- + def __init__(self,parent): + """ + Constructor of class + """ + self.parent=parent + self.clear() + + 
#-------------------------------------------------------------------------- + # Get the string name of a remote key value + #-------------------------------------------------------------------------- + def remote_key_to_string(self,key): + """ + Get the string name of a remote key value + + Parameters: + "key" as integer : key to translate to string + + Return a string + + Example: + >>> print tux.event.remote_key_to_string(10) + """ + return remote_bt_name[key] + + #-------------------------------------------------------------------------- + # Clear all events + #-------------------------------------------------------------------------- + def clear(self): + """ + Clear all events + + Example: + >>> tux.event.clear() + """ + self.on_bt_pushed=None + self.on_head_bt_pushed=None + self.on_lwing_bt_pushed=None + self.on_rwing_bt_pushed=None + self.on_bt_released=None + self.on_head_bt_released=None + self.on_lwing_bt_released=None + self.on_rwing_bt_released=None + self.on_remote_bt=[None]*64 + self.on_status=None + self.on_remote=None + self.on_light_level=None + self.on_connected=None + self.on_disconnected=None + self.on_mouth_open=None + self.on_mouth_close=None + self.on_power_plugged=None + self.on_power_unplugged=None + self.on_left_blue_led_on=None + self.on_left_blue_led_off=None + self.on_right_blue_led_on=None + self.on_right_blue_led_off=None + self.on_eyes_open=None + self.on_eyes_close=None + self.on_rf_connected=None + self.on_rf_disconnected=None + self.on_pong_received=None + self.on_mouth_stop=None + self.on_eyes_stop=None + self.on_wings_stop=None + self.on_spin_stop=None + + #-------------------------------------------------------------------------- + # Store all events + #-------------------------------------------------------------------------- + def store(self): + """ + Store all events + + Example: + >>> tux.event.store() + """ + self.s_on_bt_pushed=self.on_bt_pushed + self.s_on_head_bt_pushed=self.on_head_bt_pushed + 
self.s_on_lwing_bt_pushed=self.on_lwing_bt_pushed + self.s_on_rwing_bt_pushed=self.on_rwing_bt_pushed + self.s_on_bt_released=self.on_bt_released + self.s_on_head_bt_released=self.on_head_bt_released + self.s_on_lwing_bt_released=self.on_lwing_bt_released + self.s_on_rwing_bt_released=self.on_rwing_bt_released + self.s_on_remote_bt=self.on_remote_bt + self.s_on_status=self.on_status + self.s_on_remote=self.on_remote + self.s_on_light_level=self.on_light_level + self.s_on_connected=self.on_connected + self.s_on_disconnected=self.on_disconnected + self.s_on_mouth_open=self.on_mouth_open + self.s_on_mouth_close=self.on_mouth_close + self.s_on_power_plugged=self.on_power_plugged + self.s_on_power_unplugged=self.on_power_unplugged + self.s_on_left_blue_led_on=self.on_left_blue_led_on + self.s_on_left_blue_led_off=self.on_left_blue_led_off + self.s_on_right_blue_led_on=self.on_right_blue_led_on + self.s_on_right_blue_led_off=self.on_right_blue_led_off + self.s_on_eyes_open=self.on_eyes_open + self.s_on_eyes_close=self.on_eyes_close + self.s_on_rf_connected=self.on_rf_connected + self.s_on_rf_disconnected=self.on_rf_disconnected + self.s_on_pong_received=self.on_pong_received + self.s_on_mouth_stop=on_mouth_stop + self.s_on_eyes_stop=on_eyes_stop + self.s_on_wings_stop=on_wings_stop + self.s_on_spin_stop=on_spin_stop + + #-------------------------------------------------------------------------- + # Restore all events + #-------------------------------------------------------------------------- + def restore(self): + """ + Restore all events + + Example: + >>> tux.event.restore() + """ + self.on_bt_pushed=self.s_on_bt_pushed + self.on_head_bt_pushed=self.s_on_head_bt_pushed + self.on_lwing_bt_pushed=self.s_on_lwing_bt_pushed + self.on_rwing_bt_pushed=self.s_on_rwing_bt_pushed + self.on_bt_released=self.s_on_bt_released + self.on_head_bt_released=self.s_on_head_bt_released + self.on_lwing_bt_released=self.s_on_lwing_bt_released + 
self.on_rwing_bt_released=self.s_on_rwing_bt_released + self.on_remote_bt=self.s_on_remote_bt + self.on_status=self.s_on_status + self.on_remote=self.s_on_remote + self.on_light_level=self.s_on_light_level + self.on_connected=self.s_on_connected + self.on_disconnected=self.s_on_disconnected + self.on_mouth_open=self.s_on_mouth_open + self.on_mouth_close=self.s_on_mouth_close + self.on_power_plugged=self.s_on_power_plugged + self.on_power_unplugged=self.s_on_power_unplugged + self.on_left_blue_led_on=self.s_on_left_blue_led_on + self.on_left_blue_led_off=self.s_on_left_blue_led_off + self.on_right_blue_led_on=self.s_on_right_blue_led_on + self.on_right_blue_led_off=self.s_on_right_blue_led_off + self.on_eyes_open=self.s_on_eyes_open + self.on_eyes_close=self.s_on_eyes_close + self.on_rf_connected=self.s_on_rf_connected + self.on_rf_disconnected=self.s_on_rf_disconnected + self.on_pong_received=self.s_on_pong_received + self.on_mouth_stop=s_on_mouth_stop + self.on_eyes_stop=s_on_eyes_stop + self.on_wings_stop=s_on_wings_stop + self.on_spin_stop=s_on_spin_stop + + #-------------------------------------------------------------------------- + # Wait until the specified status arrives + #-------------------------------------------------------------------------- + def wait_status(self,DATA_STATUS,DATA_VALUE,time_out): + """ + Wait until the specified status arrives + + Parameters: + "DATA_STATUS" as integer : Desired status + "DATA_VALUE" as integer : Desired value + "time_out" as integer : Time-out in seconds + (9999 = infinite wait) + + Return a boolean + + Example: + >>> var=tux.event.wait_status(DATAS_STATUS_HEAD_PUSH_SWITCH,1,2) + (see 'tuxapi_const.py' for the complete list of statuses) + """ + if not self.parent.daemon.connected: + return False + time_beginin=self.parent.sys.time() + while True: + if self.parent.daemon.connected: + self.parent.tcp_mutex.acquire() + data=self.parent.tcp_data + if self.parent.current_data_id==self.parent.last_threated_data_id: + 
self.parent.tcp_mutex.release() + time.sleep(0.001) + if (self.parent.sys.time()-time_beginin)>=time_out and time_out!=9999: + return False + continue + self.parent.last_threated_data_id=self.parent.current_data_id + self.parent.tcp_mutex.release() + if ord(data[0])==SOURCE_TUX: + if ord(data[1])==SS_DEFAULT: + if ord(data[2])==DATA_TP_RSP: + if ord(data[3])==SUBDATA_TP_STATUS: + if ord(data[4])==DATA_STATUS: + if ord(data[5])==DATA_VALUE: + return True + if (self.parent.sys.time()-time_beginin)>=time_out and time_out!=9999: + return False + time.sleep(0.001) + + #-------------------------------------------------------------------------- + # Wait for stable status + #-------------------------------------------------------------------------- + def wait_stable_status(self,DATA_STATUS,DATA_VALUE,stable_time,time_out): + """ + Wait for stable status + + Parameters: + "DATA_STATUS" as integer : Desired status + "DATA_VALUE" as integer : Desired value + "time_out" as integer : Time-out in seconds + (9999 = infinite wait) + "stable_out" as integer : Time of stable status request in seconds + + Return a boolean + + Example: + >>> var=tux.event.wait_status(DATAS_STATUS_HEAD_PUSH_SWITCH,1,5,2) + """ + if self.wait_status(DATA_STATUS,DATA_VALUE,time_out)==False: + return False + else: + if DATA_VALUE==1: + INV_DATA_VALUE=0 + else: + INV_DATA_VALUE=1 + return not self.wait_status(DATA_STATUS,INV_DATA_VALUE,stable_time) + + #-------------------------------------------------------------------------- + # Wait until a tux button is pushed + #-------------------------------------------------------------------------- + def wait_bt_pushed(self,time_out): + """ + Wait until a tux button is pushed + + Parameters: + "time_out" as integer : Time-out in seconds + (9999 = infinite wait) + + Return the button value as integer + (HEAD_BT|LEFT_WING_BT|RIGHT_WING_BT|NONE_BT) + + Example: + >>> tux.event.wait_bt_pushed(10) + """ + if not self.parent.daemon.connected: + return 0 + 
time_beginin=self.parent.sys.time() + while True: + if self.parent.daemon.connected: + self.parent.tcp_mutex.acquire() + data=self.parent.tcp_data + if self.parent.current_data_id==self.parent.last_threated_data_id: + self.parent.tcp_mutex.release() + time.sleep(0.001) + if (self.parent.sys.time()-time_beginin)>=time_out and time_out!=9999: + return 0 + continue + self.parent.last_threated_data_id=self.parent.current_data_id + self.parent.tcp_mutex.release() + if ord(data[0])==SOURCE_TUX: + if ord(data[1])==SS_DEFAULT: + if ord(data[2])==DATA_TP_RSP: + if ord(data[3])==SUBDATA_TP_STATUS: + if ord(data[4])==DATAS_STATUS_HEAD_PUSH_SWITCH: + if ord(data[5])==1: + return HEAD_BT + elif ord(data[4])==DATAS_STATUS_LEFT_WING_PUSH: + if ord(data[5])==1: + return LEFT_WING_BT + elif ord(data[4])==DATAS_STATUS_RIGHT_WING_PUSH: + if ord(data[5])==1: + return RIGHT_WING_BT + if (self.parent.sys.time()-time_beginin)>=time_out and time_out!=9999: + return 0 + time.sleep(0.001) + + #-------------------------------------------------------------------------- + # Wait until head button is pushed + #-------------------------------------------------------------------------- + def wait_head_bt_pushed(self,time_out): + """ + Wait until head button is pushed + + Parameters: + "time_out" as integer : Time-out in seconds + (9999 = infinite wait) + + Return a boolean + + Example: + >>> tux.event.wait_head_bt_pushed(2) + """ + return self.wait_status(DATAS_STATUS_HEAD_PUSH_SWITCH,1,time_out) + + #-------------------------------------------------------------------------- + # Wait until head button is released + #-------------------------------------------------------------------------- + def wait_head_bt_released(self): + """ + Wait until head button is released + + Parameters: + "time_out" as integer : Time-out in seconds + (9999 = infinite wait) + + Return a boolean + + Example: + >>> tux.event.wait_head_bt_released(2) + """ + return self.wait_status(DATAS_STATUS_HEAD_PUSH_SWITCH,0,9999) + 
+ #-------------------------------------------------------------------------- + # Wait until left wing is pushed + #-------------------------------------------------------------------------- + def wait_lwing_bt_pushed(self,time_out): + """ + Wait until left wing is pushed + + Parameters: + "time_out" as integer : Time-out in seconds + (9999 = infinite wait) + + Return a boolean + + Example: + >>> tux.event.wait_lwing_bt_pushed(2) + """ + return self.wait_status(DATAS_STATUS_LEFT_WING_PUSH,1,time_out) + + #-------------------------------------------------------------------------- + # Wait until left wing is released + #-------------------------------------------------------------------------- + def wait_lwing_bt_released(self): + """ + Wait until left wing is released + + Parameters: + "time_out" as integer : Time-out in seconds + (9999 = infinite wait) + + Return a boolean + + Example: + >>> tux.event.wait_lwing_bt_released(2) + """ + return self.wait_status(DATAS_STATUS_LEFT_WING_PUSH,0,9999) + + #-------------------------------------------------------------------------- + # Wait until right wing is pushed + #-------------------------------------------------------------------------- + def wait_rwing_bt_pushed(self,time_out): + """ + Wait until right wing is pushed + + Parameters: + "time_out" as integer : Time-out in seconds + (9999 = infinite wait) + + Return a boolean + + Example: + >>> tux.event.wait_rwing_bt_pushed(2) + """ + return self.wait_status(DATAS_STATUS_RIGHT_WING_PUSH,1,time_out) + + #-------------------------------------------------------------------------- + # Wait until right wing is released + #-------------------------------------------------------------------------- + def wait_rwing_bt_released(self): + """ + Wait until right wing is released + + Parameters: + "time_out" as integer : Time-out in seconds + (9999 = infinite wait) + + Return a boolean + + Example: + >>> tux.event.wait_rwing_bt_released(2) + """ + return 
self.wait_status(DATAS_STATUS_RIGHT_WING_PUSH,0,9999) + + #-------------------------------------------------------------------------- + # Wait until a specified key of the remote is pressed + #-------------------------------------------------------------------------- + def wait_remote_bt(self,remote_key,timeout): + """ + Wait until a specified key of the remote is pressed + + Parameters: + "time_out" as integer : Time-out in seconds + (9999 = infinite wait) + + Return a boolean + + Example: + >>> tux.event.wait_remote_bt(K_OK,2) + """ + return self.wait_status(DATAS_STATUS_IR_CODE,remote_key,timeout) + + + #-------------------------------------------------------------------------- + # Deprecated functions + #-------------------------------------------------------------------------- + def set_on_bt_pushed(self,function): + """ + Deprecated + """ + self.on_bt_pushed=function + def set_on_head_bt_pushed(self,function): + """ + Deprecated + """ + self.on_head_bt_pushed=function + def set_on_lwing_bt_pushed(self,function): + """ + Deprecated + """ + self.on_lwing_bt_pushed=function + def set_on_rwing_bt_pushed(self,function): + """ + Deprecated + """ + self.on_rwing_bt_pushed=function + def set_on_bt_released(self,function): + """ + Deprecated + """ + self.on_bt_released=function + def set_on_head_bt_released(self,function): + """ + Deprecated + """ + self.on_head_bt_released=function + def set_on_lwing_bt_released(self,function): + """ + Deprecated + """ + self.on_lwing_bt_released=function + def set_on_rwing_bt_released(self,function): + """ + Deprecated + """ + self.on_rwing_bt_released=function + def set_on_remote_bt(self,key,function): + """ + Deprecated + """ + self.on_remote_bt[key]=function + +#============================================================================== +# TUXTCPCommunicator - cmd - class +#============================================================================== +class TUXcmd(object): + """ + Class which manages the tux commands + + 
Global variables of this class: + "last_ack" as integer : ACK value of the last command + (ACK_CMD_DONGLE_NOT_PRESENT|ACK_CMD_TIMEOUT| + ACK_CMD_OK|ACK_CMD_KO|ACK_CMD_ERROR) + "no_ack" as boolean : Allow to wait a ACK from tuxdaemon + + Functions list for the users: + tux.cmd.audio_channel_general + tux.cmd.audio_channel_tts + tux.cmd.eyes_close + tux.cmd.eyes_off + tux.cmd.eyes_on + tux.cmd.eyes_on_free + tux.cmd.eyes_open + tux.cmd.ir_off + tux.cmd.ir_on + tux.cmd.ir_send + tux.cmd.ledl_during + tux.cmd.ledl_during_free + tux.cmd.ledl_off + tux.cmd.ledl_on + tux.cmd.ledr_during + tux.cmd.ledr_during_free + tux.cmd.ledr_off + tux.cmd.ledr_on + tux.cmd.leds_blink + tux.cmd.leds_during + tux.cmd.leds_during_free + tux.cmd.leds_off + tux.cmd.leds_on + tux.cmd.mouth_close + tux.cmd.mouth_off + tux.cmd.mouth_on + tux.cmd.mouth_on_free + tux.cmd.mouth_open + tux.cmd.ping + tux.cmd.raw + tux.cmd.sleep_off + tux.cmd.sleep_on + tux.cmd.sound_play + tux.cmd.sound_store_index + tux.cmd.sound_storing + tux.cmd.sound_test + tux.cmd.spinl_off + tux.cmd.spinl_on + tux.cmd.spinl_on_free + tux.cmd.spinr_off + tux.cmd.spinr_on + tux.cmd.spinr_on_free + tux.cmd.structured + tux.cmd.wings_off + tux.cmd.wings_on + tux.cmd.wings_on_free + """ + + #-------------------------------------------------------------------------- + # Constructor of class + #-------------------------------------------------------------------------- + def __init__(self,parent): + """ + Constructor of class + """ + self.parent=parent + self.last_ack=ACK_CMD_OK + self.no_ack=False + + #-------------------------------------------------------------------------- + # SYSTEM + #-------------------------------------------------------------------------- + def cmd_ack(self): + """ + Not a user function + """ + time_beginin=self.parent.sys.time() + while True: + if self.parent.daemon.connected: + self.parent.tcp_mutex.acquire() + data=self.parent.tcp_data + if self.parent.current_data_id==self.parent.last_threated_data_id: 
+ self.parent.tcp_mutex.release() + time.sleep(0.001) + if (self.parent.sys.time()-time_beginin)>=2: + return ACK_CMD_TIMEOUT + continue + self.parent.last_threated_data_id=self.parent.current_data_id + self.parent.tcp_mutex.release() + if ord(data[0])==SOURCE_TUX: + if ord(data[1])==SS_DEFAULT: + if ord(data[2])==DATA_TP_ACK_CMD: + result=ord(data[4]) + return result + if (self.parent.sys.time()-time_beginin)>=2: + return ACK_CMD_TIMEOUT + time.sleep(0.001) + + #-------------------------------------------------------------------------- + # Send a structured command to tux + #-------------------------------------------------------------------------- + def structured(self,fct,cmd,param1,param2,param3): + """ + Send a structured command to tux + + Parameters: + "fct" as integer : function of tux + "cmd" as integer : command for this function + "param1" as integer : parameter 1 for this command + "param2" as integer : parameter 2 for this command + "param3" as integer : parameter 3 for this command + + Return a ACK as integer + + Example: + >>> tux.cmd.structured(TUX_CMD_STRUCT_EYES,TUX_CMD_STRUCT_SUB_ON + ,count,0,0) + """ + if not self.parent.daemon.connected: + return 2 + data=(DEST_TUX,SD_DEFAULT,DATA_TP_CMD,SUBDATA_TP_STRUCT\ + ,fct,cmd,param1,param2,param3,0,0,0,0,0,0,0) + self.parent.sock.send("".join( [chr(x) for x in data] )) + if self.no_ack: + return 2 + else: + return self.cmd_ack() + + #-------------------------------------------------------------------------- + # Send a raw command to tux + #-------------------------------------------------------------------------- + def raw(self,cmd,param1,param2,param3): + """ + Send a raw command to tux + + Parameters: + "cmd" as integer : command + "param1" as integer : parameter 1 of these command + "param2" as integer : parameter 2 of these command + "param3" as integer : parameter 3 of these command + + Return a ACK if it's required + + Example: + >>> tux.cmd.raw(BLINK_EYES_CMD,4,0,0) + """ + if not 
self.parent.daemon.connected: + return 2 + data=(DEST_TUX,SD_DEFAULT,DATA_TP_CMD,SUBDATA_TP_RAW,\ + cmd,param1,param2,param3,0,0,0,0,0,0,0,0) + self.parent.sock.send("".join( [chr(x) for x in data] )) + if self.no_ack: + return 2 + else: + return self.cmd_ack() + + #-------------------------------------------------------------------------- + # Send a command to tux for moving the eyes + #-------------------------------------------------------------------------- + def eyes_on(self,count=1): + """ + Send a command to tux for moving the eyes + + Parameters: + "count" as integer : Number of movements + (default = 1) + + Example: + >>> tux.cmd.eyes_on() + >>> tux.cmd.eyes_on(2) + """ + self.last_ack=self.structured(TUX_CMD_STRUCT_EYES,TUX_CMD_STRUCT_SUB_ON,count,0,0) + self.parent.event.wait_status(DATAS_STATUS_EYES_POSITION_COUNTER,0,(0.3*count)) + + #-------------------------------------------------------------------------- + # Send a command to tux for opening the eyes + #-------------------------------------------------------------------------- + def eyes_open(self): + """ + Send a command to tux for opening the eyes + + Example: + >>> tux.cmd.eyes_open() + """ + if self.parent.status.get_eyes_closed_position_switch()==0: + self.eyes_on() + + #-------------------------------------------------------------------------- + # Send a command to tux for closing the eyes + #-------------------------------------------------------------------------- + def eyes_close(self): + """ + Send a command to tux for closing the eyes + + Example: + >>> tux.cmd.eyes_close() + """ + if self.parent.status.get_eyes_closed_position_switch()==1: + self.eyes_on() + + #-------------------------------------------------------------------------- + # Send a command to tux for moving the eyes in free mode + #-------------------------------------------------------------------------- + def eyes_on_free(self,count=1): + """ + Send a command to tux for moving the eyes in free mode + + Parameters: + 
"count" as integer : number of movements + (default = 1) + + Example: + >>> tux.cmd.eyes_on_free() + >>> tux.cmd.eyes_on_free(2) + """ + t=threading.Thread(target=self.eyes_on,args=(count,)) + t.setName('cmd.eyes_on') + t.start() + self.parent.daemon.free_thread_list.append(t) + + #-------------------------------------------------------------------------- + # Send a command to tux for stopping the eyes movement + #-------------------------------------------------------------------------- + def eyes_off(self): + """ + Send a command to tux for stopping the eyes movement + + Example: + >>> tux.cmd.eyes_off() + """ + self.last_ack=self.structured(TUX_CMD_STRUCT_EYES,TUX_CMD_STRUCT_SUB_OFF,0,0,0) + + #-------------------------------------------------------------------------- + # Send a command to tux for moving the mouth + #-------------------------------------------------------------------------- + def mouth_on(self,count=1): + """ + Send a command to tux for moving the mouth + + Parameters: + "count" as integer : number of movements + (default = 1) + + Example: + >>> tux.cmd.mouth_on() + >>> tux.cmd.mouth_on(2) + """ + self.last_ack=self.structured(TUX_CMD_STRUCT_MOUTH,TUX_CMD_STRUCT_SUB_ON,count,0,0) + self.parent.event.wait_status(DATAS_STATUS_MOUTH_POSITION_COUNTER,0,(0.3*count)) + + #-------------------------------------------------------------------------- + # Send a command to tux for moving the mouth in free mode + #-------------------------------------------------------------------------- + def mouth_on_free(self,count=1): + """ + Send a command to tux for moving the mouth in free mode + + Parameters: + "count" as integer : number of movements + (default = 1) + + Example: + >>> tux.cmd.mouth_on_free() + >>> tux.cmd.mouth_on_free(2) + """ + t=threading.Thread(target=self.mouth_on,args=(count,)) + t.setName('cmd.mouth_on') + t.start() + self.parent.daemon.free_thread_list.append(t) + + #-------------------------------------------------------------------------- 
+ # Send a command to tux for opening the mouth + #-------------------------------------------------------------------------- + def mouth_open(self): + """ + Send a command to tux for opening the mouth + + Example: + >>> tux.cmd.mouth_open() + """ + if self.parent.status.get_mouth_open_position()==1: + self.mouth_on() + + #-------------------------------------------------------------------------- + # Send a command to tux for closing the mouth + #-------------------------------------------------------------------------- + def mouth_close(self): + """ + Send a command to tux for closing the mouth + + Example: + >>> tux.cmd.mouth_close() + """ + if self.parent.status.get_mouth_open_position()==0: + self.mouth_on() + + #-------------------------------------------------------------------------- + # Send a command to tux for stopping the mouth movement + #-------------------------------------------------------------------------- + def mouth_off(self): + """ + Send a command to tux for stopping the mouth movement + + Example: + >>> tux.cmd.mouth_off() + """ + self.last_ack=self.structured(TUX_CMD_STRUCT_MOUTH,TUX_CMD_STRUCT_SUB_OFF,0,0,0) + + #-------------------------------------------------------------------------- + # Send a command to tux for moving the wings + #-------------------------------------------------------------------------- + def wings_on(self,count=1,speed=5): + """ + Send a command to tux for moving the wings + + Parameters: + "count" as integer : number of movements + (default = 1) + "speed" as integer : speed of the movement(1-5) + (default = 5) + + Example: + >>> tux.cmd.wings_on() + >>> tux.cmd.wings_on(2) (2 movements) + >>> tux.cmd.wings_on(2,5) (2 movements and speed=5) + """ + self.last_ack=self.structured(TUX_CMD_STRUCT_WINGS,\ + TUX_CMD_STRUCT_SUB_ON,count,speed,0) + self.parent.event.wait_status(DATAS_STATUS_WINGS_POSITION_COUNTER\ + ,0,(0.6*count)) + + #-------------------------------------------------------------------------- + # Send a 
command to tux for moving the wings in free mode + #-------------------------------------------------------------------------- + def wings_on_free(self,count=1,speed=5): + """ + Send a command to tux for moving the wings in free mode + + Parameters: + "count" as integer : number of movem... [truncated message content] |