New pycoax API

This commit is contained in:
Andrew Kay 2021-10-16 12:19:13 -05:00
parent f78af20a20
commit 9dfab54950
29 changed files with 1573 additions and 659 deletions

View File

@ -16,8 +16,8 @@ Assuming your interface is connected to `/dev/ttyACM0` and you have a CUT type t
```
import time
from coax import open_serial_interface, poll, poll_ack, load_address_counter_hi, \
load_address_counter_lo, write_data, ReceiveTimeout
from coax import open_serial_interface, Poll, PollAck, LoadAddressCounterHi, \
LoadAddressCounterLo, WriteData, ReceiveTimeout
with open_serial_interface('/dev/ttyACM0') as interface:
# Wait for a terminal to attach...
@ -26,12 +26,12 @@ with open_serial_interface('/dev/ttyACM0') as interface:
while not attached:
try:
poll_response = poll(interface, receive_timeout=1)
poll_response = interface.execute(Poll(), timeout=1)
if poll_response:
print(poll_response)
poll_ack(interface)
interface.execute(PollAck())
attached = True
except ReceiveTimeout:
@ -41,19 +41,18 @@ with open_serial_interface('/dev/ttyACM0') as interface:
# Poll the terminal until status is empty.
while poll_response:
poll_response = poll(interface)
poll_response = interface.execute(Poll())
if poll_response:
print(poll_response)
poll_ack(interface)
interface.execute(Poll())
# Move the cursor to top-left cell of a 80 column display.
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(80)])
# Write a secret message.
write_data(interface, bytes.fromhex('a1 84 00 92 94 91 84 00 93 8e 00 83 91 88 8d 8a 00 98 8e 94 91 00 ae 95 80 8b 93 88 8d 84'))
interface.execute(WriteData(bytes.fromhex('a1 84 00 92 94 91 84 00 93 8e 00 83 91 88 8d 8a 00 98 8e 94 91 00 ae 95 80 8b 93 88 8d 84')))
```
See [examples](examples) for complete examples.

View File

