Files
2020-08-26 08:15:47 -05:00

116 lines
3.0 KiB
Python
Executable File

#!/usr/bin/env python
import sys
from enum import Enum
from msgpack import Unpacker
from coax.protocol import Command, is_data_word, unpack_data_words
from display import CHAR_MAP
COMMANDS = { command.value: command.name for command in Command }
# These got removed from the most recent pycoax...
def is_command_word(word):
return (word & 0x1) == 1
def unpack_command_word(word):
if not is_command_word(word):
raise ProtocolError(f'Word does not have command bit set: {word}')
command = (word >> 2) & 0x1f
return command
def format_command(command):
if command in COMMANDS:
return '<' + COMMANDS[command] + '>'
return f'<{command:02x}>'
printable_commands = set([Command.WRITE_DATA.value])
def format_data(data, command):
line = ''
in_a_printable_stream = False
for byte in data:
if command in printable_commands and byte in CHAR_MAP:
if not in_a_printable_stream:
line += '\''
line += CHAR_MAP[byte]
in_a_printable_stream = True
else:
if in_a_printable_stream:
in_a_printable_stream = False
line += '\' '
line += '{0:02x} '.format(byte)
if in_a_printable_stream:
line += '\''
return line.strip()
class Party(Enum):
CONTROLLER = 1
DEVICE = 2
class CaptureStream:
def __init__(self, stream):
self.unpacker = Unpacker(stream, raw=False)
self.from_party = None
self.command = None
def xxx(self):
(timestamp, words, errors) = self._read()
if self.from_party == Party.CONTROLLER:
self.from_party = Party.DEVICE
elif self.from_party == Party.DEVICE:
self.from_party = Party.CONTROLLER
if errors:
pass
elif is_command_word(words[0]) and all(is_data_word(word) for word in words[1:]):
if self.from_party != Party.CONTROLLER:
if self.from_party is not None:
print('something went out of sync')
self.from_party = Party.CONTROLLER
self.command = unpack_command_word(words[0])
data = unpack_data_words(words[1:])
if self.command != Command.POLL.value:
print('-> ' + format_command(self.command) + ' ' + format_data(data, self.command))
elif all(is_data_word(word) for word in words):
data = unpack_data_words(words)
if not (self.command == Command.POLL.value and len(words) == 1 and words[0] == 0):
if self.command == Command.POLL.value:
print('do this missing POLL here')
print('<- ' + format_data(data, self.command))
else:
# Strange mix of commands and data...
pass
def _read(self):
return self.unpacker.unpack()
def main():
with open(sys.argv[1], 'rb') as capture_file:
capture_stream = CaptureStream(capture_file)
while True:
capture_stream.xxx()
if __name__ == '__main__':
main()