diff --git a/discoverer/analyze.py b/discoverer/analyze.py index b5163e1..df0191a 100755 --- a/discoverer/analyze.py +++ b/discoverer/analyze.py @@ -1,10 +1,11 @@ #!/usr/bin/env python import sys +from enum import Enum from msgpack import Unpacker -from coax.protocol import Command, is_data_word, unpack_data_word +from coax.protocol import Command, is_data_word, unpack_data_words from display import CHAR_MAP @@ -22,48 +23,93 @@ def unpack_command_word(word): return command -def format_command(word): - if word in COMMANDS: - return COMMANDS[word] +def format_command(command): + if command in COMMANDS: + return '<' + COMMANDS[command] + '>' - return word + return f'<{command:02x}>' + +printable_commands = set([Command.WRITE_DATA.value]) + +def format_data(data, command): + line = '' + + in_a_printable_stream = False + + for byte in data: + if command in printable_commands and byte in CHAR_MAP: + if not in_a_printable_stream: + line += '\'' + + line += CHAR_MAP[byte] + + in_a_printable_stream = True + else: + if in_a_printable_stream: + in_a_printable_stream = False + line += '\' ' + + line += '{0:02x} '.format(byte) + + if in_a_printable_stream: + line += '\'' + + return line.strip() + +class Party(Enum): + CONTROLLER = 1 + DEVICE = 2 + +class CaptureStream: + def __init__(self, stream): + self.unpacker = Unpacker(stream, raw=False) + + self.from_party = None + self.command = None + + def xxx(self): + (timestamp, words, errors) = self._read() + + if self.from_party == Party.CONTROLLER: + self.from_party = Party.DEVICE + elif self.from_party == Party.DEVICE: + self.from_party = Party.CONTROLLER + + if errors: + pass + elif is_command_word(words[0]) and all(is_data_word(word) for word in words[1:]): + if self.from_party != Party.CONTROLLER: + if self.from_party is not None: + print('something went out of sync') + + self.from_party = Party.CONTROLLER + + self.command = unpack_command_word(words[0]) + data = unpack_data_words(words[1:]) + + if self.command != Command.POLL.value: + print('-> ' + format_command(self.command) + ' ' + format_data(data, self.command)) + elif all(is_data_word(word) for word in words): + data = unpack_data_words(words) + + if not (self.command == Command.POLL.value and len(words) == 1 and words[0] == 0): + if self.command == Command.POLL.value: + print('do this missing POLL here') + + print('<- ' + format_data(data, self.command)) + else: + # Strange mix of commands and data... + pass + + def _read(self): + return self.unpacker.unpack() def main(): - command = None - with open(sys.argv[1], 'rb') as capture_file: - unpacker = Unpacker(capture_file, raw=False) + capture_stream = CaptureStream(capture_file) - for (timestamp, words, errors) in unpacker: - if errors: - print('E') - else: - line = '' - - is_single_data_word = (len(words) == 1 and is_data_word(words[0])) - is_trta = (len(words) == 0 and words[0] == 0) - - for word in words: - if is_command_word(word): - command = unpack_command_word(word) - - if command in COMMANDS: - line += '<' + COMMANDS[command] + '> ' - else: - line += f'<{word:04x}> ' - elif is_data_word(word): - data = unpack_data_word(word) - - if command == Command.WRITE_DATA.value and data in CHAR_MAP: - line += CHAR_MAP[data] - else: - line += f'{data:02x} ' - else: - line += f'[{word:04x}] ' - - # Hide POLLs and TRTAs - if command != Command.POLL.value and not is_trta: - print(line) + while True: + capture_stream.xxx() if __name__ == '__main__': main()