import serial
import time
import usb

from alphasign.interfaces import base


class Serial(base.BaseInterface):
    """Connect to a sign through a local serial device.

    This class uses pySerial.
    """

    def __init__(self, device="/dev/ttyS0"):
        """
        :param device: character device (default: /dev/ttyS0)
        :type device: string
        """
        self.device = device
        # NOTE(review): debug defaults to True here but to False in USB;
        # confirm whether that difference is intentional.
        self.debug = True
        self._conn = None

    def connect(self):
        """Establish connection to the device.

        Opens ``self.device`` at 4800 baud with even parity, two stop bits,
        ``timeout=1`` and both software (XON/XOFF) and hardware (RTS/CTS)
        flow control switched off.
        """
        # TODO(ms): these settings can probably be tweaked and still support
        # most of the devices.
        self._conn = serial.Serial(port=self.device,
                                   baudrate=4800,
                                   parity=serial.PARITY_EVEN,
                                   stopbits=serial.STOPBITS_TWO,
                                   timeout=1,
                                   xonxoff=0,
                                   rtscts=0)

    def disconnect(self):
        """Disconnect from the device.

        Does nothing if no connection was ever opened.
        """
        if self._conn:
            self._conn.close()

    def write(self, packet):
        """Write packet to the serial interface.

        Connects first if there is no open connection.

        :param packet: packet to write
        :type packet: :class:`alphasign.packet.Packet`
        :returns: True if the write succeeded, False if it raised OSError
        """
        if not self._conn or not self._conn.isOpen():
            self.connect()
        if self.debug:
            # Parenthesised single-argument form prints the same text on
            # Python 2 and Python 3.
            print("Writing packet: %s" % repr(packet))
        try:
            self._conn.write(str(packet))
        except OSError:
            return False
        else:
            return True


class USB(base.BaseInterface):
    """Connect to a sign using USB.

    This class uses PyUSB.
    """

    def __init__(self, usb_id):
        """
        :param usb_id: tuple of (vendor id, product id) identifying the USB
                       device
        """
        self.vendor_id, self.product_id = usb_id
        self.debug = False
        self._handle = None
        self._conn = None
        # Filled in by connect(); defined here so the attributes always exist.
        self._read_endpoint = None
        self._write_endpoint = None

    def _get_device(self):
        """Return the first USB device matching the configured ids.

        Walks every device on every bus reported by ``usb.busses()``.

        :returns: the matching device, or None if there is no match
        """
        for bus in usb.busses():
            for device in bus.devices:
                if (device.idVendor == self.vendor_id and
                        device.idProduct == self.product_id):
                    return device
        return None

    def connect(self, reset=True):
        """Open the USB device and claim its first interface.

        Does nothing if a connection is already established.

        :param reset: send a USB RESET command to the sign. This seems to
                      cause problems in VMware.

        :exception usb.USBError: on USB-related errors, including when no
                                 device with the configured ids is found
        """
        if self._conn:
            return

        device = self._get_device()
        if not device:
            raise usb.USBError("failed to find USB device %04x:%04x" %
                               (self.vendor_id, self.product_id))

        interface = device.configurations[0].interfaces[0][0]
        self._read_endpoint, self._write_endpoint = interface.endpoints

        # Only publish the handle once it is fully set up. Otherwise a failed
        # reset()/claimInterface() would leave self._conn set and every later
        # connect() would return early with an unusable handle.
        handle = device.open()
        if reset:
            handle.reset()
        handle.claimInterface(interface)
        self._conn = handle

    def disconnect(self):
        """Release the claimed USB interface and forget the connection.

        Does nothing if not connected. The stored handle is cleared even when
        releasing fails, so that a later connect() or write() opens a fresh
        connection instead of reusing a released one.
        """
        if self._conn:
            try:
                self._conn.releaseInterface()
            finally:
                self._conn = None

    def write(self, packet):
        """Write packet to the sign over the USB write endpoint.

        Connects first if there is no connection.

        :param packet: packet to write; it is sent as ``str(packet)``
        :returns: True once the packet has been written
        :exception usb.USBError: on USB-related errors
        """
        if not self._conn:
            self.connect()
        if self.debug:
            print("Writing packet: %s" % repr(packet))
        written = self._conn.bulkWrite(self._write_endpoint.address,
                                       str(packet))
        if self.debug:
            print("%d bytes written" % written)
        # NOTE(review): presumably a zero-length write that marks the end of
        # the transfer; confirm against the sign's protocol.
        self._conn.bulkWrite(self._write_endpoint.address, '')
        # Report success the same way Serial.write and DebugInterface.write do.
        return True


class DebugInterface(base.BaseInterface):
    """Dummy interface used only for debugging.

    This does nothing except print the contents of written packets.
    """

    def __init__(self):
        self.debug = True

    def connect(self):
        """Do nothing; there is no device to connect to."""
        pass

    def disconnect(self):
        """Do nothing; there is no device to disconnect from."""
        pass

    def write(self, packet):
        """Print the packet if debugging is enabled.

        :param packet: packet whose ``repr`` is printed
        :returns: always True
        """
        if self.debug:
            print("Writing packet: %s" % repr(packet))
        return True