local.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. import serial
  2. import time
  3. import usb
  4. from alphasign.interfaces import base
  5. class Serial(base.BaseInterface):
  6. """Connect to a sign through a local serial device.
  7. This class uses `pySerial <http://pyserial.sourceforge.net/>`_.
  8. """
  9. def __init__(self, device="/dev/ttyS0"):
  10. """
  11. :param device: character device (default: /dev/ttyS0)
  12. :type device: string
  13. """
  14. self.device = device
  15. self.debug = True
  16. self._conn = None
  17. def connect(self):
  18. """Establish connection to the device.
  19. """
  20. # TODO(ms): these settings can probably be tweaked and still support most of
  21. # the devices.
  22. self._conn = serial.Serial(port=self.device,
  23. baudrate=4800,
  24. parity=serial.PARITY_EVEN,
  25. stopbits=serial.STOPBITS_TWO,
  26. timeout=1,
  27. xonxoff=0,
  28. rtscts=0)
  29. def disconnect(self):
  30. """Disconnect from the device.
  31. """
  32. if self._conn:
  33. self._conn.close()
  34. def write(self, packet):
  35. """Write packet to the serial interface.
  36. :param packet: packet to write
  37. :type packet: :class:`alphasign.packet.Packet`
  38. """
  39. if not self._conn or not self._conn.isOpen():
  40. self.connect()
  41. if self.debug:
  42. print "Writing packet: %s" % repr(packet)
  43. try:
  44. self._conn.write(str(packet))
  45. except OSError:
  46. return False
  47. else:
  48. return True
  49. class USB(base.BaseInterface):
  50. """Connect to a sign using USB.
  51. This class uses `PyUSB <http://pyusb.berlios.de>`_.
  52. """
  53. def __init__(self, usb_id):
  54. """
  55. :param usb_id: tuple of (vendor id, product id) identifying the USB device
  56. """
  57. self.vendor_id, self.product_id = usb_id
  58. self.debug = False
  59. self._handle = None
  60. self._conn = None
  61. def _get_device(self):
  62. for bus in usb.busses():
  63. for device in bus.devices:
  64. if (device.idVendor == self.vendor_id and
  65. device.idProduct == self.product_id):
  66. return device
  67. return None
  68. def connect(self, reset=True):
  69. """
  70. :param reset: send a USB RESET command to the sign.
  71. This seems to cause problems in VMware.
  72. :exception usb.USBError: on USB-related errors
  73. """
  74. if self._conn:
  75. return
  76. device = self._get_device()
  77. if not device:
  78. raise usb.USBError, ("failed to find USB device %04x:%04x" %
  79. (self.vendor_id, self.product_id))
  80. interface = device.configurations[0].interfaces[0][0]
  81. self._read_endpoint, self._write_endpoint = interface.endpoints
  82. self._conn = device.open()
  83. if reset:
  84. self._conn.reset()
  85. self._conn.claimInterface(interface)
  86. def disconnect(self):
  87. """ """
  88. if self._conn:
  89. self._conn.releaseInterface()
  90. def write(self, packet):
  91. """ """
  92. if not self._conn:
  93. self.connect()
  94. if self.debug:
  95. print "Writing packet: %s" % repr(packet)
  96. written = self._conn.bulkWrite(self._write_endpoint.address, str(packet))
  97. if self.debug:
  98. print "%d bytes written" % written
  99. self._conn.bulkWrite(self._write_endpoint.address, '')
  100. class DebugInterface(base.BaseInterface):
  101. """Dummy interface used only for debugging.
  102. This does nothing except print the contents of written packets.
  103. """
  104. def __init__(self):
  105. self.debug = True
  106. def connect(self):
  107. """ """
  108. pass
  109. def disconnect(self):
  110. """ """
  111. pass
  112. def write(self, packet):
  113. """ """
  114. if self.debug:
  115. print "Writing packet: %s" % repr(packet)
  116. return True