local.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. import serial
  2. import time
  3. import usb
  4. from alphasign.interfaces import base
  5. class Serial(base.BaseInterface):
  6. """Connect to a sign through a local serial device.
  7. This class uses `pySerial <http://pyserial.sourceforge.net/>`_.
  8. """
  9. def __init__(self, device="/dev/ttyS0"):
  10. """
  11. :param device: character device (default: /dev/ttyS0)
  12. :type device: string
  13. """
  14. self.device = device
  15. self.debug = True
  16. self._conn = None
  17. def connect(self):
  18. """Establish connection to the device.
  19. """
  20. # TODO(ms): these settings can probably be tweaked and still support most of
  21. # the devices.
  22. self._conn = serial.Serial(port=self.device,
  23. baudrate=4800,
  24. parity=serial.PARITY_EVEN,
  25. stopbits=serial.STOPBITS_TWO,
  26. timeout=1,
  27. xonxoff=0,
  28. rtscts=0)
  29. def disconnect(self):
  30. """Disconnect from the device.
  31. """
  32. if self._conn:
  33. self._conn.close()
  34. def write(self, packet):
  35. """Write packet to the serial interface.
  36. :param packet: packet to write
  37. :type packet: :class:`alphasign.packet.Packet`
  38. """
  39. if not self._conn or not self._conn.isOpen():
  40. self.connect()
  41. if self.debug:
  42. print "Writing packet: %s" % repr(packet)
  43. try:
  44. self._conn.write(str(packet))
  45. except OSError:
  46. return False
  47. else:
  48. return True
  49. class USB(base.BaseInterface):
  50. """Connect to a sign using USB.
  51. This class uses `PyUSB <http://pyusb.berlios.de>`_.
  52. """
  53. def __init__(self, usb_id):
  54. """
  55. :param usb_id: tuple of (vendor id, product id) identifying the USB device
  56. """
  57. self.vendor_id, self.product_id = usb_id
  58. self.debug = False
  59. self._handle = None
  60. self._conn = None
  61. def _get_device(self):
  62. for bus in usb.busses():
  63. for device in bus.devices:
  64. if (device.idVendor == self.vendor_id and
  65. device.idProduct == self.product_id):
  66. return device
  67. return None
  68. def connect(self):
  69. """
  70. TODO(ms): needs exception info
  71. """
  72. if self._conn:
  73. return
  74. device = self._get_device()
  75. if not device:
  76. raise usb.USBError, ("failed to find USB device %04x:%04x" %
  77. (self.vendor_id, self.product_id))
  78. interface = device.configurations[0].interfaces[0][0]
  79. self._read_endpoint, self._write_endpoint = interface.endpoints
  80. self._conn = device.open()
  81. self._conn.claimInterface(interface)
  82. def disconnect(self):
  83. if self._conn:
  84. self._conn.releaseInterface()
  85. def write(self, packet):
  86. if not self._conn:
  87. self.connect()
  88. if self.debug:
  89. print "Writing packet: %s" % repr(packet)
  90. self._conn.bulkWrite(self._write_endpoint.address, str(packet))
  91. self._conn.bulkWrite(self._write_endpoint.address, '')
  92. class DebugInterface(base.BaseInterface):
  93. """Dummy interface used only for debugging.
  94. This does nothing except print the contents of written packets.
  95. """
  96. def __init__(self):
  97. self.debug = True
  98. def connect(self):
  99. pass
  100. def disconnect(self):
  101. pass
  102. def write(self, packet):
  103. if self.debug:
  104. print "Writing packet: %s" % repr(packet)
  105. return True