discoverer

This commit is contained in:
Andrew Kay
2020-08-25 20:29:53 -05:00
parent baf7ed527c
commit a5da015d04
4 changed files with 297 additions and 0 deletions

69
discoverer/analyze.py Executable file
View File

@@ -0,0 +1,69 @@
#!/usr/bin/env python
import sys
from msgpack import Unpacker
from coax.protocol import Command, is_data_word, unpack_data_word
from display import CHAR_MAP
COMMANDS = { command.value: command.name for command in Command }
# These got removed from the most recent pycoax...
def is_command_word(word):
return (word & 0x1) == 1
def unpack_command_word(word):
if not is_command_word(word):
raise ProtocolError(f'Word does not have command bit set: {word}')
command = (word >> 2) & 0x1f
return command
def format_command(word):
if word in COMMANDS:
return COMMANDS[word]
return word
def main():
command = None
with open(sys.argv[1], 'rb') as capture_file:
unpacker = Unpacker(capture_file, raw=False)
for (timestamp, words, errors) in unpacker:
if errors:
print('E')
else:
line = ''
is_single_data_word = (len(words) == 1 and is_data_word(words[0]))
is_trta = (len(words) == 0 and words[0] == 0)
for word in words:
if is_command_word(word):
command = unpack_command_word(word)
if command in COMMANDS:
line += '<' + COMMANDS[command] + '> '
else:
line += f'<{word:04x}> '
elif is_data_word(word):
data = unpack_data_word(word)
if command == Command.WRITE_DATA.value and data in CHAR_MAP:
line += CHAR_MAP[data]
else:
line += f'{data:02x} '
else:
line += f'[{word:04x}] '
# Hide POLLs and TRTAs
if command != Command.POLL.value and not is_trta:
print(line)
if __name__ == '__main__':
main()

90
discoverer/capture.py Executable file
View File

@@ -0,0 +1,90 @@
#!/usr/bin/env python
import sys
from collections import namedtuple
import struct
from more_itertools import partition
from serial import Serial
from sliplib import ProtocolError
import msgpack
from coax.serial_interface import SlipSerial, _unpack_receive_data
Packet = namedtuple('Packet', ['timestamp', 'words', 'errors'])
class TapException(Exception):
pass
class NewCoaxTap:
def __init__(self, serial_port):
self.serial = Serial(serial_port, 115200)
self.slip_serial = SlipSerial(self.serial)
def enable(self):
self.serial.write(b'e')
self.serial.flush()
def read(self):
try:
message = self.slip_serial.recv_msg()
except ProtocolError:
raise TapException('SLIP protocol error')
if len(message) < 6:
raise TapException('Invalid message received, must be at least 6 bytes')
if len(message) % 2 != 0:
raise TapException('Invalid message received, must be even length')
timestamp = struct.unpack('<I', message[0:4])[0]
words = _unpack_receive_data(message[4:])
errors = []
data = []
for word in words:
if word & 0x8000:
errors.append(word & 0x7fff)
else:
data.append(word)
if errors:
sys.stdout.write('E')
return Packet(timestamp, data, errors)
tap = NewCoaxTap(sys.argv[1])
capture_file = open(sys.argv[2], 'wb')
# Start capture.
tap.enable()
# Process packets.
packet_count = 0
need_newline = False
while True:
try:
packet = tap.read()
except TapException as error:
if need_newline:
print()
print(f'ERROR: {error}')
need_newline = False
continue
capture_file.write(msgpack.packb(packet, use_bin_type=True))
if packet_count % 10 == 0:
sys.stdout.write('.')
sys.stdout.flush()
need_newline = True
packet_count += 1

132
discoverer/display.py Normal file
View File

@@ -0,0 +1,132 @@
# From oec...
_ASCII_CHAR_MAP = {
'>': 0x08,
'<': 0x09,
'[': 0x0a,
']': 0x0b,
')': 0x0c,
'(': 0x0d,
'}': 0x0e,
'{': 0x0f,
' ': 0x10,
'=': 0x11,
'\'': 0x12,
'"': 0x13,
'/': 0x14,
'\\': 0x15,
'|': 0x16,
'¦': 0x17,
'?': 0x18,
'!': 0x19,
'$': 0x1a,
'¢': 0x1b,
'£': 0x1c,
'¥': 0x1d,
# 0x1e - A P/T looking symbol
# 0x1f - A intertwined parens symbol
'0': 0x20,
'1': 0x21,
'2': 0x22,
'3': 0x23,
'4': 0x24,
'5': 0x25,
'6': 0x26,
'7': 0x27,
'8': 0x28,
'9': 0x29,
'ß': 0x2a,
'§': 0x2b,
'#': 0x2c,
'@': 0x2d,
'%': 0x2e,
'_': 0x2f,
'&': 0x30,
'-': 0x31,
'.': 0x32,
',': 0x33,
':': 0x34,
'+': 0x35,
'¬': 0x36,
'¯': 0x37, # ???
'°': 0x38,
# 0x39 - Accent?
'^': 0x3a, # More like an accent
'~': 0x3b, # More like an accent
'¨': 0x3c,
# 0x3d - Accute accent?
# 0x3e - Opposite of accute accent?
# 0x3f - A more extreme comma?
'a': 0x80,
'b': 0x81,
'c': 0x82,
'd': 0x83,
'e': 0x84,
'f': 0x85,
'g': 0x86,
'h': 0x87,
'i': 0x88,
'j': 0x89,
'k': 0x8a,
'l': 0x8b,
'm': 0x8c,
'n': 0x8d,
'o': 0x8e,
'p': 0x8f,
'q': 0x90,
'r': 0x91,
's': 0x92,
't': 0x93,
'u': 0x94,
'v': 0x95,
'w': 0x96,
'x': 0x97,
'y': 0x98,
'z': 0x99,
'æ': 0x9a,
'ø': 0x9b,
'å': 0x9c,
'ç': 0x9d,
# 0x9e - Semi colon with top line
# 0x9f - Asterisk with top line
'A': 0xa0,
'B': 0xa1,
'C': 0xa2,
'D': 0xa3,
'E': 0xa4,
'F': 0xa5,
'G': 0xa6,
'H': 0xa7,
'I': 0xa8,
'J': 0xa9,
'K': 0xaa,
'L': 0xab,
'M': 0xac,
'N': 0xad,
'O': 0xae,
'P': 0xaf,
'Q': 0xb0,
'R': 0xb1,
'S': 0xb2,
'T': 0xb3,
'U': 0xb4,
'V': 0xb5,
'W': 0xb6,
'X': 0xb7,
'Y': 0xb8,
'Z': 0xb9,
'Æ': 0xba,
'Ø': 0xbb,
'Å': 0xbc,
'Ç': 0xbd,
';': 0xbe,
'*': 0xbf
}
CHAR_MAP = { d: a for (a, d) in _ASCII_CHAR_MAP.items() }

View File

@@ -0,0 +1,6 @@
more-itertools==8.4.0
pycoax==0.5.0
pyserial==3.4
sliplib==0.5.0
sortedcontainers==2.1.0
wcwidth==0.1.7