local.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. import serial
  2. import time
  3. import usb
  4. from alphasign.interfaces import base
  5. class Serial(base.BaseInterface):
  6. """Connect to a sign through a local serial device.
  7. This class uses `pySerial <http://pyserial.sourceforge.net/>`_.
  8. """
  9. def __init__(self, device="/dev/ttyS0"):
  10. """
  11. :param device: character device (default: /dev/ttyS0)
  12. :type device: string
  13. """
  14. self.device = device
  15. self.debug = True
  16. self._conn = None
  17. def connect(self):
  18. """Establish connection to the device.
  19. """
  20. # TODO(ms): these settings can probably be tweaked and still support most of
  21. # the devices.
  22. self._conn = serial.Serial(port=self.device,
  23. baudrate=4800,
  24. parity=serial.PARITY_EVEN,
  25. stopbits=serial.STOPBITS_TWO,
  26. bytesize=serial.SEVENBITS,
  27. timeout=1,
  28. xonxoff=0,
  29. rtscts=0)
  30. def disconnect(self):
  31. """Disconnect from the device.
  32. """
  33. if self._conn:
  34. self._conn.close()
  35. def write(self, packet):
  36. """Write packet to the serial interface.
  37. :param packet: packet to write
  38. :type packet: :class:`alphasign.packet.Packet`
  39. """
  40. if not self._conn or not self._conn.isOpen():
  41. self.connect()
  42. if self.debug:
  43. print "Writing packet: %s" % repr(packet)
  44. try:
  45. self._conn.write(str(packet))
  46. except OSError:
  47. return False
  48. else:
  49. return True
  50. class USB(base.BaseInterface):
  51. """Connect to a sign using USB.
  52. This class uses `PyUSB <http://pyusb.berlios.de>`_.
  53. """
  54. def __init__(self, usb_id):
  55. """
  56. :param usb_id: tuple of (vendor id, product id) identifying the USB device
  57. """
  58. self.vendor_id, self.product_id = usb_id
  59. self.debug = False
  60. self._handle = None
  61. self._conn = None
  62. def _get_device(self):
  63. for bus in usb.busses():
  64. for device in bus.devices:
  65. if (device.idVendor == self.vendor_id and
  66. device.idProduct == self.product_id):
  67. return device
  68. return None
  69. def connect(self, reset=True):
  70. """
  71. :param reset: send a USB RESET command to the sign.
  72. This seems to cause problems in VMware.
  73. :exception usb.USBError: on USB-related errors
  74. """
  75. if self._conn:
  76. return
  77. device = self._get_device()
  78. if not device:
  79. raise usb.USBError, ("failed to find USB device %04x:%04x" %
  80. (self.vendor_id, self.product_id))
  81. interface = device.configurations[0].interfaces[0][0]
  82. self._read_endpoint, self._write_endpoint = interface.endpoints
  83. self._conn = device.open()
  84. if reset:
  85. self._conn.reset()
  86. self._conn.claimInterface(interface)
  87. def disconnect(self):
  88. """ """
  89. if self._conn:
  90. self._conn.releaseInterface()
  91. def write(self, packet):
  92. """ """
  93. if not self._conn:
  94. self.connect()
  95. if self.debug:
  96. print "Writing packet: %s" % repr(packet)
  97. written = self._conn.bulkWrite(self._write_endpoint.address, str(packet))
  98. if self.debug:
  99. print "%d bytes written" % written
  100. self._conn.bulkWrite(self._write_endpoint.address, '')
  101. class DebugInterface(base.BaseInterface):
  102. """Dummy interface used only for debugging.
  103. This does nothing except print the contents of written packets.
  104. """
  105. def __init__(self):
  106. self.debug = True
  107. def connect(self):
  108. """ """
  109. pass
  110. def disconnect(self):
  111. """ """
  112. pass
  113. def write(self, packet):
  114. """ """
  115. if self.debug:
  116. print "Writing packet: %s" % repr(packet)
  117. return True