@ -10,40 +10,42 @@ from .protocol import (
TerminalType,
Control,
SecondaryControl,
poll,
poll_ack,
read_status,
read_terminal_id,
read_extended_id,
read_address_counter_hi,
read_address_counter_lo,
read_data,
read_multiple,
reset,
load_control_register,
load_secondary_control,
load_mask,
load_address_counter_hi,
load_address_counter_lo,
write_data,
clear,
search_forward,
search_backward,
insert_byte,
start_operation,
diagnostic_reset,
read_feature_id,
eab_read_data,
eab_load_mask,
eab_write_alternate,
eab_read_multiple,
eab_write_under_mask,
eab_read_status
Poll,
PollAck,
ReadStatus,
ReadTerminalId,
ReadExtendedId,
ReadAddressCounterHi,
ReadAddressCounterLo,
ReadData,
ReadMultiple,
Reset,
LoadControlRegister,
LoadSecondaryControl,
LoadMask,
LoadAddressCounterHi,
LoadAddressCounterLo,
WriteData,
Clear,
SearchForward,
SearchBackward,
InsertByte,
StartOperation,
DiagnosticReset,
ReadFeatureId,
EABReadData,
EABLoadMask,
EABWriteAlternate,
EABReadMultiple,
EABWriteUnderMask,
EABReadStatus,
Data
)
from .features import (
Feature,
get_features
read_feature_ids,
parse_features
)
from .exceptions import (
@ -53,3 +55,5 @@ from .exceptions import (
ReceiveTimeout,
ProtocolError
)
from .compat import *

181
pycoax/coax/compat.py Normal file
View File

@ -0,0 +1,181 @@
"""
coax.compat
~~~~~~~~~~~
"""
from .protocol import PollAction, Poll, PollAck, ReadStatus, ReadTerminalId, \
ReadExtendedId, ReadAddressCounterHi, ReadAddressCounterLo, \
ReadData, ReadMultiple, Reset, LoadControlRegister, \
LoadSecondaryControl, LoadMask, LoadAddressCounterHi, \
LoadAddressCounterLo, WriteData, Clear, SearchForward, \
SearchBackward, InsertByte, StartOperation, DiagnosticReset, \
ReadFeatureId, EABReadData, EABLoadMask, EABWriteAlternate, \
EABReadMultiple, EABWriteUnderMask, EABReadStatus
from .features import read_feature_ids, parse_features
def poll(interface, action=PollAction.NONE, **kwargs):
"""Execute a POLL command."""
return interface.execute(Poll(action), **_convert_kwargs(kwargs))
def poll_ack(interface, **kwargs):
"""Execute a POLL_ACK command."""
return interface.execute(PollAck(), **_convert_kwargs(kwargs))
def read_status(interface, **kwargs):
"""Execute a READ_STATUS command."""
return interface.execute(ReadStatus(), **_convert_kwargs(kwargs))
def read_terminal_id(interface, **kwargs):
"""Execute a READ_TERMINAL_ID command."""
return interface.execute(ReadTerminalId(), **_convert_kwargs(kwargs))
def read_extended_id(interface, **kwargs):
"""Execute a READ_EXTENDED_ID command."""
return interface.execute(ReadExtendedId(), **_convert_kwargs(kwargs))
def read_address_counter_hi(interface, **kwargs):
"""Execute a READ_ADDRESS_COUNTER_HI command."""
return interface.execute(ReadAddressCounterHi(), **_convert_kwargs(kwargs))
def read_address_counter_lo(interface, **kwargs):
"""Execute a READ_ADDRESS_COUTER_LO command."""
return interface.execute(ReadAddressCounterLo(), **_convert_kwargs(kwargs))
def read_data(interface, **kwargs):
"""Execute a READ_DATA command."""
return interface.execute(ReadData(), **_convert_kwargs(kwargs))
def read_multiple(interface, **kwargs):
"""Execute a READ_MULTIPLE command."""
return interface.execute(ReadMultiple(), **_convert_kwargs(kwargs))
def reset(interface, **kwargs):
"""Execute a RESET command."""
return interface.execute(Reset(), **_convert_kwargs(kwargs))
def load_control_register(interface, control, **kwargs):
"""Execute a LOAD_CONTROL_REGISTER command."""
return interface.execute(LoadControlRegister(control), **_convert_kwargs(kwargs))
def load_secondary_control(interface, control, **kwargs):
"""Execute a LOAD_SECONDARY_CONTROL command."""
return interface.execute(LoadSecondaryControl(control), **_convert_kwargs(kwargs))
def load_mask(interface, mask, **kwargs):
"""Execute a LOAD_MASK command."""
return interface.execute(LoadMask(mask), **_convert_kwargs(kwargs))
def load_address_counter_hi(interface, address, **kwargs):
"""Execute a LOAD_ADDRESS_COUNTER_HI command."""
return interface.execute(LoadAddressCounterHi(address), **_convert_kwargs(kwargs))
def load_address_counter_lo(interface, address, **kwargs):
"""Execute a LOAD_ADDRESS_COUNTER_LO command."""
return interface.execute(LoadAddressCounterLo(address), **_convert_kwargs(kwargs))
def write_data(interface, data, jumbo_write_strategy=None, **kwargs):
"""Execute a WRITE_DATA command."""
commands = _split_write(WriteData, data, jumbo_write_strategy)
responses = interface.execute(commands, **_convert_kwargs(kwargs))
return responses
def clear(interface, pattern, **kwargs):
"""Execute a CLEAR command."""
return interface.execute(Clear(pattern), **_convert_kwargs(kwargs))
def search_forward(interface, pattern, **kwargs):
"""Execute a SEARCH_FORWARD command."""
return interface.execute(SearchForward(pattern), **_convert_kwargs(kwargs))
def search_backward(interface, pattern, **kwargs):
"""Execute a SEARCH_BACKWARD command."""
return interface.execute(SearchBackward(pattern), **_convert_kwargs(kwargs))
def insert_byte(interface, byte, **kwargs):
"""Execute a INSERT_BYTE command."""
return interface.execute(InsertByte(byte), **_convert_kwargs(kwargs))
def start_operation(interface, **kwargs):
"""Execute a START_OPERATION command."""
return interface.execute(StartOperation(), **_convert_kwargs(kwargs))
def diagnostic_reset(interface, **kwargs):
"""Execute a DIAGNOSTIC_RESET command."""
return interface.execute(DiagnosticReset(), **_convert_kwargs(kwargs))
def read_feature_id(interface, feature_address, **kwargs):
"""Execute a READ_FEATURE_ID command."""
return interface.execute(ReadFeatureId(feature_address), **_convert_kwargs(kwargs))
def get_features(interface, **kwargs):
"""Get the features a terminal supports."""
commands = read_feature_ids()
ids = interface.execute(commands, **_convert_kwargs(kwargs))
return parse_features(ids, commands)
def eab_read_data(interface, feature_address, **kwargs):
"""Execute a EAB_READ_DATA command."""
return interface.execute(EABReadData(feature_address), **_convert_kwargs(kwargs))
def eab_load_mask(interface, feature_address, mask, **kwargs):
"""Execute a EAB_LOAD_MASK command."""
return interface.execute(EABLoadMask(feature_address, mask), **_convert_kwargs(kwargs))
def eab_write_alternate(interface, feature_address, data, jumbo_write_strategy=None, **kwargs):
"""Execute a EAB_WRITE_ALTERNATE command."""
commands = _split_write(lambda data: EABWriteAlternate(feature_address, data), data,
jumbo_write_strategy)
responses = interface.execute(commands, **_convert_kwargs(kwargs))
return responses
def eab_read_multiple(interface, feature_address, **kwargs):
"""Execute a EAB_READ_MULTIPLE command."""
return interface.execute(EABReadMultiple(feature_address), **_convert_kwargs(kwargs))
def eab_write_under_mask(interface, feature_address, byte, **kwargs):
"""Execute a EAB_WRITE_UNDER_MASK command."""
return interface.execute(EABWriteUnderMask(feature_address, byte), **_convert_kwargs(kwargs))
def eab_read_status(interface, feature_address, **kwargs):
"""Execute a EAB_READ_STATUS command."""
return interface.execute(EABReadStatus(feature_address), **_convert_kwargs(kwargs))
def _convert_kwargs(kwargs):
if 'receive_timeout' in kwargs:
kwargs['timeout'] = kwargs['receive_timeout']
del kwargs['receive_timeout']
return kwargs
def _chunked(iterable, size):
for index in range(0, len(iterable), size):
yield iterable[index:index+size]
def _split_write(constructor, data, jumbo_write_strategy):
length = 1
if isinstance(data, tuple):
length += len(data[0]) * data[1]
else:
length += len(data)
# To avoid breaking a EAB_WRITE_ALTERNATE command the maximum length must be even
# and the actual data length will be 2 words less...
max_length = 32
if jumbo_write_strategy == 'split' and length > max_length:
if isinstance(data, tuple):
expanded_data = data[0] * data[1]
else:
expanded_data = data
return [constructor(chunk) for chunk in _chunked(expanded_data, max_length - 2)]
return constructor(data)

View File

@ -5,22 +5,31 @@ coax.features
from enum import Enum
from .protocol import read_feature_id
from .protocol import ReadFeatureId
class Feature(Enum):
"""Terminal feature."""
EAB = 0x79
def get_features(interface, **kwargs):
"""Get the features a terminal supports."""
known_ids = set([feature.value for feature in Feature])
FEATURE_ADDRESS_MIN = 2
FEATURE_ADDRESS_MAX = 15
features = dict()
FEATURE_ADDRESSES = range(FEATURE_ADDRESS_MIN, FEATURE_ADDRESS_MAX + 1)
for address in range(2, 16):
id_ = read_feature_id(interface, address, **kwargs)
def read_feature_ids(addresses=None):
"""Generate READ_FEATURE_ID commands."""
return [ReadFeatureId(address) for address in addresses or FEATURE_ADDRESSES]
def parse_features(ids, commands):
"""Parse READ_FEATURE_ID command responses into a map of features and addresses."""
addresses = [command.feature_address for command in commands]
known_ids = {feature.value for feature in Feature}
features = {}
for (address, id_) in zip(addresses, ids):
if id_ is not None and id_ in known_ids:
feature = Feature(id_)

View File

@ -3,11 +3,106 @@ coax.interface
~~~~~~~~~~~~~~
"""
from enum import Enum
class Interface:
"""3270 coax interface."""
def reset(self):
"""Reset the interface."""
raise NotImplementedError
def transmit_receive(self, transmit_words, transmit_repeat_count=None,
transmit_repeat_offset=1, receive_length=None,
receive_timeout=None):
def execute(self, commands, timeout=None):
"""Execute one or more commands."""
(normalized_commands, has_multiple_commands) = _normalize_commands(commands)
(outbound_frames, response_lengths) = _pack_outbound_frames(normalized_commands)
inbound_frames = self._transmit_receive(outbound_frames, response_lengths, timeout)
responses = _unpack_inbound_frames(inbound_frames, normalized_commands)
if has_multiple_commands:
return responses
response = responses[0]
if isinstance(response, BaseException):
raise response
return response
def _transmit_receive(self, outbound_frames, response_lengths, timeout):
raise NotImplementedError
class FrameFormat(Enum):
"""3270 coax frame format."""
WORDS = 1 # 10-bit words
WORD_DATA = 2 # 10-bit word, 8-bit data words
DATA = 4 # 8-bit data words
def _is_command(command):
return hasattr(command, 'pack_outbound_frame') and hasattr(command, 'unpack_inbound_frame')
def _normalize_command(command):
if _is_command(command):
return (None, command)
if not isinstance(command, tuple):
raise TypeError('Invalid command form')
if len(command) != 2:
raise TypeError('Invalid command form')
if not (command[0] is None or isinstance(command[0], int)):
raise TypeError('Invalid command form')
if not _is_command(command[1]):
raise TypeError('Invalid command form')
return command
def _normalize_commands(commands):
try:
command = _normalize_command(commands)
return ([command], False)
except TypeError:
pass
try:
commands = list(commands)
except TypeError:
raise TypeError('Commands must be valid command or iterable')
if not commands:
raise ValueError('Commands must not be empty')
return ([_normalize_command(command) for command in commands], True)
def _pack_outbound_frames(commands):
frames = []
response_lengths = []
for (address, command) in commands:
frame = command.pack_outbound_frame()
response_length = command.response_length or 1
frames.append((address, frame))
response_lengths.append(response_length)
return (frames, response_lengths)
def _unpack_inbound_frames(frames, commands):
responses = []
for (frame, (_, command)) in zip(frames, commands):
if isinstance(frame, BaseException):
responses.append(frame)
else:
response = command.unpack_inbound_frame(frame)
responses.append(response)
return responses

View File

@ -4,10 +4,10 @@ coax.protocol
"""
from enum import Enum
from more_itertools import chunked
from .exceptions import ProtocolError
from .interface import FrameFormat
from .parity import odd_parity
from .exceptions import ProtocolError
class Command(Enum):
"""Terminal command."""
@ -92,6 +92,9 @@ class KeystrokePollResponse(PollResponse):
self.scan_code = (value >> 2) & 0xff
def __repr__(self):
return f'<KeystrokePollResponse scan_code={self.scan_code}>'
class Status:
"""Terminal status."""
@ -184,207 +187,446 @@ class SecondaryControl:
@property
def value(self):
return bool(self.big) | 0
return int(bool(self.big))
def __repr__(self):
return f'<SecondaryControl big={self.big}>'
def poll(interface, action=PollAction.NONE, **kwargs):
"""Execute a POLL command."""
command_word = (action.value << 8) | pack_command_word(Command.POLL)
class ReadCommand:
"""Base class for read commands."""
response = _execute_read_command(interface, command_word, allow_trta_response=True,
unpack=False, **kwargs)
response_length = None
if response is None:
return None
def pack_outbound_frame(self):
raise NotImplementedError
word = response[0]
def unpack_inbound_frame(self, words):
raise NotImplementedError
if PollResponse.is_power_on_reset_complete(word):
return PowerOnResetCompletePollResponse(word)
class WriteCommand:
"""Base class for write commands."""
if PollResponse.is_keystroke(word):
return KeystrokePollResponse(word)
response_length = 1
return PollResponse(word)
def pack_outbound_frame(self):
raise NotImplementedError
def poll_ack(interface, **kwargs):
"""Execute a POLL_ACK command."""
command_word = pack_command_word(Command.POLL_ACK)
def unpack_inbound_frame(self, words):
if not is_tt_ar(words):
raise ProtocolError(f'Expected TT/AR response: {words}')
_execute_write_command(interface, command_word, **kwargs)
class Poll(ReadCommand):
"""POLL command."""
def read_status(interface, **kwargs):
"""Execute a READ_STATUS command."""
command_word = pack_command_word(Command.READ_STATUS)
response_length = 1
response = _execute_read_command(interface, command_word, **kwargs)
def __init__(self, action=PollAction.NONE):
self.action = action
return Status(response[0])
def pack_outbound_frame(self):
command_word = (self.action.value << 8) | pack_command_word(Command.POLL)
def read_terminal_id(interface, **kwargs):
"""Execute a READ_TERMINAL_ID command."""
command_word = pack_command_word(Command.READ_TERMINAL_ID)
return (FrameFormat.WORD_DATA, command_word)
response = _execute_read_command(interface, command_word, **kwargs)
def unpack_inbound_frame(self, words):
if len(words) != 1:
raise ProtocolError(f'Expected 1 word POLL response: {words}')
return TerminalId(response[0])
if is_tt_ar(words):
return None
def read_extended_id(interface, **kwargs):
"""Execute a READ_EXTENDED_ID command."""
command_word = pack_command_word(Command.READ_EXTENDED_ID)
word = words[0]
return _execute_read_command(interface, command_word, 4, allow_trta_response=True,
**kwargs)
if PollResponse.is_power_on_reset_complete(word):
return PowerOnResetCompletePollResponse(word)
def read_address_counter_hi(interface, **kwargs):
"""Execute a READ_ADDRESS_COUNTER_HI command."""
command_word = pack_command_word(Command.READ_ADDRESS_COUNTER_HI)
if PollResponse.is_keystroke(word):
return KeystrokePollResponse(word)
return _execute_read_command(interface, command_word, **kwargs)[0]
return PollResponse(word)
def read_address_counter_lo(interface, **kwargs):
"""Execute a READ_ADDRESS_COUTER_LO command."""
command_word = pack_command_word(Command.READ_ADDRESS_COUNTER_LO)
class PollAck(WriteCommand):
"""POLL_ACK command."""
return _execute_read_command(interface, command_word, **kwargs)[0]
def pack_outbound_frame(self):
command_word = pack_command_word(Command.POLL_ACK)
def read_data(interface, **kwargs):
"""Execute a READ_DATA command."""
command_word = pack_command_word(Command.READ_DATA)
return (FrameFormat.WORD_DATA, command_word)
return _execute_read_command(interface, command_word, **kwargs)
class ReadStatus(ReadCommand):
"""READ_STATUS command."""
def read_multiple(interface, **kwargs):
"""Execute a READ_MULTIPLE command."""
command_word = pack_command_word(Command.READ_MULTIPLE)
response_length = 1
return _execute_read_command(interface, command_word, 32,
validate_response_length=False, **kwargs)
def pack_outbound_frame(self):
command_word = pack_command_word(Command.READ_STATUS)
def reset(interface, **kwargs):
"""Execute a RESET command."""
command_word = pack_command_word(Command.RESET)
return (FrameFormat.WORD_DATA, command_word)
_execute_write_command(interface, command_word, **kwargs)
def unpack_inbound_frame(self, words):
if len(words) != 1:
raise ProtocolError(f'Expected 1 word READ_STATUS response: {words}')
def load_control_register(interface, control, **kwargs):
"""Execute a LOAD_CONTROL_REGISTER command."""
command_word = pack_command_word(Command.LOAD_CONTROL_REGISTER)
return Status(unpack_data_word(words[0]))
_execute_write_command(interface, command_word, bytes([control.value]), **kwargs)
class ReadTerminalId(ReadCommand):
"""READ_TERMINAL_ID command."""
def load_secondary_control(interface, control, **kwargs):
"""Execute a LOAD_SECONDARY_CONTROL command."""
command_word = pack_command_word(Command.LOAD_SECONDARY_CONTROL)
response_length = 1
_execute_write_command(interface, command_word, bytes([control.value]), **kwargs)
def pack_outbound_frame(self):
command_word = pack_command_word(Command.READ_TERMINAL_ID)
def load_mask(interface, mask, **kwargs):
"""Execute a LOAD_MASK command."""
command_word = pack_command_word(Command.LOAD_MASK)
return (FrameFormat.WORD_DATA, command_word)
_execute_write_command(interface, command_word, bytes([mask]), **kwargs)
def unpack_inbound_frame(self, words):
if len(words) != 1:
raise ProtocolError(f'Expected 1 word READ_TERMINAL_ID response: {words}')
def load_address_counter_hi(interface, address, **kwargs):
"""Execute a LOAD_ADDRESS_COUNTER_HI command."""
command_word = pack_command_word(Command.LOAD_ADDRESS_COUNTER_HI)
return TerminalId(unpack_data_word(words[0]))
_execute_write_command(interface, command_word, bytes([address]), **kwargs)
class ReadExtendedId(ReadCommand):
"""READ_EXTENDED_ID command."""
def load_address_counter_lo(interface, address, **kwargs):
"""Execute a LOAD_ADDRESS_COUNTER_LO command."""
command_word = pack_command_word(Command.LOAD_ADDRESS_COUNTER_LO)
response_length = 4
_execute_write_command(interface, command_word, bytes([address]), **kwargs)
def pack_outbound_frame(self):
command_word = pack_command_word(Command.READ_EXTENDED_ID)
def write_data(interface, data, **kwargs):
"""Execute a WRITE_DATA command."""
command_word = pack_command_word(Command.WRITE_DATA)
return (FrameFormat.WORD_DATA, command_word)
_execute_write_command(interface, command_word, data, **kwargs)
def unpack_inbound_frame(self, words):
if len(words) != 4:
raise ProtocolError(f'Expected 4 word READ_EXTENDED_ID response: {words}')
def clear(interface, pattern, **kwargs):
"""Execute a CLEAR command."""
command_word = pack_command_word(Command.CLEAR)
return unpack_data_words(words)
_execute_write_command(interface, command_word, bytes([pattern]), **kwargs)
class ReadAddressCounterHi(ReadCommand):
"""READ_ADDRESS_COUNTER_HI command."""
def search_forward(interface, pattern, **kwargs):
"""Execute a SEARCH_FORWARD command."""
command_word = pack_command_word(Command.SEARCH_FORWARD)
response_length = 1
_execute_write_command(interface, command_word, bytes([pattern]), **kwargs)
def pack_outbound_frame(self):
command_word = pack_command_word(Command.READ_ADDRESS_COUNTER_HI)
def search_backward(interface, pattern, **kwargs):
"""Execute a SEARCH_BACKWARD command."""
command_word = pack_command_word(Command.SEARCH_BACKWARD)
return (FrameFormat.WORD_DATA, command_word)
_execute_write_command(interface, command_word, bytes([pattern]), **kwargs)
def unpack_inbound_frame(self, words):
if len(words) != 1:
raise ProtocolError(f'Expected 1 word READ_ADDRESS_COUNTER_HI response: {words}')
def insert_byte(interface, byte, **kwargs):
"""Execute a INSERT_BYTE command."""
command_word = pack_command_word(Command.INSERT_BYTE)
return unpack_data_word(words[0])
_execute_write_command(interface, command_word, bytes([byte]), **kwargs)
class ReadAddressCounterLo(ReadCommand):
"""READ_ADDRESS_COUNTER_LO command."""
def start_operation(interface):
"""Execute a START_OPERATION command."""
raise NotImplementedError
response_length = 1
def diagnostic_reset(interface):
"""Execute a DIAGNOSTIC_RESET command."""
raise NotImplementedError
def pack_outbound_frame(self):
command_word = pack_command_word(Command.READ_ADDRESS_COUNTER_LO)
def read_feature_id(interface, feature_address, **kwargs):
"""Execute a READ_FEATURE_ID command."""
command_word = pack_command_word(Command.READ_FEATURE_ID, feature_address)
return (FrameFormat.WORD_DATA, command_word)
response = _execute_read_command(interface, command_word, 1, allow_trta_response=True,
**kwargs)
def unpack_inbound_frame(self, words):
if len(words) != 1:
raise ProtocolError(f'Expected 1 word READ_ADDRESS_COUNTER_LO response: {words}')
if response is None:
return None
return unpack_data_word(words[0])
return response[0]
class ReadData(ReadCommand):
"""READ_DATA command."""
def eab_read_data(interface, feature_address, **kwargs):
"""Execute a EAB_READ_DATA command."""
command_word = pack_command_word(Command.EAB_READ_DATA, feature_address)
response_length = 1
return _execute_read_command(interface, command_word, **kwargs)
def pack_outbound_frame(self):
command_word = pack_command_word(Command.READ_DATA)
def eab_load_mask(interface, feature_address, mask, **kwargs):
"""Execute a EAB_LOAD_MASK command."""
command_word = pack_command_word(Command.EAB_LOAD_MASK, feature_address)
return (FrameFormat.WORD_DATA, command_word)
_execute_write_command(interface, command_word, bytes([mask]), **kwargs)
def unpack_inbound_frame(self, words):
if len(words) != 1:
raise ProtocolError(f'Expected 1 word READ_DATA response: {words}')
def eab_write_alternate(interface, feature_address, data, **kwargs):
"""Execute a EAB_WRITE_ALTERNATE command."""
command_word = pack_command_word(Command.EAB_WRITE_ALTERNATE, feature_address)
return unpack_data_word(words[0])
_execute_write_command(interface, command_word, data, **kwargs)
class ReadMultiple(ReadCommand):
"""READ_MULTIPLE command."""
def eab_read_multiple(interface, feature_address, **kwargs):
"""Execute a EAB_READ_MULTIPLE command."""
command_word = pack_command_word(Command.EAB_READ_MULTIPLE, feature_address)
response_length = 32
return _execute_read_command(interface, command_word, 32,
validate_response_length=False, **kwargs)
def pack_outbound_frame(self):
command_word = pack_command_word(Command.READ_MULTIPLE)
def eab_write_under_mask(interface, feature_address, byte, **kwargs):
"""Execute a EAB_WRITE_UNDER_MASK command."""
command_word = pack_command_word(Command.EAB_WRITE_UNDER_MASK, feature_address)
return (FrameFormat.WORD_DATA, command_word)
_execute_write_command(interface, command_word, bytes([byte]), **kwargs)
def unpack_inbound_frame(self, words):
if len(words) == 0:
raise ProtocolError(f'Expected 1 or more word READ_MULTIPLE response: {words}')
def eab_read_status(interface, feature_address, **kwargs):
"""Execute a EAB_READ_STATUS command."""
command_word = pack_command_word(Command.EAB_READ_STATUS, feature_address)
return unpack_data_words(words)
return _execute_read_command(interface, command_word, **kwargs)[0]
class Reset(WriteCommand):
"""RESET command."""
def pack_outbound_frame(self):
command_word = pack_command_word(Command.RESET)
return (FrameFormat.WORD_DATA, command_word)
class LoadControlRegister(WriteCommand):
"""LOAD_CONTROL_REGISTER command."""
def __init__(self, control):
self.control = control
def pack_outbound_frame(self):
command_word = pack_command_word(Command.LOAD_CONTROL_REGISTER)
return (FrameFormat.WORD_DATA, command_word, [self.control.value])
class LoadSecondaryControl(WriteCommand):
"""LOAD_SECONDARY_CONTROL command."""
def __init__(self, control):
self.control = control
def pack_outbound_frame(self):
command_word = pack_command_word(Command.LOAD_SECONDARY_CONTROL)
return (FrameFormat.WORD_DATA, command_word, [self.control.value])
class LoadMask(WriteCommand):
"""LOAD_MASK command."""
def __init__(self, mask):
self.mask = mask
def pack_outbound_frame(self):
command_word = pack_command_word(Command.LOAD_MASK)
return (FrameFormat.WORD_DATA, command_word, [self.mask])
class LoadAddressCounterHi(WriteCommand):
"""LOAD_ADDRESS_COUNTER_HI command."""
def __init__(self, address):
if address < 0 or address > 255:
raise ValueError('Address is out of range')
self.address = address
def pack_outbound_frame(self):
command_word = pack_command_word(Command.LOAD_ADDRESS_COUNTER_HI)
return (FrameFormat.WORD_DATA, command_word, [self.address])
class LoadAddressCounterLo(WriteCommand):
"""LOAD_ADDRESS_COUNTER_LO command."""
def __init__(self, address):
if address < 0 or address > 255:
raise ValueError('Address is out of range')
self.address = address
def pack_outbound_frame(self):
command_word = pack_command_word(Command.LOAD_ADDRESS_COUNTER_LO)
return (FrameFormat.WORD_DATA, command_word, [self.address])
class WriteData(WriteCommand):
"""WRITE_DATA command."""
def __init__(self, data):
self.data = data
def pack_outbound_frame(self):
command_word = pack_command_word(Command.WRITE_DATA)
return (FrameFormat.WORD_DATA, command_word, self.data)
class Clear(WriteCommand):
"""CLEAR command."""
def __init__(self, pattern):
self.pattern = pattern
def pack_outbound_frame(self):
command_word = pack_command_word(Command.CLEAR)
return (FrameFormat.WORD_DATA, command_word, [self.pattern])
class SearchForward(WriteCommand):
"""SEARCH_FORWARD command."""
def __init__(self, pattern):
self.pattern = pattern
def pack_outbound_frame(self):
command_word = pack_command_word(Command.SEARCH_FORWARD)
return (FrameFormat.WORD_DATA, command_word, [self.pattern])
class SearchBackward(WriteCommand):
"""SEARCH_BACKWARD command."""
def __init__(self, pattern):
self.pattern = pattern
def pack_outbound_frame(self):
command_word = pack_command_word(Command.SEARCH_BACKWARD)
return (FrameFormat.WORD_DATA, command_word, [self.pattern])
class InsertByte(WriteCommand):
"""INSERT_BYTE command."""
def __init__(self, byte):
self.byte = byte
def pack_outbound_frame(self):
command_word = pack_command_word(Command.INSERT_BYTE)
return (FrameFormat.WORD_DATA, command_word, [self.byte])
class StartOperation(WriteCommand):
"""START_OPERATION command."""
def pack_outbound_frame(self):
raise NotImplementedError
def unpack_inbound_frame(self, words):
raise NotImplementedError
class DiagnosticReset(WriteCommand):
"""DIAGNOSTIC_RESET command."""
def pack_outbound_frame(self):
raise NotImplementedError
def unpack_inbound_frame(self, words):
raise NotImplementedError
class ReadFeatureId(ReadCommand):
"""READ_FEATURE_ID command."""
response_length = 1
def __init__(self, feature_address):
self.feature_address = feature_address
def pack_outbound_frame(self):
command_word = pack_command_word(Command.READ_FEATURE_ID, self.feature_address)
return (FrameFormat.WORD_DATA, command_word)
def unpack_inbound_frame(self, words):
if len(words) != 1:
raise ProtocolError(f'Expected 1 word READ_FEATURE_ID response: {words}')
if is_tt_ar(words):
return None
return unpack_data_word(words[0])
class EABReadData(ReadCommand):
"""EAB_READ_DATA command."""
response_length = 1
def __init__(self, feature_address):
self.feature_address = feature_address
def pack_outbound_frame(self):
command_word = pack_command_word(Command.EAB_READ_DATA, self.feature_address)
return (FrameFormat.WORD_DATA, command_word)
def unpack_inbound_frame(self, words):
if len(words) != 1:
raise ProtocolError(f'Expected 1 word EAB_READ_DATA response: {words}')
return unpack_data_words(words)
class EABLoadMask(WriteCommand):
"""EAB_LOAD_MASK command."""
def __init__(self, feature_address, mask):
self.feature_address = feature_address
self.mask = mask
def pack_outbound_frame(self):
command_word = pack_command_word(Command.EAB_LOAD_MASK, self.feature_address)
return (FrameFormat.WORD_DATA, command_word, [self.mask])
class EABWriteAlternate(WriteCommand):
"""EAB_WRITE_ALTERNATE command."""
def __init__(self, feature_address, data):
self.feature_address = feature_address
self.data = data
def pack_outbound_frame(self):
command_word = pack_command_word(Command.EAB_WRITE_ALTERNATE, self.feature_address)
return (FrameFormat.WORD_DATA, command_word, self.data)
class EABReadMultiple(ReadCommand):
"""EAB_READ_MULTIPLE command."""
response_length = 32
def __init__(self, feature_address):
self.feature_address = feature_address
def pack_outbound_frame(self):
command_word = pack_command_word(Command.EAB_READ_MULTIPLE, self.feature_address)
return (FrameFormat.WORD_DATA, command_word)
def unpack_inbound_frame(self, words):
return unpack_data_words(words)
class EABWriteUnderMask(WriteCommand):
"""EAB_WRITE_UNDER_MASK command."""
def __init__(self, feature_address, byte):
self.feature_address = feature_address
self.byte = byte
def pack_outbound_frame(self):
command_word = pack_command_word(Command.EAB_WRITE_UNDER_MASK, self.feature_address)
return (FrameFormat.WORD_DATA, command_word, [self.byte])
class EABReadStatus(ReadCommand):
"""EAB_READ_STATUS command."""
response_length = 1
def __init__(self, feature_address):
self.feature_address = feature_address
def pack_outbound_frame(self):
command_word = pack_command_word(Command.EAB_READ_STATUS, self.feature_address)
return (FrameFormat.WORD_DATA, command_word)
def unpack_inbound_frame(self, words):
if len(words) != 1:
raise ProtocolError(f'Expected 1 word EAB_READ_STATUS response: {words}')
return unpack_data_word(words[0])
class Data(WriteCommand):
"""Unaccompanied data."""
def __init__(self, data):
self.data = data
def pack_outbound_frame(self):
return (FrameFormat.DATA, self.data)
def pack_command_word(command, feature_address=None):
"""Pack a command into a 10-bit command word."""
@ -399,6 +641,10 @@ def pack_data_word(byte, set_parity=True):
return (byte << 2) | (parity << 1)
def is_tt_ar(words):
"""Is the word a TT/AR (transmission turnaround / auto response)?"""
return len(words) == 1 and words[0] == 0
def is_data_word(word):
"""Is data word bit set?"""
return (word & 0x1) == 0
@ -423,61 +669,3 @@ def pack_data_words(bytes_, set_parity=True):
def unpack_data_words(words, check_parity=False):
"""Unpack the data bytes from 10-bit data words."""
return bytes([unpack_data_word(word, check_parity=check_parity) for word in words])
def _execute_read_command(interface, command_word, response_length=1,
validate_response_length=True, allow_trta_response=False,
trta_value=None, unpack=True, **kwargs):
"""Execute a standard read command."""
response = interface.transmit_receive([command_word], receive_length=response_length,
**kwargs)
if allow_trta_response and len(response) == 1 and response[0] == 0:
return trta_value
if validate_response_length and len(response) != response_length:
raise ProtocolError((f'Expected {response_length} word response: {response}'))
return unpack_data_words(response) if unpack else response
def _execute_write_command(interface, command_word, data=None,
jumbo_write_strategy=None, **kwargs):
"""Execute a standard write command."""
length = 1
if isinstance(data, tuple):
length += len(data[0]) * data[1]
elif data is not None:
length += len(data)
max_length = 1024
if jumbo_write_strategy == 'split' and length > max_length:
if isinstance(data, tuple):
data_words = pack_data_words(data[0]) * data[1]
else:
data_words = pack_data_words(data)
for words in chunked([command_word, *data_words], max_length):
_execute_write(interface, words, None, **kwargs)
else:
data_words = []
transmit_repeat_count = None
if isinstance(data, tuple):
data_words = pack_data_words(data[0])
transmit_repeat_count = data[1]
elif data is not None:
data_words = pack_data_words(data)
_execute_write(interface, [command_word, *data_words],
transmit_repeat_count, **kwargs)
def _execute_write(interface, words, transmit_repeat_count, **kwargs):
response = interface.transmit_receive(words, transmit_repeat_count,
receive_length=1, **kwargs)
if len(response) != 1:
raise ProtocolError(f'Expected 1 word response: {response}')
if response[0] != 0:
raise ProtocolError(f'Expected TR/TA response: {response}')

View File

@ -11,10 +11,13 @@ from contextlib import contextmanager
from serial import Serial
from sliplib import SlipWrapper, ProtocolError
from .interface import Interface
from .interface import Interface, FrameFormat
from .protocol import pack_data_word
from .exceptions import InterfaceError, InterfaceTimeout, ReceiveError, ReceiveTimeout
class SerialInterface(Interface):
"""Serial attached 3270 coax interface."""
def __init__(self, serial):
if serial is None:
raise ValueError('Serial port is required')
@ -27,6 +30,7 @@ class SerialInterface(Interface):
self.legacy_firmware_version = None
def reset(self):
"""Reset the interface."""
original_serial_timeout = self.serial.timeout
self.serial.timeout = 5
@ -52,31 +56,12 @@ class SerialInterface(Interface):
(major, minor, patch) = struct.unpack('BBB', message[1:])
self.legacy_firmware_detected = True
self.legacy_firmware_version = '{}.{}.{}'.format(major, minor, patch)
self.legacy_firmware_version = f'{major}.{minor}.{patch}'
else:
raise InterfaceError(f'Invalid reset response: {message}')
def transmit_receive(self, transmit_words, transmit_repeat_count=None,
transmit_repeat_offset=1, receive_length=None,
receive_timeout=None):
timeout_milliseconds = self._calculate_timeout_milliseconds(receive_timeout)
message = bytes([0x06])
message += _pack_transmit_header(transmit_repeat_count, transmit_repeat_offset)
message += _pack_transmit_data(transmit_words)
message += _pack_receive_header(receive_length, timeout_milliseconds)
self._write_message(message)
message = self._read_message()
if message[0] != 0x01:
raise _convert_error(message)
return _unpack_receive_data(message[1:])
def enter_dfu_mode(self):
"""Enter device firmware upgrade mode."""
message = bytes([0xf2])
self._write_message(message)
@ -86,6 +71,40 @@ class SerialInterface(Interface):
if message[0] != 0x01:
raise _convert_error(message)
def _transmit_receive(self, outbound_frames, response_lengths, timeout):
if any(address is not None for (address, _) in outbound_frames):
raise NotImplementedError('Interface does not support 3299 protocol')
if len(response_lengths) != len(outbound_frames):
raise ValueError('Response lengths length must equal outbound frames length')
# Pack all messages before sending.
timeout_milliseconds = self._calculate_timeout_milliseconds(timeout)
messages = [_pack_transmit_receive_message(frame, response_length, timeout_milliseconds)
for ((_, frame), response_length) in zip(outbound_frames, response_lengths)]
responses = []
for message in messages:
self._write_message(message)
message = self._read_message()
if message[0] == 0x01:
response = _unpack_transmit_receive_response(message[1:])
else:
error = _convert_error(message)
if not isinstance(error, (ReceiveError, ReceiveTimeout)):
raise error
response = error
responses.append(response)
return responses
def _calculate_timeout_milliseconds(self, timeout):
milliseconds = 0
@ -122,6 +141,7 @@ class SerialInterface(Interface):
@contextmanager
def open_serial_interface(serial_port, reset=True):
"""Opens serial port and initializes serial attached 3270 coax interface."""
with Serial(serial_port, 115200) as serial:
serial.reset_input_buffer()
serial.reset_output_buffer()
@ -138,23 +158,52 @@ def open_serial_interface(serial_port, reset=True):
yield interface
def _pack_transmit_header(repeat_count, repeat_offset):
repeat = ((repeat_offset << 15) | repeat_count) if repeat_count else 0
def _pack_transmit_receive_message(frame, response_length, timeout_milliseconds):
message = bytes([0x06])
return struct.pack('>H', repeat)
def _pack_transmit_data(words):
repeat_count = 0
repeat_offset = 0
bytes_ = bytearray()
for word in words:
bytes_ += struct.pack('<H', word)
if frame[0] == FrameFormat.WORDS:
if isinstance(frame[1], tuple):
repeat_count = frame[1][1]
return bytes_
for word in frame[1][0]:
bytes_ += struct.pack('<H', word)
else:
for word in frame[1]:
bytes_ += struct.pack('<H', word)
elif frame[0] == FrameFormat.WORD_DATA:
bytes_ += struct.pack('<H', frame[1])
def _pack_receive_header(length, timeout_milliseconds):
return struct.pack('>HH', length or 0, timeout_milliseconds)
if len(frame) > 2:
if isinstance(frame[2], tuple):
repeat_offset = 1
repeat_count = frame[2][1]
def _unpack_receive_data(bytes_):
for byte in frame[2][0]:
bytes_ += struct.pack('<H', pack_data_word(byte))
else:
for byte in frame[2]:
bytes_ += struct.pack('<H', pack_data_word(byte))
elif frame[0] == FrameFormat.DATA:
if isinstance(frame[1], tuple):
repeat_count = frame[1][1]
for byte in frame[1][0]:
bytes_ += struct.pack('<H', pack_data_word(byte))
else:
for byte in frame[1]:
bytes_ += struct.pack('<H', pack_data_word(byte))
message += struct.pack('>H', (repeat_offset << 15) | repeat_count)
message += bytes_
message += struct.pack('>HH', response_length, timeout_milliseconds)
return message
def _unpack_transmit_receive_response(bytes_):
return [(hi << 8) | lo for (lo, hi) in zip(bytes_[::2], bytes_[1::2])]
ERROR_MAP = {

View File

@ -2,16 +2,16 @@
from common import open_example_serial_interface
from coax import poll, poll_ack
from coax import Poll, PollAck
with open_example_serial_interface(poll_flush=False) as interface:
print('POLL...')
poll_response = poll(interface, receive_timeout=5)
poll_response = interface.execute(Poll(), timeout=5)
print(poll_response)
if poll_response:
print('POLL_ACK...')
poll_ack(interface)
interface.execute(PollAck())

View File

@ -2,11 +2,11 @@
from common import open_example_serial_interface
from coax import read_terminal_id
from coax import ReadTerminalId
with open_example_serial_interface() as interface:
print('READ_TERMINAL_ID...')
terminal_id = read_terminal_id(interface)
terminal_id = interface.execute(ReadTerminalId())
print(terminal_id)

View File

@ -2,31 +2,27 @@
from common import open_example_serial_interface
from coax import read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data
from coax import ReadAddressCounterHi, ReadAddressCounterLo, LoadAddressCounterHi, LoadAddressCounterLo, WriteData, Data
with open_example_serial_interface() as interface:
print('LOAD_ADDRESS_COUNTER_HI...')
print('LOAD_ADDRESS_COUNTER_HI and LO...')
load_address_counter_hi(interface, 0)
print('LOAD_ADDRESS_COUNTER_LO...')
load_address_counter_lo(interface, 80)
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(80)])
print('WRITE_DATA...')
write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'))
interface.execute(WriteData(bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19')))
print('READ_ADDRESS_COUNTER_HI...')
print('READ_ADDRESS_COUNTER_HI and LO...')
hi = read_address_counter_hi(interface)
print('READ_ADDRESS_COUNTER_LO...')
lo = read_address_counter_lo(interface)
[hi, lo] = interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])
print(f'hi = {hi:02x}, lo = {lo:02x}')
print('WRITE_DATA (repeat twice)...')
write_data(interface, (bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'), 2))
interface.execute(WriteData((bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'), 2)))
print('Unaccompanied data after WRITE_DATA...')
interface.execute(Data(bytes.fromhex('33 00 80 8d 83 00 92 8e 8c 84 00 94 8d 80 82 82 8e 8c 8f 80 8d 88 84 83 00 83 80 93 80')))

View File

@ -2,11 +2,11 @@
from common import open_example_serial_interface
from coax import read_status
from coax import ReadStatus
with open_example_serial_interface() as interface:
print('READ_STATUS...')
status = read_status(interface)
status = interface.execute(ReadStatus())
print(status)

View File

@ -2,34 +2,33 @@
from common import open_example_serial_interface
from coax import Control, poll, load_address_counter_hi, load_address_counter_lo, write_data, load_control_register
from coax import Control, LoadAddressCounterHi, LoadAddressCounterLo, WriteData, LoadControlRegister
with open_example_serial_interface() as interface:
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(80)])
write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'))
interface.execute(WriteData(bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19')))
input('Press ENTER...')
print('LOAD_CONTROL_REGISTER display_inhibit')
load_control_register(interface, Control(display_inhibit=True))
interface.execute(LoadControlRegister(Control(display_inhibit=True)))
input('Press ENTER...')
print('LOAD_CONTROL_REGISTER cursor_inhibit')
load_control_register(interface, Control(cursor_inhibit=True))
interface.execute(LoadControlRegister(Control(cursor_inhibit=True)))
input('Press ENTER...')
print('LOAD_CONTROL_REGISTER cursor_reverse')
load_control_register(interface, Control(cursor_reverse=True))
interface.execute(LoadControlRegister(Control(cursor_reverse=True)))
input('Press ENTER...')
print('LOAD_CONTROL_REGISTER cursor_blink')
load_control_register(interface, Control(cursor_blink=True))
interface.execute(LoadControlRegister(Control(cursor_blink=True)))

View File

@ -2,22 +2,21 @@
from common import open_example_serial_interface
from coax import read_address_counter_hi, read_address_counter_lo, read_data, load_address_counter_hi, load_address_counter_lo, write_data
from coax import ReadAddressCounterHi, ReadAddressCounterLo, ReadData, LoadAddressCounterHi, LoadAddressCounterLo, WriteData
with open_example_serial_interface() as interface:
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(80)])
write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'))
interface.execute(WriteData(bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19')))
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 81)
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(81)])
print('READ_DATA...')
print(read_data(interface))
byte = interface.execute(ReadData())
hi = read_address_counter_hi(interface)
lo = read_address_counter_lo(interface)
print(byte)
[hi, lo] = interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])
print(f'hi = {hi}, lo = {lo}')

