[tuxdroid-svn] r644 - api/python/trunk
Status: Beta
Brought to you by:
ks156
From: remi <c2m...@c2...> - 2007-10-29 10:22:24
|
Author: remi Date: 2007-10-29 11:22:14 +0100 (Mon, 29 Oct 2007) New Revision: 644 Modified: api/python/trunk/tuxapi_class.py Log: ADD: Capture of the micro stream. Modified: api/python/trunk/tuxapi_class.py =================================================================== --- api/python/trunk/tuxapi_class.py 2007-10-26 10:05:00 UTC (rev 643) +++ api/python/trunk/tuxapi_class.py 2007-10-29 10:22:14 UTC (rev 644) @@ -52,41 +52,41 @@ self.__fifo_list = [] self.__list_mutex = threading.Lock() self.__threads_list = [] - + def destroy(self): for my_thread in self.__threads_list: if my_thread.isAlive(): my_thread.join() - + def connect(self, funct_ptr): self.__list_mutex.acquire() idx = len(self.__funct_ptr_list) self.__funct_ptr_list.append(funct_ptr) self.__list_mutex.release() return idx - + def push(self): self.__list_mutex.acquire() self.__fifo_list.append(self.__funct_ptr_list) self.__list_mutex.release() self.clear() - + def pop(self): if len(self.__fifo_list) > 0: self.__list_mutex.acquire() self.__funct_ptr_list = self.__fifo_list.pop() self.__list_mutex.release() - + def clear(self): self.__list_mutex.acquire() self.__funct_ptr_list = [] self.__list_mutex.release() - + def disconnect(self, idx): self.__list_mutex.acquire() self.__funct_ptr_list[idx] = None self.__list_mutex.release() - + def __run(self, funct_ptr, idx, fargs): try: funct_ptr(*fargs) @@ -94,7 +94,7 @@ self.__list_mutex.acquire() self.__funct_ptr_list[idx] == None self.__list_mutex.release() - + def notify(self, *fargs): self.__list_mutex.acquire() if len(self.__funct_ptr_list) == 0: @@ -186,9 +186,9 @@ self.monitoring = TUXmonitoring(self) self.wav = TUXwav(self) self.micro = TUXmicro(self) - + self.head_switch = TuxSwitch(self, STATUS_HEAD_PUSH_SWITCH) - + self.connected=False #deprecated t=threading.Thread(target=self.daemon._loop_auto_connect) t.setName('daemon._loop_auto_connect') @@ -541,7 +541,7 @@ self.event.on_status(data) #on monitoring self.monitoring.check_events(data) - + self.head_switch._notify(ord(data[4]), ord(data[5])) @@ -948,14 +948,14 @@ self.__time_event_mutex.acquire() self.events_list=[[0,'NONE',9999,99,99,99,99,99]] self.__time_event_mutex.release() - + #-------------------------------------------------------------------------- # Delete a time event from the time event handler #-------------------------------------------------------------------------- def delete_time_event(self, cmd_type,cmd,year,month,day,hour,minute,second): """ Delete a time event from the time event handler - + Parameters: "cmd_type" as number : Command type (CT_SHELL|CT_FUNCTION) "cmd" as string : Command to execute @@ -1070,14 +1070,14 @@ t.setName('sys.temporized_function') t.start() self.parent.main_thread_list.append(t) - + #============================================================================== # TUXTCPCommunicator - monitoring - class #============================================================================== class TUXmonitoring(object): """ Class which manages the monitoring - + Status constants list: (STATUS_WINGS_MOTOR_BACKWARD, STATUS_SPIN_MOTOR_BACKWARD, STATUS_SPIN_MOTOR_FORWARD, STATUS_MOUTH_OPEN_POSITION, @@ -1099,7 +1099,7 @@ STATUS_SOUND_COUNT, STATUS_PONG, STATUS_BATTERY, STATUS_MICRO_ENERGY, ) - + Functions list: tux.monitoring.insert tux.monitoring.remove @@ -1109,53 +1109,53 @@ self.parent = parent self._event_mutex = threading.Lock() self._event_list = [] - + def insert(self, status, function): """ Insert a monitoring event. - + Parameters: "status" as integer : Status index (See status contants list) "function" as pointer of function : Function to bind - + Returns: The index of your event in the monitoring manager. You must save this index if you want removing your event. - + Exemple: - >>> monitor_idx = tux.monitoring.insert(STATUS_LIGHT_LEVEL, my_function) + >>> monitor_idx = tux.monitoring.insert(STATUS_LIGHT_LEVEL, my_function) """ self._event_mutex.acquire() self._event_list.append([status, function]) self._event_mutex.release() return len(self._event_list) - 1 - + def remove(self, event_id): """ Remove a monitoring event. - + Parameters: "event_id" as integer : Index of the event. - + Exemple: >>> tux.monitoring.remove(monitor_idx) """ self._event_mutex.acquire() self._event_list[event_id][1] = None self._event_mutex.release() - + def check_events(self, frame): self._event_mutex.acquire() events = self._event_list self._event_mutex.release() - + def __load_funct_async(idx, funct, args): try: funct(args) except: self.remove(idx) - + for i, event in enumerate(events): if event[1] != None: if event[0] == ord(frame[4]): @@ -1165,8 +1165,8 @@ ord_frame.append(ord(val)) args = tuple(ord_frame) thread.start_new_thread(__load_funct_async, (i, event[1], args)) - + #============================================================================== # TUXTCPCommunicator - event - class #============================================================================== @@ -3365,12 +3365,12 @@ return 0 #-------------------------------------------------------------------------- - # Get the last state of sound status + # Get the last state of sound status #-------------------------------------------------------------------------- def flash_status(self): """ Get the last sound flash status - + Return a tupple with the audio flash status: (play state, record state, sound number) play state : Return the sound number or 0 if no sound is played @@ -4272,21 +4272,21 @@ self.voice_loaded = False self.speaking_stack = [] self.speaking_stack_mutex = threading.Lock() - + def destroy(self): self.on_wav_raw.destroy() - + def speaking_stack_add(self, speaking_conf): self.speaking_stack_mutex.acquire() self.speaking_stack.append(speaking_conf) self.speaking_stack_mutex.release() - + def speaking_stack_get_size(self): self.speaking_stack_mutex.acquire() val = len(self.speaking_stack) self.speaking_stack_mutex.release() return val - + def speaking_stack_loop(self): while self.connected: if self.speaking_stack_get_size() > 0: @@ -4320,7 +4320,7 @@ curr_speaking_conf[3].notify() curr_speaking_conf[3].release() self.speaking_stack_mutex.release() - + def speak(self, text): """ Speak a text with the acapela text to speech engine @@ -4346,7 +4346,7 @@ my_lock.acquire() my_lock.wait() my_lock.release() - + def speak_free(self, text): """ Speak a text with the acapela text to speech engine in free mode @@ -4372,7 +4372,7 @@ my_lock.acquire() my_lock.wait() my_lock.release() - + def _wav(self, wav_path, begin, end): if not os.path.isfile(wav_path): return @@ -4391,7 +4391,7 @@ my_lock.acquire() my_lock.wait() my_lock.release() - + def _wav_free(self, wav_path, begin, end): if not os.path.isfile(wav_path): return @@ -4519,17 +4519,17 @@ t.setName('tts._loop_tux_data_dispatching') t.start() self.con_thread_list.append(t) - + t=threading.Thread(target=self._loop_tcp_tts_msg) t.setName('tts._loop_tcp_tts_msg') t.start() self.con_thread_list.append(t) - + t=threading.Thread(target=self.speaking_stack_loop) t.setName('tts.speaking_stack_loop') t.start() self.con_thread_list.append(t) - + self.get_sound_state() return True @@ -4877,7 +4877,7 @@ self.parent.cmd.audio_channel_general() self.tts_mutex.release() return True - + #-------------------------------------------------------------------------- # Play a wav with tuxttsd #-------------------------------------------------------------------------- @@ -4936,7 +4936,7 @@ t.start() self.parent.daemon.free_thread_list.append(t) return True - + #-------------------------------------------------------------------------- # Play a wav with tuxttsd in free mode #-------------------------------------------------------------------------- @@ -5041,7 +5041,7 @@ >>> tux.tts.kill_daemon() """ self.send_command_to_tts(CMD_TYPE_DAEMON,CMD_DAEMON_KILL,0,0,0,0) - + #============================================================================== # TUXTCPCommunicator - wav - class #============================================================================== @@ -5065,7 +5065,7 @@ self.parent = parent self.__wav_path = "" self.__wav_length = 0 - + def __load(self, wav_path): if os.path.isfile(wav_path): try: @@ -5077,7 +5077,7 @@ return True else: return False - + def _set_begin_end(self, begin = 0., end = 0.): begin_100m = (int)(round(begin * 10)) end_100m = (int)(round(end * 10)) @@ -5086,23 +5086,23 @@ end_lower = end_100m & 0x000000FF end_upper = (end_100m & 0x0000FF00) >> 8 self.parent.tts.send_command_to_tts( - CMD_TYPE_WAV, + CMD_TYPE_WAV, CMD_WAV_BEGIN_END, begin_lower, begin_upper, end_lower, end_upper) - + def get_duration(self, wav_path): """ Get the duration of a wave file. - + Parameters: "wav_path" as string : Path of the wave file - + Returns: The time duration in seconds as float. - + Exemple: >>> print tux.wav.get_duration('/home/tux/test.wav') """ @@ -5111,78 +5111,78 @@ return self.__wav_length else: return 0. - + def play(self, wav_path, begin = 0., end = 0.): """ Play a wave file with tuxttsd daemon. - + Parameters: "wav_path" as string : Path of the wave file "begin" as float : Starting index in seconds(Optional) "end" as float : Ending index in seconds(Optional) - + Exemple: >>> tux.wav.play('/home/tux/test.wav') >>> tux.wav.play('/home/tux/test.wav', 0., 5.5) """ if self.__load(wav_path): self.parent.tts._wav(self.__wav_path, begin, end) - + def play_free(self, wav_path, begin = 0., end = 0.): """ Play a wave file with tuxttsd daemon in free mode. - + Parameters: "wav_path" as string : Path of the wave file "begin" as float : Starting index in seconds(Optional) "end" as float : Ending index in seconds(Optional) - + Exemple: >>> tux.wav.play_free('/home/tux/test.wav') >>> tux.wav.play_free('/home/tux/test.wav', 0., 5.5) """ if self.__load(wav_path): self.parent.tts._wav_free(self.__wav_path, begin, end) - + def stop(self): """ Stop the current played wave file. - + Exemple: >>> tux.wav.stop() """ self.parent.tts.stop() - + def pause(self): """ Pause the current played wave file. - + Exemple: >>> tux.wav.pause() """ self.parent.tts.pause() - + def _continue(self): """ Continue the playing of a paused wave file. - + Exemple: >>> tux.wav._continue() """ self.parent.tts.play() - + #============================================================================== # TUXTCPCommunicator - micro - class #============================================================================== class TUXmicro(object): """ Class which manages the microphone functions. - + Global variables of this class: "on_buffer" as EventControl : Event on new buffer from micro - + Example of associating a function to 'on_buffer' event: - + >>> def new_buffer(values): >>> ... print values >>> ... @@ -5192,23 +5192,99 @@ Functions list for the users: tux.micro.on tux.micro.off + tux.micro.capture_start + tux.micro.capture_stop """ - + def __init__(self,parent): self.parent = parent self.__state = False self.__process = None self.__process_mutex = threading.Lock() + self.__thread = None self.on_buffer = EventControl() - + self.on_capture_stop = EventControl() + self.on_capture_start = EventControl() + self.__capturing = False + self.__capture_path = "" + self.__capture_length = 0 + self.__capture_buffer = [] + self.__capture_idx = 0 + def destroy(self): self.off() self.on_buffer.destroy() - + if self.__thread != None: + if self.__thread.isAlive(): + self.__thread.join() + + def __on_capture_buffer(self, buffer): + if not self.__capturing: + return + for val in buffer: + self.__capture_buffer.append(val) + if self.__capture_length != 0: + if (self.__capture_length * 8000) <= len(self.__capture_buffer): + self.capture_stop() + return + + def capture_start(self, out_path, length = 0): + """ + Write the stream in a wav file. + + Parameters: + "out_path" as string : path of the wave file + "length" as float : duration of the capture in seconds + When this parameters is omitted, the length + is infinite. You must stop the recording + with 'tux.micro.capture_stop()' + + Examples: + >>> tux.micro.capture_start('/home/remi/Desktop/test.wav', 10.0) + >>> tux.micro.capture_start('/home/remi/Desktop/test.wav') + """ + if self.__capturing: + return False + if not self.__state: + return False + self.__capture_path = out_path + self.__capture_buffer = [] + self.__capture_length = length + self.__capturing = True + self.on_capture_start.notify() + self.__capture_idx = self.on_buffer.connect(self.__on_capture_buffer) + + def capture_stop(self): + """ + Stop the capture of the stream. + """ + if not self.__capturing: + return + self.__capturing = False + self.on_buffer.disconnect(self.__capture_idx) + time.sleep(0.2) + self.__write_capture() + self.on_capture_stop.notify() + + def __write_capture(self): + try: + import wave + except: + print "ERROR : import wave" + return + freqech = 8000 + sound_str = "" + for val in self.__capture_buffer: + sound_str += chr(val) + wavfile = wave.open(self.__capture_path, 'w') + wavfile.setparams((1, 1, freqech , len(self.__capture_buffer), 'NONE', 'not compressed')) + wavfile.writeframes(sound_str) + wavfile.close() + def on(self): """ Function to turn on the micro capture. - + Example: >>> tux.micro.on() """ @@ -5227,13 +5303,15 @@ self.__process_mutex.release() time.sleep(0.01) self.__state = False - - thread.start_new_thread(get_micro_data, ()) - + + self.__thread = threading.Thread(target=get_micro_data) + self.__thread.setName('micro.on') + self.__thread.start() + def off(self): """ Function to turn off the micro capture. - + Example: >>> tux.micro.off() """ @@ -5243,7 +5321,8 @@ os.kill(self.__process.pid, signal.SIGKILL) os.waitpid(-1, os.WNOHANG) self.__process_mutex.release() - + self.capture_stop() + def __on_new_buffer(self, buffer): i_buffer = [] for c in buffer: @@ -5251,11 +5330,11 @@ if self.__state: self.__send_energy_monitoring(i_buffer) self.on_buffer.notify(i_buffer) - + def __send_energy_monitoring(self, buffer): def log_value(value): return int(math.log(value) / math.log(255) * 255) - + try: e_total = 0 for val in buffer: @@ -5264,7 +5343,7 @@ if e_total > 255: e_total = 255 e_total = log_value(e_total) - + e_table = [] for i in range(4): e_m = 0 @@ -5279,7 +5358,7 @@ e_table.append(e_m) except: return - + tmp_tcp_data = [chr(0)] * 16 tmp_tcp_data[0] = chr(SOURCE_TUX) tmp_tcp_data[1] = chr(0) @@ -5446,14 +5525,14 @@ print "---------------------------------------------------------------" print "TUXDROID PYTHON API %s"%(api_version) print "---------------------------------------------------------------" - + def simulate_tcpip_frame(self, frame): """ To simulate the receiving of a frame Parameters: "frame" as list of 16 char : fake frame - + Example: >>> tux.misc.simulate_tcpip_frame(<list of 16 chars>) """ @@ -5484,7 +5563,7 @@ tmp_tcp_data[4] = chr(DATAS_STATUS_IR_CODE) tmp_tcp_data[5] = chr(key) self.simulate_tcpip_frame(tmp_tcp_data) - + #============================================================================== # TUXTCPCommunicator - switch - class #============================================================================== @@ -5495,29 +5574,29 @@ self.__status = status self.on_press = EventControl() self.on_release = EventControl() - + def _store_events(self): self.on_press.push() self.on_release.push() - + def _restore_events(self): self.on_press.pop() self.on_release.pop() - + def _clear_events(self): self.on_press.clear() self.on_release.clear() - + def _notify(self, status, value): if status == self.__status: if value == 0: self.on_release.notify() else: self.on_press.notify() - + def simulate(self, value): self._notify(self.__status, value) - + def get_status(self): return self.parent.status.get_one_status(self.__status) |