[tuxdroid-svn] r792 - tux_lib/tux_usb/trunk
Status: Beta
Brought to you by:
ks156
From: remi <c2m...@c2...> - 2007-12-14 09:20:38
|
Author: remi
Date: 2007-12-14 10:20:39 +0100 (Fri, 14 Dec 2007)
New Revision: 792

Removed:
   tux_lib/tux_usb/trunk/test.py
Log:
RM: a bad file ...

Deleted: tux_lib/tux_usb/trunk/test.py
===================================================================
--- tux_lib/tux_usb/trunk/test.py 2007-12-14 09:19:09 UTC (rev 791)
+++ tux_lib/tux_usb/trunk/test.py 2007-12-14 09:20:39 UTC (rev 792)
@@ -1,296 +0,0 @@
-from ctypes import *
-import time
-import sys
-import threading
-
-# ----------------------------------------------------------------------------
-# _TUX_USB.so library wrapping
-# ----------------------------------------------------------------------------
-lib = CDLL("./_TUX_USB.so")
-
-usb_check_TuxDroid = lib.usb_check_TuxDroid
-usb_check_TuxDroid.restype = c_uint
-
-usb_capture_TuxDroid = lib.usb_capture_TuxDroid
-usb_capture_TuxDroid.restype = c_uint
-
-usb_write_TuxDroid = lib.usb_write_TuxDroid
-usb_write_TuxDroid.restype = c_uint
-
-usb_read_TuxDroid = lib.usb_read_TuxDroid
-usb_read_TuxDroid.restype = c_uint
-
-usb_close_TuxDroid = lib.usb_close_TuxDroid
-usb_close_TuxDroid.restype = c_uint
-
-get_connected = lib.get_connected
-get_connected.restype = c_uint
-
-set_connected = lib.set_connected
-set_connected.restype = None
-
-library_init = lib.library_init
-library_init.restype = None
-
-start = lib.start
-start.restype = None
-
-stop = lib.stop
-stop.restype = None
-
-FRAME_CALLBACK = CFUNCTYPE(None, POINTER(c_uint8))
-set_frame_callback = lib.set_frame_callback
-set_frame_callback.restype = None
-set_frame_callback.argtypes = [FRAME_CALLBACK]
-
-DONGLE_DISCONNECT_CALLBACK = CFUNCTYPE(None)
-set_disconnect_dongle_callback = lib.set_disconnect_dongle_callback
-set_disconnect_dongle_callback.restype = None
-set_disconnect_dongle_callback.argtypes = [DONGLE_DISCONNECT_CALLBACK]
-
-def test_funct(test):
-    print test[3]
-
-def disconnected():
-    print 'dongle disconnected'
-
-set_frame_callback(FRAME_CALLBACK(test_funct))
-set_disconnect_dongle_callback(DONGLE_DISCONNECT_CALLBACK(disconnected))
-
-library_init()
-
-start()
-
-time.sleep(10)
-
-stop()
-sys.exit(0)
-
-# ----------------------------------------------------------------------------
-# TUXUsbInterface constants
-# ----------------------------------------------------------------------------
-CAPTURE_RETURN_NO_DONGLE = 0
-CAPTURE_RETURN_FIRMWARE_TO_OLD = 1
-CAPTURE_RETURN_DEVICE_NOT_HANDLED = 2
-CAPTURE_RETURN_SUCCESS = 3
-CLOSE_RETURN_INTERFACE_NOT_RELEASED = 0
-CLOSE_RETURN_DEVICE_NOT_CLOSED = 1
-CLOSE_RETURN_SUCCESS = 2
-DATA_WRITE_LENGTH = 5
-READ_LOOP_INTERVAL = 0.1
-
-# ----------------------------------------------------------------------------
-# TUXUsbInterface class
-# ----------------------------------------------------------------------------
-class TUXUsbInterface(object):
-
-    def __init__(self):
-        self.__connected = False
-        self.__connected_mutex = threading.Lock()
-        self.__on_frame = None
-        self.__on_dongle_disconnect = None
-        self.__read_loop_thread = None
-        self.__debug = False
-        self.__read_write_mutex = threading.Lock()
-        self.__CallBack_mutex = threading.Lock()
-        self.__messages_mutex = threading.Lock()
-
-    def destroy(self):
-        self.stop()
-
-    def get_connected(self):
-        self.__connected_mutex.acquire()
-        connected = self.__connected
-        self.__connected_mutex.release()
-        return connected
-
-    def __set_connected(self, value):
-        self.__connected_mutex.acquire()
-        self.__connected = value
-        self.__connected_mutex.release()
-
-    def set_frame_CallBack(self, funct):
-        self.__CallBack_mutex.acquire()
-        self.__on_frame = funct
-        self.__CallBack_mutex.release()
-
-    def __get_frame_CallBack(self):
-        self.__CallBack_mutex.acquire()
-        funct = self.__on_frame
-        self.__CallBack_mutex.release()
-        return funct
-
-    def set_dongle_remove_CallBack(self, funct):
-        self.__CallBack_mutex.acquire()
-        self.__on_dongle_disconnect = funct
-        self.__CallBack_mutex.release()
-
-    def __get_dongle_remove_CallBack(self):
-        self.__CallBack_mutex.acquire()
-        funct = self.__on_dongle_disconnect
-        self.__CallBack_mutex.release()
-        return funct
-
-    def set_debug(self, value):
-        self.__debug = value
-
-    def start(self):
-        if self.get_connected():
-            return False
-        ret = usb_capture_TuxDroid()
-        if ret == CAPTURE_RETURN_NO_DONGLE:
-            self.__print_warning('Dongle not found.')
-            return False
-        elif ret == CAPTURE_RETURN_FIRMWARE_TO_OLD:
-            self.__print_warning('Your dongle firmware is too old.')
-            return False
-        elif ret == CAPTURE_RETURN_DEVICE_NOT_HANDLED:
-            self.__print_warning("Device can't be handled.")
-            return False
-        else:
-            self.__print_debug('The interface is started.')
-            self.__set_connected(True)
-            t = threading.Thread(target = self.__read_loop)
-            t.start()
-            self.__read_loop_thread = t
-            time.sleep(0.1)
-            return True
-
-    def stop(self):
-        if not self.get_connected():
-            return True
-        self.__set_connected(False)
-        if self.__read_loop_thread != None:
-            if self.__read_loop_thread.isAlive():
-                self.__read_loop_thread.join()
-        ret = usb_close_TuxDroid()
-        if ret == CLOSE_RETURN_INTERFACE_NOT_RELEASED:
-            self.__print_warning("The interface can't be released.")
-            return False
-        elif ret == CLOSE_RETURN_DEVICE_NOT_CLOSED:
-            self.__print_warning("The device can't be closed.")
-            return False
-        else:
-            self.__print_debug('The interface is stopped.')
-            return True
-
-    def dongle_present(self):
-        if usb_check_TuxDroid() == 0:
-            value = False
-            self.__connected_mutex.acquire()
-            self.__connected = value
-            self.__connected_mutex.release()
-        else:
-            value = True
-        return value
-
-    def write(self, data = [0, 0x9A, 6, 25, 0]):
-        if not self.get_connected():
-            return False
-        if len(data) != DATA_WRITE_LENGTH:
-            self.__print_warning("Write : Bad data length.")
-            return False
-        for val in data:
-            if str(type(val)) != "<type 'int'>":
-                self.__print_warning("Write : Bad data type.")
-                return False
-        t_char = tuple(data)
-        self.__read_write_mutex.acquire()
-        ret = usb_write_TuxDroid((c_uint8 * len(t_char))(*t_char), DATA_WRITE_LENGTH)
-        self.__read_write_mutex.release()
-        if ret != DATA_WRITE_LENGTH:
-            self.__print_warning("Write : Usb error.")
-            return False
-        else:
-            self.__print_debug('Write ok.')
-            return True
-
-    def __write(self, data = [0, 0x9A, 6, 25, 0]):
-        if not self.get_connected():
-            return False
-        if len(data) != DATA_WRITE_LENGTH:
-            self.__print_warning("Write : Bad data length.")
-            return False
-        for val in data:
-            if str(type(val)) != "<type 'int'>":
-                self.__print_warning("Write : Bad data type.")
-                return False
-        t_char = tuple(data)
-        ret = usb_write_TuxDroid((c_uint8 * len(t_char))(*t_char), DATA_WRITE_LENGTH)
-        if ret != DATA_WRITE_LENGTH:
-            self.__print_warning("Write : Usb error.")
-            return False
-        else:
-            self.__print_debug('Write ok.')
-            return True
-
-    def __read(self):
-        t = [0] * 64
-        t_char = tuple(t)
-        data = (c_uint8 * len(t_char))(*t_char)
-        if not self.get_connected():
-            return t
-        self.__read_write_mutex.acquire()
-        if not self.__write([1, 1, 0, 0, 0]):
-            self.__read_write_mutex.release()
-            return t
-        ret = usb_read_TuxDroid(data)
-        self.__read_write_mutex.release()
-        res = []
-        for i in range(64):
-            res.append(data[i])
-        return res
-
-    def __read_loop(self):
-        time.sleep(0.01)
-        check_counter = 0
-        initial_timeout = time.time()
-        current_timeout = initial_timeout
-        while True:
-            current_timeout += READ_LOOP_INTERVAL
-            if not self.get_connected():
-                self.__print_debug('End of read loop.')
-                break
-            if check_counter >= 9:
-                if not self.dongle_present():
-                    self.__print_debug('End of read loop.')
-                    if self.__get_dongle_remove_CallBack() != None:
-                        t = threading.Thread(target = self.__get_dongle_remove_CallBack())
-                        t.start()
-                    break
-                check_counter = 0
-            else:
-                check_counter += 1
-            data = self.__read()
-            if self.__get_frame_CallBack() != None:
-                t = threading.Thread(target = self.__get_frame_CallBack(), args = (data, ))
-                t.start()
-            while time.time() < current_timeout:
-                time.sleep(0.005)
-
-    def __print_debug(self, msg):
-        self.__messages_mutex.acquire()
-        if self.__debug:
-            print 'TUXUsbInterface [DEBUG] : %s' % msg
-        self.__messages_mutex.release()
-
-    def __print_warning(self, msg):
-        self.__messages_mutex.acquire()
-        print 'TUXUsbInterface [WARNING] : %s' % msg
-        self.__messages_mutex.release()
-
-
-def frame_CallBack(data):
-    print 'packet number :', data[3]
-
-def dongle_remove_CallBack():
-    print 'Dongle is removed !'
-
-ti = TUXUsbInterface()
-ti.set_frame_CallBack(frame_CallBack)
-ti.set_dongle_remove_CallBack(dongle_remove_CallBack)
-ti.set_debug(True)
-ti.start()
-time.sleep(5)
-ti.write()
-ti.stop()
|