View File

@ -2,38 +2,37 @@
from common import open_example_serial_interface
from coax import SecondaryControl, read_address_counter_hi, read_address_counter_lo, read_multiple, load_address_counter_hi, load_address_counter_lo, load_secondary_control, write_data
from coax import SecondaryControl, ReadAddressCounterHi, ReadAddressCounterLo, ReadMultiple, LoadAddressCounterHi, LoadAddressCounterLo, LoadSecondaryControl, WriteData
with open_example_serial_interface() as interface:
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(80)])
write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'))
interface.execute(WriteData(bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19')))
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 81)
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(81)])
print('READ_MULTIPLE...')
print(read_multiple(interface))
bytes_ = interface.execute(ReadMultiple())
hi = read_address_counter_hi(interface)
lo = read_address_counter_lo(interface)
print(bytes_)
[hi, lo] = interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])
print(f'hi = {hi}, lo = {lo}')
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 81)
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(81)])
print('LOAD_SECONDARY_CONTROL big')
load_secondary_control(interface, SecondaryControl(big=True))
interface.execute(LoadSecondaryControl(SecondaryControl(big=True)))
print('READ_MULTIPLE...')
print(read_multiple(interface))
bytes_ = interface.execute(ReadMultiple())
hi = read_address_counter_hi(interface)
lo = read_address_counter_lo(interface)
print(bytes_)
[hi, lo] = interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])
print(f'hi = {hi}, lo = {lo}')

View File

@ -2,9 +2,9 @@
from common import open_example_serial_interface
from coax import reset
from coax import Reset
with open_example_serial_interface() as interface:
print('RESET...')
reset(interface)
interface.execute(Reset())

View File

@ -2,65 +2,60 @@
from common import open_example_serial_interface
from coax import read_address_counter_hi, read_address_counter_lo, read_status, load_address_counter_hi, load_address_counter_lo, write_data, load_mask, search_forward, search_backward
from coax import ReadAddressCounterHi, ReadAddressCounterLo, ReadStatus, LoadAddressCounterHi, LoadAddressCounterLo, WriteData, LoadMask, SearchForward, SearchBackward
with open_example_serial_interface() as interface:
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(80)])
write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'))
interface.execute(WriteData(bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19')))
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 81)
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(81)])
load_mask(interface, 0xff)
interface.execute(LoadMask(0xff))
search_forward(interface, 0x83)
interface.execute(SearchForward(0x83))
status = read_status(interface)
status = interface.execute(ReadStatus())
print(status)
while status.busy:
status = read_status(interface)
status = interface.execute(ReadStatus())
print(status)
hi = read_address_counter_hi(interface)
lo = read_address_counter_lo(interface)
[hi, lo] = interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])
print(f'hi = {hi}, lo = {lo}')
search_backward(interface, 0x84)
interface.execute(SearchBackward(0x84))
status = read_status(interface)
status = interface.execute(ReadStatus())
print(status)
while status.busy:
status = read_status(interface)
status = interface.execute(ReadStatus())
print(status)
hi = read_address_counter_hi(interface)
lo = read_address_counter_lo(interface)
[hi, lo] = interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])
print(f'hi = {hi}, lo = {lo}')
load_mask(interface, 0xf0)
interface.execute(LoadMask(0xf0))
search_forward(interface, 0x30)
interface.execute(SearchForward(0x30))
status = read_status(interface)
status = interface.execute(ReadStatus())
print(status)
while status.busy:
status = read_status(interface)
status = interface.execute(ReadStatus())
print(status)
hi = read_address_counter_hi(interface)
lo = read_address_counter_lo(interface)
[hi, lo] = interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])
print(f'hi = {hi}, lo = {lo}')

View File

@ -2,24 +2,21 @@
from common import open_example_serial_interface
from coax import read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data, insert_byte
from coax import LoadAddressCounterHi, LoadAddressCounterLo, WriteData, InsertByte
with open_example_serial_interface() as interface:
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(80)])
write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'))
interface.execute(WriteData(bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19')))
input('Press ENTER...')
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 81)
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(81)])
insert_byte(interface, 0xb7)
interface.execute(InsertByte(0xb7))
input('Press ENTER...')
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 88)
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(88)])
insert_byte(interface, 0xb7)
interface.execute(InsertByte(0xb7))

View File

@ -2,33 +2,41 @@
from common import open_example_serial_interface
from coax import read_address_counter_hi, read_address_counter_lo, read_status, load_address_counter_hi, load_address_counter_lo, write_data, load_mask, clear
from coax import ReadAddressCounterHi, ReadAddressCounterLo, ReadStatus, LoadAddressCounterHi, LoadAddressCounterLo, WriteData, LoadMask, Clear
with open_example_serial_interface() as interface:
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)
# Clear the entire screen, except status line.
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(80), LoadMask(0x00), Clear(0x00)])
write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'))
input('Press ENTER...')
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 81)
load_mask(interface, 0xf0)
clear(interface, 0x30)
status = read_status(interface)
status = interface.execute(ReadStatus())
print(status)
while status.busy:
status = read_status(interface)
status = interface.execute(ReadStatus())
print(status)
hi = read_address_counter_hi(interface)
lo = read_address_counter_lo(interface)
input('Press ENTER...')
# Write something...
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(80)])
interface.execute(WriteData(bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19')))
input('Press ENTER...')
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(81), LoadMask(0xf0), Clear(0x30)])
status = interface.execute(ReadStatus())
print(status)
while status.busy:
status = interface.execute(ReadStatus())
print(status)
[hi, lo] = interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])
print(f'hi = {hi}, lo = {lo}')

View File

@ -2,15 +2,14 @@
from common import open_example_serial_interface
from coax import Control, read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data, load_control_register
from coax import Control, LoadAddressCounterHi, LoadAddressCounterLo, WriteData, LoadControlRegister
DIGIT_MAP = [0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x80, 0x81, 0x82, 0x83, 0x84, 0x85]
with open_example_serial_interface() as interface:
load_control_register(interface, Control(cursor_inhibit=True))
interface.execute(LoadControlRegister(Control(cursor_inhibit=True)))
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(80)])
buffer = b'\x00' * 4
@ -34,11 +33,10 @@ with open_example_serial_interface() as interface:
buffer += b'\x00' * 560
write_data(interface, buffer)
interface.execute(WriteData(buffer))
# Status Line
load_address_counter_hi(interface, 7)
load_address_counter_lo(interface, 48)
interface.execute([LoadAddressCounterHi(7), LoadAddressCounterLo(48)])
buffer = b''
@ -51,11 +49,10 @@ with open_example_serial_interface() as interface:
for lo in range(0, 16):
buffer += bytes([DIGIT_MAP[lo]])
write_data(interface, buffer)
interface.execute(WriteData(buffer))
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 0)
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(0)])
buffer = bytes(range(0xc0, 0xff + 1))
write_data(interface, buffer)
interface.execute(WriteData(buffer))

View File

@ -2,9 +2,13 @@
from common import open_example_serial_interface
from coax import get_features
from coax import read_feature_ids, parse_features
with open_example_serial_interface() as interface:
features = get_features(interface)
commands = read_feature_ids()
ids = interface.execute(commands)
features = parse_features(ids, commands)
print(features)

View File

@ -5,7 +5,14 @@ from itertools import chain
from common import open_example_serial_interface
from coax import Feature, get_features, load_address_counter_hi, load_address_counter_lo, write_data, eab_write_alternate, eab_load_mask
from coax import read_feature_ids, parse_features, Feature, LoadAddressCounterHi, LoadAddressCounterLo, WriteData, EABWriteAlternate, EABLoadMask
def get_features(interface):
commands = read_feature_ids()
ids = interface.execute(commands)
return parse_features(ids, commands)
def eab_alternate_zip(regen_buffer, eab_buffer):
return bytes(chain(*zip(regen_buffer, eab_buffer)))
@ -21,53 +28,47 @@ with open_example_serial_interface() as interface:
print(f'EAB feature found at address {eab_address}')
# Protected Normal
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(80)])
regen_buffer = bytes.fromhex('e0 08 00 af 91 8e 93 84 82 93 84 83 00 ad 8e 91 8c 80 8b 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 09')
write_data(interface, regen_buffer)
interface.execute(WriteData(regen_buffer))
# Protected Intense
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 160)
interface.execute([LoadAddressCounterHi(0), LoadAddressCounterLo(160)])
regen_buffer = bytes.fromhex('e8 08 00 af 91 8e 93 84 82 93 84 83 00 a8 8d 93 84 8d 92 84 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 09')
write_data(interface, regen_buffer)
interface.execute(WriteData(regen_buffer))
# Normal EFA
load_address_counter_hi(interface, 1)
load_address_counter_lo(interface, 64)
interface.execute([LoadAddressCounterHi(1), LoadAddressCounterLo(64)])
regen_buffer = bytes.fromhex('e0 08 00 ad 8e 91 8c 80 8b 00 a4 a5 a0 00 00 00 00 00 00 00 00 00 00 b7 bf 00 a1 bf 00 b1 bf 00 ac bf 00 a6 bf 00 a2 bf 00 b8 bf 00 b6 bf 00 00 09 e0')
eab_buffer = bytes.fromhex('00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 08 00 00 10 00 00 18 00 00 20 00 00 28 00 00 30 00 00 38 00 00 00 00 00')
eab_write_alternate(interface, eab_address, eab_alternate_zip(regen_buffer, eab_buffer))
interface.execute(EABWriteAlternate(eab_address, eab_alternate_zip(regen_buffer, eab_buffer)))
# Blink EFA
load_address_counter_hi(interface, 1)
load_address_counter_lo(interface, 144)
interface.execute([LoadAddressCounterHi(1), LoadAddressCounterLo(144)])
regen_buffer = bytes.fromhex('e0 08 00 a1 8b 88 8d 8a 00 a4 a5 a0 00 00 00 00 00 00 00 00 00 00 00 b7 bf 00 a1 bf 00 b1 bf 00 ac bf 00 a6 bf 00 a2 bf 00 b8 bf 00 b6 bf 00 00 09 e0')
eab_buffer = bytes.fromhex('40 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 08 00 00 10 00 00 18 00 00 20 00 00 28 00 00 30 00 00 38 00 00 00 00 00')
eab_write_alternate(interface, eab_address, eab_alternate_zip(regen_buffer, eab_buffer))
interface.execute(EABWriteAlternate(eab_address, eab_alternate_zip(regen_buffer, eab_buffer)))
# Reverse EFA
load_address_counter_hi(interface, 1)
load_address_counter_lo(interface, 224)
interface.execute([LoadAddressCounterHi(1), LoadAddressCounterLo(224)])
regen_buffer = bytes.fromhex('e0 08 00 b1 84 95 84 91 92 84 00 a4 a5 a0 00 00 00 00 00 00 00 00 00 b7 bf 00 a1 bf 00 b1 bf 00 ac bf 00 a6 bf 00 a2 bf 00 b8 bf 00 b6 bf 00 00 09 e0')
eab_buffer = bytes.fromhex('80 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 08 00 00 10 00 00 18 00 00 20 00 00 28 00 00 30 00 00 38 00 00 00 00 00')
eab_write_alternate(interface, eab_address, eab_alternate_zip(regen_buffer, eab_buffer))
interface.execute(EABWriteAlternate(eab_address, eab_alternate_zip(regen_buffer, eab_buffer)))
# Underline EFA
load_address_counter_hi(interface, 2)
load_address_counter_lo(interface, 48)
interface.execute([LoadAddressCounterHi(2), LoadAddressCounterLo(48)])
regen_buffer = bytes.fromhex('e0 08 00 b4 8d 83 84 91 8b 88 8d 84 00 a4 a5 a0 00 00 00 00 00 00 00 b7 bf 00 a1 bf 00 b1 bf 00 ac bf 00 a6 bf 00 a2 bf 00 b8 bf 00 b6 bf 00 00 09 e0')
eab_buffer = bytes.fromhex('c0 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 08 00 00 10 00 00 18 00 00 20 00 00 28 00 00 30 00 00 38 00 00 00 00 00')
eab_write_alternate(interface, eab_address, eab_alternate_zip(regen_buffer, eab_buffer))
interface.execute(EABWriteAlternate(eab_address, eab_alternate_zip(regen_buffer, eab_buffer)))

View File

@ -5,7 +5,7 @@ from contextlib import contextmanager
sys.path.append('..')
from coax import open_serial_interface, poll, poll_ack
from coax import open_serial_interface, Poll, PollAck
DEFAULT_SERIAL_PORT = '/dev/ttyACM0'
@ -29,14 +29,14 @@ def open_example_serial_interface(reset=True, poll_flush=True):
count = 0
poll_response = poll(interface, receive_timeout=1)
poll_response = interface.execute(Poll(), timeout=1)
while poll_response:
poll_ack(interface)
interface.execute(PollAck())
count += 1
poll_response = poll(interface, receive_timeout=1)
poll_response = interface.execute(Poll(), timeout=1)
print(f'ACK\'d {count} POLL responses')

View File

@ -1,3 +1,2 @@
more-itertools==8.7.0
pyserial==3.5
sliplib==0.6.2

View File

@ -21,7 +21,7 @@ setup(
author='Andrew Kay',
author_email='projects@ajk.me',
packages=['coax'],
install_requires=['more-itertools==8.7.0', 'pyserial==3.5', 'sliplib==0.6.2'],
install_requires=['pyserial==3.5', 'sliplib==0.6.2'],
long_description=LONG_DESCRIPTION,
long_description_content_type='text/markdown',
classifiers=[

View File

@ -1,69 +1,25 @@
import unittest
from unittest.mock import Mock, call, patch
import context
from coax.features import Feature, get_features
from coax.features import Feature, read_feature_ids, parse_features
class GetFeaturesTestCase(unittest.TestCase):
def setUp(self):
self.interface = Mock()
class ReadFeatureIdsTestCase(unittest.TestCase):
def test(self):
commands = read_feature_ids()
patcher = patch('coax.features.read_feature_id')
self.assertEqual(len(commands), 14)
self.read_feature_id_mock = patcher.start()
self.assertEqual(commands[0].feature_address, 2)
self.assertEqual(commands[13].feature_address, 15)
self.addCleanup(patch.stopall)
class ParseFeaturesTestCase(unittest.TestCase):
def test(self):
commands = read_feature_ids()
def test_with_known_feature(self):
# Arrange
def read_feature_id(interface, feature_address, **kwargs):
if feature_address == 7:
return 0x79
features = parse_features([None, None, None, None, None, 0x79, None, 0x99, None, None, None, None, None, None], commands)
return None
self.read_feature_id_mock.side_effect = read_feature_id
# Act
features = get_features(self.interface)
# Assert
self.assertEqual(features, { Feature.EAB: 7 })
def test_with_unknown_feature(self):
# Arrange
def read_feature_id(interface, feature_address, **kwargs):
if feature_address == 7:
return 0x99
return None
self.read_feature_id_mock.side_effect = read_feature_id
# Act
features = get_features(self.interface)
# Assert
self.assertEqual(features, { })
def test_all_feature_addresses_are_enumerated(self):
# Act
features = get_features(self.interface)
# Assert
calls = self.read_feature_id_mock.call_args_list
self.assertEqual(calls, [call(self.interface, address) for address in range(2, 16)])
def test_receive_timeout_is_passed_to_read_feature_id(self):
# Act
features = get_features(self.interface, receive_timeout=10)
# Assert
calls = self.read_feature_id_mock.call_args_list
self.assertEqual(calls, [call(self.interface, address, receive_timeout=10) for address in range(2, 16)])
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,107 @@
import unittest
from unittest.mock import Mock
import context
from coax.interface import Interface, FrameFormat
from coax.protocol import ReadAddressCounterHi, ReadAddressCounterLo
from coax.exceptions import ReceiveTimeout
class InterfaceExecuteTestCase(unittest.TestCase):
def setUp(self):
self.interface = Interface()
self.interface._transmit_receive = Mock()
def test_single_command(self):
# Arrange
self.interface._transmit_receive.return_value=[[0b00000010_00]]
# Act
response = self.interface.execute(ReadAddressCounterHi())
# Assert
self.assertEqual(response, 0x02)
self.interface._transmit_receive.assert_called_once()
(outbound_frames, response_lengths, _) = self.interface._transmit_receive.call_args[0]
self.assertEqual(outbound_frames, [(None, (FrameFormat.WORD_DATA, 0b000_00101_01))])
self.assertEqual(response_lengths, [1])
def test_single_addressed_command(self):
# Arrange
self.interface._transmit_receive.return_value=[[0b00000010_00]]
# Act
response = self.interface.execute((0b111000, ReadAddressCounterHi()))
# Assert
self.assertEqual(response, 0x02)
self.interface._transmit_receive.assert_called_once()
(outbound_frames, response_lengths, _) = self.interface._transmit_receive.call_args[0]
self.assertEqual(outbound_frames, [(0b111000, (FrameFormat.WORD_DATA, 0b000_00101_01))])
self.assertEqual(response_lengths, [1])
def test_multiple_commands(self):
# Arrange
self.interface._transmit_receive.return_value=[[0b00000010_00], [0b11111111_00]]
# Act
response = self.interface.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])
# Assert
self.assertEqual(response, [0x02, 0xff])
self.interface._transmit_receive.assert_called_once()
(outbound_frames, response_lengths, _) = self.interface._transmit_receive.call_args[0]
self.assertEqual(outbound_frames, [(None, (FrameFormat.WORD_DATA, 0b000_00101_01)), (None, (FrameFormat.WORD_DATA, 0b000_10101_01))])
self.assertEqual(response_lengths, [1, 1])
def test_multiple_addressed_commands(self):
# Arrange
self.interface._transmit_receive.return_value=[[0b00000010_00], [0b11111111_00]]
# Act
response = self.interface.execute([(0b111000, ReadAddressCounterHi()), (0b111000, ReadAddressCounterLo())])
# Assert
self.assertEqual(response, [0x02, 0xff])
self.interface._transmit_receive.assert_called_once()
(outbound_frames, response_lengths, _) = self.interface._transmit_receive.call_args[0]
self.assertEqual(outbound_frames, [(0b111000, (FrameFormat.WORD_DATA, 0b000_00101_01)), (0b111000, (FrameFormat.WORD_DATA, 0b000_10101_01))])
self.assertEqual(response_lengths, [1, 1])
def test_timeout(self):
# Arrange
self.interface._transmit_receive.return_value=[[0b00000010_00]]
# Act
response = self.interface.execute(ReadAddressCounterHi(), timeout=0.1)
# Assert
self.interface._transmit_receive.assert_called_once()
(_, _, timeout) = self.interface._transmit_receive.call_args[0]
self.assertEqual(timeout, 0.1)
def test_single_command_exception(self):
# Arrange
self.interface._transmit_receive.return_value=[ReceiveTimeout()]
# Act and assert
with self.assertRaises(ReceiveTimeout):
self.interface.execute(ReadAddressCounterHi())
if __name__ == '__main__':
unittest.main()

View File

@ -1,38 +1,10 @@
import unittest
from unittest.mock import Mock
import context
from coax import PollResponse, KeystrokePollResponse, ProtocolError
from coax.protocol import Command, Status, TerminalType, TerminalId, Control, SecondaryControl, pack_command_word, pack_data_word, unpack_data_word, pack_data_words, unpack_data_words, _execute_read_command, _execute_write_command
class PollResponseTestCase(unittest.TestCase):
def test_is_power_on_reset_complete(self):
self.assertTrue(PollResponse.is_power_on_reset_complete(0b0000001010))
def test_is_keystroke(self):
self.assertTrue(PollResponse.is_keystroke(0b1111111110))
class KeystrokePollResponseTestCase(unittest.TestCase):
def test(self):
# Act
response = KeystrokePollResponse(0b1111111110)
# Assert
self.assertEqual(response.scan_code, 0xff)
def test_not_a_keystroke(self):
with self.assertRaisesRegex(ValueError, 'Invalid keystroke poll response'):
response = KeystrokePollResponse(0b0000001000)
class StatusTestCase(unittest.TestCase):
def test(self):
status = Status(0b10000110)
self.assertTrue(status.monocase)
self.assertTrue(status.busy)
self.assertTrue(status.feature_error)
self.assertTrue(status.operation_complete)
from coax.interface import FrameFormat
from coax.protocol import PollAction, PowerOnResetCompletePollResponse, KeystrokePollResponse, TerminalId, TerminalType, Control, SecondaryControl, Poll, PollAck, ReadStatus, ReadTerminalId, ReadExtendedId, ReadAddressCounterHi, ReadAddressCounterLo, ReadData, ReadMultiple, Reset, LoadControlRegister, LoadSecondaryControl, LoadMask, LoadAddressCounterHi, LoadAddressCounterLo, WriteData, Clear, SearchForward, SearchBackward, InsertByte, StartOperation, DiagnosticReset, ReadFeatureId, pack_data_word, unpack_data_word, pack_data_words, unpack_data_words
from coax.exceptions import ProtocolError
class TerminalIdTestCase(unittest.TestCase):
def test_cut_model_2(self):
@ -104,163 +76,396 @@ class SecondaryControlTestCase(unittest.TestCase):
self.assertEqual(control.value, 0b00000001)
class PackCommandWordTestCase(unittest.TestCase):
def test(self):
self.assertEqual(pack_command_word(Command.POLL_ACK), 0b0001000101)
class PollTestCase(unittest.TestCase):
def test_pack_poll_action_none(self):
self.assertEqual(Poll(PollAction.NONE).pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_00001_01))
def test_pack_poll_action_alarm(self):
self.assertEqual(Poll(PollAction.ALARM).pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b100_00001_01))
def test_pack_poll_action_enable_keyboard_clicker(self):
self.assertEqual(Poll(PollAction.ENABLE_KEYBOARD_CLICKER).pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b110_00001_01))
def test_pack_poll_action_disable_keyboard_clicker(self):
self.assertEqual(Poll(PollAction.DISABLE_KEYBOARD_CLICKER).pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b010_00001_01))
def test_unpack_tt_ar(self):
self.assertIsNone(Poll().unpack_inbound_frame([0b0000000000]))
def test_unpack_power_on_reset_complete(self):
self.assertIsInstance(Poll().unpack_inbound_frame([0b00000010_10]), PowerOnResetCompletePollResponse)
def test_unpack_keystroke(self):
poll_response = Poll().unpack_inbound_frame([0b01010001_10])
self.assertIsInstance(poll_response, KeystrokePollResponse)
self.assertEqual(poll_response.scan_code, 0x51)
def test_unpack_invalid_response(self):
for words in [[], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
Poll().unpack_inbound_frame(words)
class PollAckTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(PollAck().pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_10001_01))
def test_unpack_tt_ar(self):
self.assertIsNone(PollAck().unpack_inbound_frame([0b0000000000]))
def test_unpack_invalid_response(self):
for words in [[], [0b1010101011], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
PollAck().unpack_inbound_frame(words)
class ReadStatusTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(ReadStatus().pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_01101_01))
def test_unpack_all_false(self):
status = ReadStatus().unpack_inbound_frame([0b00100000_00])
self.assertFalse(status.monocase)
self.assertFalse(status.busy)
self.assertFalse(status.feature_error)
self.assertFalse(status.operation_complete)
def test_unpack_all_true(self):
status = ReadStatus().unpack_inbound_frame([0b10000110_00])
self.assertTrue(status.monocase)
self.assertTrue(status.busy)
self.assertTrue(status.feature_error)
self.assertTrue(status.operation_complete)
def test_unpack_invalid_response(self):
for words in [[], [0b1010101011], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
ReadStatus().unpack_inbound_frame(words)
class ReadTerminalIdTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(ReadTerminalId().pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_01001_01))
def test_unpack_cut_terminal(self):
terminal_id = ReadTerminalId().unpack_inbound_frame([0b1111_010_0_00])
self.assertEqual(terminal_id.type, TerminalType.CUT)
self.assertEqual(terminal_id.model, 2);
self.assertEqual(terminal_id.keyboard, 15)
def test_unpack_dft_terminal(self):
terminal_id = ReadTerminalId().unpack_inbound_frame([0b0000_000_1_00])
self.assertEqual(terminal_id.type, TerminalType.DFT)
self.assertIsNone(terminal_id.model)
self.assertIsNone(terminal_id.keyboard)
def test_unpack_invalid_response(self):
for words in [[], [0b1010101011], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
ReadTerminalId().unpack_inbound_frame(words)
class ReadExtendedIdTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(ReadExtendedId().pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_00111_01))
def test_unpack_extended_id(self):
self.assertEqual(ReadExtendedId().unpack_inbound_frame([0b11000001_00, 0b00110100_00, 0b10000011_00, 0b00000000_00]), bytes.fromhex('c1 34 83 00'))
def test_unpack_invalid_response(self):
for words in [[], [0b11000001_00, 0b00110100_00, 0b10000011_01, 0b00000000_00], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
ReadExtendedId().unpack_inbound_frame(words)
class ReadAddressCounterHiTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(ReadAddressCounterHi().pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_00101_01))
def test_unpack_address(self):
self.assertEqual(ReadAddressCounterHi().unpack_inbound_frame([0b00000010_00]), 0x02)
def test_unpack_invalid_response(self):
for words in [[], [0b00000010_01], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
ReadAddressCounterHi().unpack_inbound_frame(words)
class ReadAddressCounterLoTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(ReadAddressCounterLo().pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_10101_01))
def test_unpack_address(self):
self.assertEqual(ReadAddressCounterLo().unpack_inbound_frame([0b11111111_00]), 0xff)
def test_unpack_invalid_response(self):
for words in [[], [0b11111111_01], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
ReadAddressCounterLo().unpack_inbound_frame(words)
class ReadDataTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(ReadData().pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_00011_01))
def test_unpack_data(self):
self.assertEqual(ReadData().unpack_inbound_frame([0b11111111_00]), 0xff)
def test_unpack_invalid_response(self):
for words in [[], [0b11111111_01], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
ReadData().unpack_inbound_frame(words)
class ReadMultipleTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(ReadMultiple().pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_01011_01))
def test_unpack_data(self):
self.assertEqual(ReadMultiple().unpack_inbound_frame([0b00000000_00, 0b11111111_00]), bytes.fromhex('00 ff'))
def test_unpack_invalid_response(self):
for words in [[], [0b11111111_01]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
ReadMultiple().unpack_inbound_frame(words)
class ResetTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(Reset().pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_00010_01))
def test_unpack_tt_ar(self):
self.assertIsNone(Reset().unpack_inbound_frame([0b0000000000]))
def test_unpack_invalid_response(self):
for words in [[], [0b0011000000], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
Reset().unpack_inbound_frame(words)
class LoadControlRegisterTestCase(unittest.TestCase):
def test_pack_all_false(self):
control = Control(False, False, False, False, False)
self.assertEqual(LoadControlRegister(control).pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_01010_01, [0b00000000]))
def test_pack_all_true(self):
control = Control(True, True, True, True, True)
self.assertEqual(LoadControlRegister(control).pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_01010_01, [0b00011111]))
def test_unpack_tt_ar(self):
control = Control()
self.assertIsNone(LoadControlRegister(control).unpack_inbound_frame([0b0000000000]))
def test_unpack_invalid_response(self):
control = Control()
for words in [[], [0b0011000000], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
LoadControlRegister(control).unpack_inbound_frame(words)
class LoadSecondaryControlTestCase(unittest.TestCase):
def test_pack_all_false(self):
control = SecondaryControl(False)
self.assertEqual(LoadSecondaryControl(control).pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_11010_01, [0b00000000]))
def test_pack_all_true(self):
control = SecondaryControl(True)
self.assertEqual(LoadSecondaryControl(control).pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_11010_01, [0b00000001]))
def test_unpack_tt_ar(self):
control = SecondaryControl()
self.assertIsNone(LoadSecondaryControl(control).unpack_inbound_frame([0b0000000000]))
def test_unpack_invalid_response(self):
control = SecondaryControl()
for words in [[], [0b0011000000], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
LoadSecondaryControl(control).unpack_inbound_frame(words)
class LoadMaskTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(LoadMask(0xff).pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_10110_01, [0xff]))
def test_unpack_tt_ar(self):
self.assertIsNone(LoadMask(0xff).unpack_inbound_frame([0b0000000000]))
def test_unpack_invalid_response(self):
for words in [[], [0b0011000000], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
LoadMask(0xff).unpack_inbound_frame(words)
class LoadAddressCounterHiTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(LoadAddressCounterHi(0xff).pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_00100_01, [0xff]))
def test_unpack_tt_ar(self):
self.assertIsNone(LoadAddressCounterHi(0xff).unpack_inbound_frame([0b0000000000]))
def test_unpack_invalid_response(self):
for words in [[], [0b0011000000], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
LoadAddressCounterHi(0xff).unpack_inbound_frame(words)
class LoadAddressCounterLoTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(LoadAddressCounterLo(0xff).pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_10100_01, [0xff]))
def test_unpack_tt_ar(self):
self.assertIsNone(LoadAddressCounterLo(0xff).unpack_inbound_frame([0b0000000000]))
def test_unpack_invalid_response(self):
for words in [[], [0b0011000000], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
LoadAddressCounterLo(0xff).unpack_inbound_frame(words)
class WriteDataTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(WriteData(bytes.fromhex('00 ff')).pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_01100_01, bytes.fromhex('00 ff')))
def test_unpack_tt_ar(self):
self.assertIsNone(WriteData(bytes.fromhex('00 ff')).unpack_inbound_frame([0b0000000000]))
def test_unpack_invalid_response(self):
for words in [[], [0b0011000000], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
WriteData(bytes.fromhex('00 ff')).unpack_inbound_frame(words)
class ClearTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(Clear(0xff).pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_00110_01, [0xff]))
def test_unpack_tt_ar(self):
self.assertIsNone(Clear(0xff).unpack_inbound_frame([0b0000000000]))
def test_unpack_invalid_response(self):
for words in [[], [0b0011000000], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
Clear(0xff).unpack_inbound_frame(words)
class SearchForwardTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(SearchForward(0xff).pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_10000_01, [0xff]))
def test_unpack_tt_ar(self):
self.assertIsNone(SearchForward(0xff).unpack_inbound_frame([0b0000000000]))
def test_unpack_invalid_response(self):
for words in [[], [0b0011000000], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
SearchForward(0xff).unpack_inbound_frame(words)
class SearchBackwardTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(SearchBackward(0xff).pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_10010_01, [0xff]))
def test_unpack_tt_ar(self):
self.assertIsNone(SearchBackward(0xff).unpack_inbound_frame([0b0000000000]))
def test_unpack_invalid_response(self):
for words in [[], [0b0011000000], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
SearchBackward(0xff).unpack_inbound_frame(words)
class InsertByteTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(InsertByte(0xff).pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b000_01110_01, [0xff]))
def test_unpack_tt_ar(self):
self.assertIsNone(InsertByte(0xff).unpack_inbound_frame([0b0000000000]))
def test_unpack_invalid_response(self):
for words in [[], [0b0011000000], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
InsertByte(0xff).unpack_inbound_frame(words)
class StartOperationTestCase(unittest.TestCase):
def test_pack(self):
with self.assertRaises(NotImplementedError):
StartOperation().pack_outbound_frame()
def test_unpack(self):
with self.assertRaises(NotImplementedError):
StartOperation().unpack_inbound_frame([])
class DiagnosticResetTestCase(unittest.TestCase):
def test_pack(self):
with self.assertRaises(NotImplementedError):
DiagnosticReset().pack_outbound_frame()
def test_unpack(self):
with self.assertRaises(NotImplementedError):
DiagnosticReset().unpack_inbound_frame([])
class ReadFeatureIdTestCase(unittest.TestCase):
def test_pack(self):
self.assertEqual(ReadFeatureId(7).pack_outbound_frame(), (FrameFormat.WORD_DATA, 0b0111_0111_01))
def test_unpack_tt_ar(self):
self.assertIsNone(ReadFeatureId(7).unpack_inbound_frame([0b0000000000]))
def test_unpack_id(self):
self.assertEqual(ReadFeatureId(7).unpack_inbound_frame([0b01111001_00]), 0x79)
def test_unpack_invalid_response(self):
for words in [[], [1, 2]]:
with self.subTest(words=words):
with self.assertRaises(ProtocolError):
ReadFeatureId(7).unpack_inbound_frame(words)
class PackDataWordTestCase(unittest.TestCase):
def test(self):
self.assertEqual(pack_data_word(0x00), 0b0000000010)
self.assertEqual(pack_data_word(0x01), 0b0000000100)
self.assertEqual(pack_data_word(0xff), 0b1111111110)
def test_set_parity(self):
self.assertEqual(pack_data_word(0x00, set_parity=True), 0b00000000_10)
self.assertEqual(pack_data_word(0x01, set_parity=True), 0b00000001_00)
self.assertEqual(pack_data_word(0xff, set_parity=True), 0b11111111_10)
def test_disable_set_parity(self):
self.assertEqual(pack_data_word(0x00, set_parity=False), 0b0000000000)
self.assertEqual(pack_data_word(0x01, set_parity=False), 0b0000000100)
self.assertEqual(pack_data_word(0xff, set_parity=False), 0b1111111100)
def test_do_not_set_parity(self):
self.assertEqual(pack_data_word(0x00, set_parity=False), 0b00000000_00)
self.assertEqual(pack_data_word(0x01, set_parity=False), 0b00000001_00)
self.assertEqual(pack_data_word(0xff, set_parity=False), 0b11111111_00)
class UnpackDataWordTestCase(unittest.TestCase):
def test(self):
self.assertEqual(unpack_data_word(0b0000000010), 0x00)
self.assertEqual(unpack_data_word(0b1111111110), 0xff)
def test_do_not_check_parity(self):
self.assertEqual(unpack_data_word(0b00000000_10, check_parity=True), 0x00)
self.assertEqual(unpack_data_word(0b11111111_10, check_parity=True), 0xff)
def test_data_bit_not_set_error(self):
with self.assertRaisesRegex(ProtocolError, 'Word does not have data bit set'):
unpack_data_word(0b0000000011)
unpack_data_word(0b00000000_11)
def test_parity_error(self):
with self.assertRaisesRegex(ProtocolError, 'Parity error'):
unpack_data_word(0b0000000000, check_parity=True)
unpack_data_word(0b00000000_00, check_parity=True)
class PackDataWordsTestCase(unittest.TestCase):
def test(self):
self.assertEqual(pack_data_words(bytes.fromhex('00 ff')), [0b0000000010, 0b1111111110])
self.assertEqual(pack_data_words(bytes.fromhex('00 ff')), [0b00000000_10, 0b11111111_10])
class UnpackDataWordsTestCase(unittest.TestCase):
def test(self):
self.assertEqual(unpack_data_words([0b0000000010, 0b1111111110]), bytes.fromhex('00 ff'))
class ExecuteReadCommandTestCase(unittest.TestCase):
def setUp(self):
self.interface = Mock()
def test(self):
# Arrange
command_word = pack_command_word(Command.READ_TERMINAL_ID)
self.interface.transmit_receive = Mock(return_value=[0b0000000010])
# Act and assert
self.assertEqual(_execute_read_command(self.interface, command_word), bytes.fromhex('00'))
def test_allow_trta_response(self):
# Arrange
command_word = pack_command_word(Command.POLL)
self.interface.transmit_receive = Mock(return_value=[0b0000000000])
# Act and assert
self.assertEqual(_execute_read_command(self.interface, command_word, allow_trta_response=True, trta_value='TRTA'), 'TRTA')
def test_disable_unpack(self):
# Arrange
command_word = pack_command_word(Command.POLL)
self.interface.transmit_receive = Mock(return_value=[0b1111111110])
# Act and assert
self.assertEqual(_execute_read_command(self.interface, command_word, unpack=False), [0b1111111110])
def test_unexpected_response_length(self):
# Arrange
command_word = pack_command_word(Command.READ_TERMINAL_ID)
self.interface.transmit_receive = Mock(return_value=[])
# Act and assert
with self.assertRaisesRegex(ProtocolError, 'Expected 1 word response'):
_execute_read_command(self.interface, command_word)
def test_receive_timeout_is_passed_to_interface(self):
# Arrange
command_word = pack_command_word(Command.READ_TERMINAL_ID)
self.interface.transmit_receive = Mock(return_value=[0b0000000010])
# Act
_execute_read_command(self.interface, command_word, receive_timeout=10)
# Assert
self.assertEqual(self.interface.transmit_receive.call_args[1].get('receive_timeout'), 10)
class ExecuteWriteCommandTestCase(unittest.TestCase):
def setUp(self):
self.interface = Mock()
def test(self):
for jumbo_write_strategy in [None, 'split']:
with self.subTest(jumbo_write_strategy=jumbo_write_strategy):
# Arrange
command_word = pack_command_word(Command.WRITE_DATA)
self.interface.transmit_receive = Mock(return_value=[0b0000000000])
# Act
_execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef'), jumbo_write_strategy=jumbo_write_strategy)
# Assert
self.interface.transmit_receive.assert_called_once_with([0x0031, 0x037a, 0x02b4, 0x02fa, 0x03bc], None, receive_length=1)
def test_jumbo_write_split_strategy(self):
# Arrange
command_word = pack_command_word(Command.WRITE_DATA)
self.interface.transmit_receive = Mock(return_value=[0b0000000000])
data = (bytes.fromhex('01') * 1023) + (bytes.fromhex('02') * 32)
# Act
_execute_write_command(self.interface, command_word, data, jumbo_write_strategy='split')
# Assert
self.assertEqual(self.interface.transmit_receive.call_count, 2)
call_args_list = self.interface.transmit_receive.call_args_list
self.assertEqual(len(call_args_list[0][0][0]), 1024)
self.assertEqual(len(call_args_list[1][0][0]), 32)
def test_unexpected_response_length(self):
# Arrange
command_word = pack_command_word(Command.WRITE_DATA)
self.interface.transmit_receive = Mock(return_value=[])
# Act and assert
with self.assertRaisesRegex(ProtocolError, 'Expected 1 word response'):
_execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef'))
def test_not_trta_response(self):
# Arrange
command_word = pack_command_word(Command.WRITE_DATA)
self.interface.transmit_receive = Mock(return_value=[0b0000000010])
# Act and assert
with self.assertRaisesRegex(ProtocolError, 'Expected TR/TA response'):
_execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef'))
def test_receive_timeout_is_passed_to_interface(self):
# Arrange
command_word = pack_command_word(Command.WRITE_DATA)
self.interface.transmit_receive = Mock(return_value=[0b0000000000])
# Assert
_execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef'), receive_timeout=10)
# Assert
self.assertEqual(self.interface.transmit_receive.call_args[1].get('receive_timeout'), 10)
self.assertEqual(unpack_data_words([0b00000000_10, 0b11111111_10]), bytes.fromhex('00 ff'))
if __name__ == '__main__':
unittest.main()

View File

@ -1,10 +1,12 @@
import unittest
from unittest.mock import Mock
from unittest.mock import Mock, call
import sliplib
import context
from coax import SerialInterface, InterfaceError, ReceiveError, ReceiveTimeout
from coax.interface import FrameFormat
from coax.serial_interface import SerialInterface
from coax.exceptions import InterfaceError, ReceiveTimeout
class SerialInterfaceResetTestCase(unittest.TestCase):
def setUp(self):
@ -34,7 +36,7 @@ class SerialInterfaceResetTestCase(unittest.TestCase):
def test_legacy_response_is_handled_correctly(self):
# Arrange
self.interface._read_message = Mock(return_value=bytes.fromhex('01 01 02 03'))
self.interface._read_message.return_value=bytes.fromhex('01 01 02 03')
# Act
self.interface.reset()
@ -55,7 +57,7 @@ class SerialInterfaceResetTestCase(unittest.TestCase):
def test_invalid_message_length_is_handled_correctly(self):
# Arrange
self.interface._read_message = Mock(return_value=bytes.fromhex('01 01'))
self.interface._read_message.return_value=bytes.fromhex('01 01')
# Act and assert
with self.assertRaisesRegex(InterfaceError, 'Invalid reset response'):
@ -63,7 +65,7 @@ class SerialInterfaceResetTestCase(unittest.TestCase):
def test_error_is_handled_correctly(self):
# Arrange
self.interface._read_message = Mock(return_value=bytes.fromhex('02 01'))
self.interface._read_message.return_value=bytes.fromhex('02 01')
# Act and assert
with self.assertRaisesRegex(InterfaceError, 'Invalid request message'):
@ -71,13 +73,138 @@ class SerialInterfaceResetTestCase(unittest.TestCase):
def test_error_with_description_is_handled_correctly(self):
# Arrange
self.interface._read_message = Mock(return_value=bytes.fromhex('02 01 45 72 72 6f 72 20 64 65 73 63 72 69 70 74 69 6f 6e'))
self.interface._read_message.return_value=bytes.fromhex('02 01 45 72 72 6f 72 20 64 65 73 63 72 69 70 74 69 6f 6e')
# Act and assert
with self.assertRaisesRegex(InterfaceError, 'Invalid request message: Error description'):
self.interface.reset()
# TODO...
class SerialInterfaceTransmitReceiveTestCase(unittest.TestCase):
def setUp(self):
self.serial = Mock()
self.serial.timeout = None
self.interface = SerialInterface(self.serial)
self.interface._write_message = Mock()
self.interface._read_message = Mock()
def test_words_frame(self):
# Arrange
self.interface._read_message.return_value=bytes.fromhex('01 00 00')
# Act
responses = self.interface._transmit_receive([(None, (FrameFormat.WORDS, [0b1111111111, 0b0000000000]))], [1], None)
# Assert
self.assertEqual(responses, [[0]])
self.interface._write_message.assert_called_with(bytes.fromhex('06 00 00 ff 03 00 00 00 01 00 00'))
def test_words_repeat_frame(self):
# Arrange
self.interface._read_message.return_value=bytes.fromhex('01 00 00')
# Act
responses = self.interface._transmit_receive([(None, (FrameFormat.WORDS, ([0b1111111111, 0b0000000000], 2)))], [1], None)
# Assert
self.assertEqual(responses, [[0]])
self.interface._write_message.assert_called_with(bytes.fromhex('06 00 02 ff 03 00 00 00 01 00 00'))
def test_word_data_frame(self):
# Arrange
self.interface._read_message.return_value=bytes.fromhex('01 00 00')
# Act
responses = self.interface._transmit_receive([(None, (FrameFormat.WORD_DATA, 0b1111111111, [0x00, 0xff]))], [1], None)
# Assert
self.assertEqual(responses, [[0]])
self.interface._write_message.assert_called_with(bytes.fromhex('06 00 00 ff 03 02 00 fe 03 00 01 00 00'))
def test_word_data_repeat_frame(self):
# Arrange
self.interface._read_message.return_value=bytes.fromhex('01 00 00')
# Act
responses = self.interface._transmit_receive([(None, (FrameFormat.WORD_DATA, 0b1111111111, ([0x00, 0xff], 2)))], [1], None)
# Assert
self.assertEqual(responses, [[0]])
self.interface._write_message.assert_called_with(bytes.fromhex('06 80 02 ff 03 02 00 fe 03 00 01 00 00'))
def test_data_frame(self):
# Arrange
self.interface._read_message.return_value=bytes.fromhex('01 00 00')
# Act
responses = self.interface._transmit_receive([(None, (FrameFormat.DATA, [0x00, 0xff]))], [1], None)
# Assert
self.assertEqual(responses, [[0]])
self.interface._write_message.assert_called_with(bytes.fromhex('06 00 00 02 00 fe 03 00 01 00 00'))
def test_data_repeat_frame(self):
# Arrange
self.interface._read_message.return_value=bytes.fromhex('01 00 00')
# Act
responses = self.interface._transmit_receive([(None, (FrameFormat.DATA, ([0x00, 0xff], 2)))], [1], None)
# Assert
self.assertEqual(responses, [[0]])
self.interface._write_message.assert_called_with(bytes.fromhex('06 00 02 02 00 fe 03 00 01 00 00'))
def test_receive_timeout_error(self):
# Arrange
self.interface._read_message.return_value=bytes.fromhex('02 66')
# Act
responses = self.interface._transmit_receive([(None, (FrameFormat.WORD_DATA, 0b1111111111, [0x00, 0xff]))], [1], 0.5)
# Assert
self.assertIsInstance(responses[0], ReceiveTimeout)
def test_interface_error(self):
# Arrange
self.interface._read_message.return_value=bytes.fromhex('02 65')
# Act and assert
with self.assertRaises(InterfaceError):
self.interface._transmit_receive([(None, (FrameFormat.WORD_DATA, 0b1111111111, [0x00, 0xff]))], [1], 0.5)
def test_timeout(self):
# Arrange
self.interface._read_message.return_value=bytes.fromhex('01 00 00')
# Act
self.interface._transmit_receive([(None, (FrameFormat.WORD_DATA, 0b1111111111, [0x00, 0xff]))], [1], 0.5)
# Assert
self.interface._write_message.assert_called_with(bytes.fromhex('06 00 00 ff 03 02 00 fe 03 00 01 01 f4'))
def test_addressed_frame(self):
with self.assertRaises(NotImplementedError):
self.interface._transmit_receive([(0b111000, (FrameFormat.WORD_DATA, 0b1111111111, [0x00, 0xff]))], [1], 0.5)
def test_multiple_frames(self):
# Arrange
self.interface._read_message.return_value=bytes.fromhex('01 00 00')
# Act
responses = self.interface._transmit_receive([(None, (FrameFormat.WORDS, [0b1111111111, 0b0000000000])), (None, (FrameFormat.WORD_DATA, 0b1111111111, [0x00, 0xff]))], [1, 1], None)
# Assert
self.assertEqual(responses, [[0], [0]])
self.interface._write_message.assert_has_calls([call(bytes.fromhex('06 00 00 ff 03 00 00 00 01 00 00')), call(bytes.fromhex('06 00 00 ff 03 02 00 fe 03 00 01 00 00'))])
class SerialInterfaceReadMessageTestCase(unittest.TestCase):
def setUp(self):
@ -89,7 +216,7 @@ class SerialInterfaceReadMessageTestCase(unittest.TestCase):
def test(self):
# Arrange
self.interface.slip_serial.recv_msg = Mock(return_value=bytes.fromhex('00 04 01 02 03 04 00 00'))
self.interface.slip_serial.recv_msg.return_value=bytes.fromhex('00 04 01 02 03 04 00 00')
# Act
message = self.interface._read_message()
@ -99,7 +226,7 @@ class SerialInterfaceReadMessageTestCase(unittest.TestCase):
def test_protocol_error_is_handled_correctly(self):
# Arrange
self.interface.slip_serial.recv_msg = Mock(side_effect=sliplib.ProtocolError)
self.interface.slip_serial.recv_msg.side_effect=sliplib.ProtocolError
# Act and assert
with self.assertRaisesRegex(InterfaceError, 'SLIP protocol error'):
@ -107,7 +234,7 @@ class SerialInterfaceReadMessageTestCase(unittest.TestCase):
def test_invalid_message_length_is_handled_correctly(self):
# Arrange
self.interface.slip_serial.recv_msg = Mock(return_value=bytes.fromhex('00'))
self.interface.slip_serial.recv_msg.return_value=bytes.fromhex('00')
# Act and assert
with self.assertRaisesRegex(InterfaceError, 'Invalid response message'):
@ -115,7 +242,7 @@ class SerialInterfaceReadMessageTestCase(unittest.TestCase):
def test_message_length_mismatch_is_handled_correctly(self):
# Arrange
self.interface.slip_serial.recv_msg = Mock(return_value=bytes.fromhex('00 05 01 02 03 04 00 00'))
self.interface.slip_serial.recv_msg.return_value=bytes.fromhex('00 05 01 02 03 04 00 00')
# Act and assert
with self.assertRaisesRegex(InterfaceError, 'Response message length mismatch'):
@ -123,7 +250,7 @@ class SerialInterfaceReadMessageTestCase(unittest.TestCase):
def test_empty_message_is_handled_correctly(self):
# Arrange
self.interface.slip_serial.recv_msg = Mock(return_value=bytes.fromhex('00 00 00 00'))
self.interface.slip_serial.recv_msg.return_value=bytes.fromhex('00 00 00 00')
# Act and assert
with self.assertRaisesRegex(InterfaceError, 'Empty response message'):