Use new pycoax 0.10.0 API

This commit is contained in:
Andrew Kay
2021-10-23 18:42:58 -05:00
parent e98569535b
commit b971e9700c
13 changed files with 712 additions and 543 deletions

View File

@@ -4,6 +4,7 @@ import logging
import argparse
from coax import open_serial_interface
from .interface import InterfaceWrapper
from .controller import Controller
from .tn3270 import TN3270Session
@@ -87,7 +88,7 @@ def main():
with open_serial_interface(args.serial_port) as interface:
create_session = lambda terminal: _create_session(args, terminal)
controller = Controller(interface, _get_keymap, create_session)
controller = Controller(InterfaceWrapper(interface), _get_keymap, create_session)
print('Starting controller...')

View File

@@ -6,9 +6,10 @@ oec.controller
import time
import logging
import selectors
from coax import poll, poll_ack, KeystrokePollResponse, ReceiveTimeout, \
from coax import Poll, PollAck, KeystrokePollResponse, ReceiveTimeout, \
ReceiveError, ProtocolError
from .interface import address_commands
from .terminal import create_terminal, UnsupportedTerminalError
from .keyboard import Key
from .session import SessionDisconnectedError
@@ -64,6 +65,8 @@ class Controller:
self.running = False
def _run_loop(self):
device_address = None
poll_delay = self._calculate_poll_delay(time.perf_counter())
# If POLLing is delayed, handle the host output, otherwise just sleep.
@@ -77,7 +80,7 @@ class Controller:
time.sleep(poll_delay)
try:
poll_response = self._poll()
poll_response = self._poll(device_address)
except ReceiveTimeout:
if self.terminal:
self._handle_terminal_detached()
@@ -92,7 +95,7 @@ class Controller:
if not self.terminal:
try:
self._handle_terminal_attached(poll_response)
self._handle_terminal_attached(device_address, poll_response)
except UnsupportedTerminalError as error:
self.logger.error(f'Unsupported terminal: {error}')
return
@@ -100,10 +103,11 @@ class Controller:
if poll_response:
self._handle_poll_response(poll_response)
def _handle_terminal_attached(self, poll_response):
def _handle_terminal_attached(self, device_address, poll_response):
self.logger.info('Terminal attached')
self.terminal = create_terminal(self.interface, poll_response, self.get_keymap)
self.terminal = create_terminal(self.interface, device_address, poll_response,
self.get_keymap)
self.terminal.setup()
@@ -198,21 +202,19 @@ class Controller:
self.session.render()
def _poll(self):
def _poll(self, device_address):
self.last_poll_time = time.perf_counter()
# If a terminal is connected, use the terminal method to ensure that
# any queued POLL action is applied.
if self.terminal:
poll_response = self.terminal.poll(receive_timeout=1)
poll_response = self.terminal.poll()
else:
poll_response = poll(self.interface, receive_timeout=1)
poll_response = self.interface.execute(address_commands(device_address, Poll()))
if poll_response:
try:
poll_ack(self.interface)
except ReceiveError as error:
self.logger.warning(f'POLL_ACK receive error: {error}', exc_info=error)
self.interface.execute(address_commands(device_address, PollAck()))
except ProtocolError as error:
self.logger.warning(f'POLL_ACK protocol error: {error}', exc_info=error)

View File

@@ -8,21 +8,19 @@ from itertools import zip_longest
import logging
from more_itertools import interleave
from sortedcontainers import SortedSet
from coax import read_address_counter_hi, read_address_counter_lo, \
load_address_counter_hi, load_address_counter_lo, write_data, \
eab_load_mask, eab_write_alternate
from coax import ReadAddressCounterHi, ReadAddressCounterLo, LoadAddressCounterHi, \
LoadAddressCounterLo, WriteData, EABLoadMask, EABWriteAlternate, Data
# Does not include the status line row.
Dimensions = namedtuple('Dimensions', ['rows', 'columns'])
class Display:
def __init__(self, terminal, dimensions, eab_address, jumbo_write_strategy=None):
def __init__(self, terminal, dimensions, eab_address):
self.logger = logging.getLogger(__name__)
self.terminal = terminal
self.dimensions = dimensions
self.eab_address = eab_address
self.jumbo_write_strategy = jumbo_write_strategy
self.address_counter = None
self.last_address = ((dimensions.rows + 1) * dimensions.columns) - 1
@@ -109,7 +107,7 @@ class Display:
if not self.has_eab:
raise RuntimeError('No EAB feature')
eab_load_mask(self.terminal.interface, self.eab_address, mask)
self.terminal.execute(EABLoadMask(self.eab_address, mask))
def toggle_cursor_blink(self):
self.terminal.control.cursor_blink = not self.terminal.control.cursor_blink
@@ -149,8 +147,7 @@ class Display:
return address
def _read_address_counter(self):
hi = read_address_counter_hi(self.terminal.interface)
lo = read_address_counter_lo(self.terminal.interface)
[hi, lo] = self.terminal.execute([ReadAddressCounterHi(), ReadAddressCounterLo()])
self.address_counter = (hi << 8) | lo
@@ -163,21 +160,41 @@ class Display:
(hi, lo) = _split_address(address)
(current_hi, current_lo) = _split_address(self.address_counter)
commands = []
if hi != current_hi or force_load:
load_address_counter_hi(self.terminal.interface, hi)
commands.append(LoadAddressCounterHi(hi))
if lo != current_lo or force_load:
load_address_counter_lo(self.terminal.interface, lo)
commands.append(LoadAddressCounterLo(lo))
self.terminal.execute(commands)
self.address_counter = address
return True
def _write_data(self, data):
write_data(self.terminal.interface, data, jumbo_write_strategy=self.jumbo_write_strategy)
chunks = self.terminal.interface.jumbo_write_split_data(data, -1)
commands = [WriteData(chunks[0])]
for chunk in chunks[1:]:
commands.append(Data(chunk))
self.terminal.execute(WriteData(data))
def _eab_write_alternate(self, data):
eab_write_alternate(self.terminal.interface, self.eab_address, data, jumbo_write_strategy=self.jumbo_write_strategy)
# The EAB_WRITE_ALTERNATE command data must be split so that the two bytes
# do not get separated, otherwise the write will be incorrect.
chunks = self.terminal.interface.jumbo_write_split_data(data, -2)
commands = [EABWriteAlternate(self.eab_address, chunks[0])]
for chunk in chunks[1:]:
commands.append(Data(chunk))
self.terminal.execute(commands)
def _split_address(address):
if address is None:
@@ -220,8 +237,8 @@ class StatusLine:
self.write(45, indicators)
class BufferedDisplay(Display):
def __init__(self, terminal, dimensions, eab_address, jumbo_write_strategy=None):
super().__init__(terminal, dimensions, eab_address, jumbo_write_strategy)
def __init__(self, terminal, dimensions, eab_address):
super().__init__(terminal, dimensions, eab_address)
length = (self.dimensions.rows + 1) * self.dimensions.columns

116
oec/interface.py Normal file
View File

@@ -0,0 +1,116 @@
"""
oec.interface
~~~~~~~~~~~~~
"""
import os
import logging
from textwrap import dedent
from more_itertools import chunked
logger = logging.getLogger(__name__)
class AggregateExecuteError(Exception):
def __init__(self, errors, responses):
super().__init__('One or more errors occurred')
self.errors = errors
self.responses = responses
class InterfaceWrapper:
def __init__(self, interface):
self.interface = interface
self.timeout = 0.1
self.jumbo_write_strategy = _get_jumbo_write_strategy()
self.jumbo_write_max_length = None
if self.legacy_firmware_detected and self.jumbo_write_strategy is None:
_print_i1_jumbo_write_notice()
self.jumbo_write_strategy = 'split'
if self.jumbo_write_strategy == 'split':
self.jumbo_write_max_length = 1024
def __getattr__(self, attr):
return getattr(self.interface, attr)
def execute(self, commands):
if not isinstance(commands, list):
return self.interface.execute(commands, self.timeout)
responses = self.interface.execute(commands, self.timeout)
errors = [response for response in responses if isinstance(response, BaseException)]
if any(errors):
raise AggregateExecuteError(errors, responses)
return responses
def jumbo_write_split_data(self, data, first_chunk_max_length_adjustment=-1):
if self.jumbo_write_strategy != 'split':
return [data]
if isinstance(data, tuple):
length = len(data[0]) * data[1]
else:
length = len(data)
first_chunk_max_length = self.jumbo_write_max_length + first_chunk_max_length_adjustment
if length <= first_chunk_max_length:
return [data]
if isinstance(data, tuple):
data = data[0] * data[1]
return [data[:first_chunk_max_length], *chunked(data[first_chunk_max_length:], self.jumbo_write_max_length)]
def address_commands(device_address, commands):
if isinstance(commands, list):
return [(device_address, command) for command in commands]
return (device_address, commands)
def _get_jumbo_write_strategy():
value = os.environ.get('COAX_JUMBO')
if value is None:
return None
if value in ['split', 'ignore']:
return value
logger.warning(f'Unsupported COAX_JUMBO option: {value}')
return None
def _print_i1_jumbo_write_notice():
notice = '''
**** **** **** **** **** **** **** **** **** **** **** **** **** **** **** ****
I think you are using an older firmware on the 1st generation, Arduino Mega
based, interface which does not support the "jumbo write" required to write a
full screen to the regen and EAB buffers.
I'm going to split large writes into multiple smaller 1024-byte writes...
If you want to override this behavior, you can set the COAX_JUMBO environment
variable as follows:
- COAX_JUMBO=split - split large writes into multiple smaller 32-byte writes
before sending to the interface, this will result in
additional round trips to the interface which may
manifest as visible incremental changes being applied
to the screen
- COAX_JUMBO=ignore - try a jumbo write, anyway, use this option if you
believe you are seeing this behavior in error
**** **** **** **** **** **** **** **** **** **** **** **** **** **** **** ****
'''
print(dedent(notice))

View File

@@ -3,14 +3,13 @@ oec.terminal
~~~~~~~~~~~~
"""
import os
import time
import logging
from textwrap import dedent
from coax import poll, read_terminal_id, read_extended_id, get_features, \
load_control_register, TerminalType, Feature, PollAction, Control, \
ReceiveError, ProtocolError
from coax import read_feature_ids, parse_features, Poll, ReadTerminalId, ReadExtendedId, \
LoadControlRegister, TerminalType, Feature, PollAction, Control, \
ProtocolError
from .interface import address_commands
from .display import Dimensions, BufferedDisplay
from .keyboard import Keyboard
@@ -26,9 +25,10 @@ MODEL_DIMENSIONS = {
class Terminal:
"""The terminal."""
def __init__(self, interface, terminal_id, extended_id, dimensions, features,
keymap, jumbo_write_strategy=None):
def __init__(self, interface, device_address, terminal_id, extended_id, dimensions,
features, keymap):
self.interface = interface
self.device_address = device_address
self.terminal_id = terminal_id
self.extended_id = extended_id
self.features = features
@@ -37,8 +37,7 @@ class Terminal:
cursor_inhibit=False, cursor_reverse=False,
cursor_blink=False)
self.display = BufferedDisplay(self, dimensions, features.get(Feature.EAB),
jumbo_write_strategy=jumbo_write_strategy)
self.display = BufferedDisplay(self, dimensions, features.get(Feature.EAB))
self.keyboard = Keyboard(keymap)
self.alarm = False
@@ -53,7 +52,7 @@ class Terminal:
self.display.clear(clear_status_line=True)
def poll(self, **kwargs):
def poll(self):
"""Execute a POLL command with queued actions."""
poll_action = PollAction.NONE
@@ -66,7 +65,7 @@ class Terminal:
else:
poll_action = PollAction.DISABLE_KEYBOARD_CLICKER
poll_response = poll(self.interface, poll_action, **kwargs)
poll_response = self.execute(Poll(poll_action))
# Clear the queued alarm and keyboard clicker change if the POLL was
# successful.
@@ -84,17 +83,18 @@ class Terminal:
def load_control_register(self):
"""Execute a LOAD_CONTROL_REGISTER command."""
load_control_register(self.interface, self.control)
self.execute(LoadControlRegister(self.control))
def execute(self, commands):
return self.interface.execute(address_commands(self.device_address, commands))
class UnsupportedTerminalError(Exception):
"""Unsupported terminal."""
def create_terminal(interface, poll_response, get_keymap):
def create_terminal(interface, device_address, poll_response, get_keymap):
"""Terminal factory."""
jumbo_write_strategy = _get_jumbo_write_strategy()
# Read the terminal identifiers.
(terminal_id, extended_id) = _read_terminal_ids(interface)
(terminal_id, extended_id) = _read_terminal_ids(interface, device_address)
logger.info(f'Terminal ID = {terminal_id}, Extended ID = {extended_id}')
@@ -110,35 +110,27 @@ def create_terminal(interface, poll_response, get_keymap):
logger.info(f'Rows = {dimensions.rows}, Columns = {dimensions.columns}')
# Get the terminal features.
features = get_features(interface)
features = _get_features(interface, device_address)
logger.info(f'Features = {features}')
if Feature.EAB in features:
if interface.legacy_firmware_detected and jumbo_write_strategy is None:
del features[Feature.EAB]
_print_no_i1_eab_notice()
# Get the keymap.
keymap = get_keymap(terminal_id, extended_id)
logger.info(f'Keymap = {keymap.name}')
# Create the terminal.
terminal = Terminal(interface, terminal_id, extended_id, dimensions, features,
keymap, jumbo_write_strategy=jumbo_write_strategy)
terminal = Terminal(interface, device_address, terminal_id, extended_id, dimensions,
features, keymap)
return terminal
def _read_terminal_ids(interface, extended_id_retry_attempts=3):
def _read_terminal_ids(interface, device_address, extended_id_retry_attempts=3):
terminal_id = None
extended_id = None
try:
terminal_id = read_terminal_id(interface)
except ReceiveError as error:
logger.warning(f'READ_TERMINAL_ID receive error: {error}', exc_info=error)
terminal_id = interface.execute(address_commands(device_address, ReadTerminalId()))
except ProtocolError as error:
logger.warning(f'READ_TERMINAL_ID protocol error: {error}', exc_info=error)
@@ -148,11 +140,9 @@ def _read_terminal_ids(interface, extended_id_retry_attempts=3):
for attempt in range(extended_id_retry_attempts):
try:
extended_id = read_extended_id(interface)
extended_id = interface.execute(address_commands(device_address, ReadExtendedId()))
break
except ReceiveError as error:
logger.warning(f'READ_EXTENDED_ID receive error: {error}', exc_info=error)
except ProtocolError as error:
logger.warning(f'READ_EXTENDED_ID protocol error: {error}', exc_info=error)
@@ -160,44 +150,9 @@ def _read_terminal_ids(interface, extended_id_retry_attempts=3):
return (terminal_id, extended_id.hex() if extended_id is not None else None)
def _get_jumbo_write_strategy():
value = os.environ.get('COAX_JUMBO')
def _get_features(interface, device_address):
commands = read_feature_ids()
if value is None:
return None
ids = interface.execute([address_commands(device_address, command) for command in commands])
if value in ['split', 'ignore']:
return value
logger.warning(f'Unsupported COAX_JUMBO option: {value}')
return None
def _print_no_i1_eab_notice():
notice = '''
**** **** **** **** **** **** **** **** **** **** **** **** **** **** **** ****
Your terminal is reporting the existence of an EAB feature that allows extended
colors and formatting, however...
I think you are using an older firmware on the 1st generation, Arduino Mega
based, interface which does not support the "jumbo write" required to write a
full screen to the regen and EAB buffers.
I'm going to continue as if the EAB feature did not exist...
If you want to override this behavior, you can set the COAX_JUMBO environment
variable as follows:
- COAX_JUMBO=split - split large writes into multiple smaller 32-byte writes
before sending to the interface, this will result in
additional round trips to the interface which may
manifest as visible incremental changes being applied
to the screen
- COAX_JUMBO=ignore - try a jumbo write, anyway, use this option if you
believe you are seeing this behavior in error
**** **** **** **** **** **** **** **** **** **** **** **** **** **** **** ****
'''
print(dedent(notice))
return parse_features(ids, commands)

View File

@@ -1,6 +1,6 @@
more-itertools==8.7.0
ptyprocess==0.7.0
pycoax==0.9.0
pycoax==0.10.0
pyserial==3.5
pyte==0.8.0
pytn3270==0.10.0

59
tests/mock_interface.py Normal file
View File

@@ -0,0 +1,59 @@
from unittest.mock import Mock
from coax import ProtocolError, ReceiveError, ReceiveTimeout
from coax.interface import Interface
class MockInterface(Interface):
def __init__(self, responses=[]):
self.mock_responses = responses
self.legacy_firmware_detected = None
self.legacy_firmware_version = None
# Wrap the reset and execute methods so calls can be asserted.
self.reset = Mock(wraps=self.reset)
self._execute = Mock(wraps=self._execute)
def _execute(self, commands, timeout):
return [self._mock_get_response(device_address, command) for (device_address, command) in commands]
def reset_mock(self):
self.reset.reset_mock()
self._execute.reset_mock()
def assert_command_executed(self, device_address, command_type, predicate=None):
if not self._mock_get_execute_commands(device_address, command_type, predicate):
raise AssertionError('Expected command to be executed')
def assert_command_not_executed(self, device_address, command_type, predicate=None):
if self._mock_get_execute_commands(device_address, command_type, predicate):
raise AssertionError('Expected command not to be executed')
def _mock_get_execute_commands(self, device_address, command_type, predicate):
calls = self._execute.call_args_list
commands = []
for call in calls:
for command in call[0][0]:
(call_device_address, call_command) = command
if call_device_address == device_address and isinstance(call_command, command_type):
if predicate is None or predicate(call_command):
commands.append(command)
return commands
def _mock_get_response(self, device_address, command):
for (mock_device_address, mock_command_type, mock_predicate, mock_response) in self.mock_responses:
if mock_device_address == device_address and isinstance(command, mock_command_type):
if mock_predicate is None or mock_predicate(command):
if callable(mock_response):
try:
return mock_response()
except (ProtocolError, ReceiveError, ReceiveTimeout) as error:
return error
return mock_response
return None

View File

@@ -1,36 +1,42 @@
import selectors
import unittest
from unittest.mock import Mock, patch
from coax import PollAction, PowerOnResetCompletePollResponse, KeystrokePollResponse, ReceiveTimeout
from unittest.mock import Mock, create_autospec, patch
import selectors
from selectors import BaseSelector
from logging import Logger
from coax import Poll, PollAction, PowerOnResetCompletePollResponse, KeystrokePollResponse, PollAck, ReceiveTimeout
from coax.protocol import TerminalId
import context
from oec.interface import InterfaceWrapper
from oec.controller import Controller
from oec.session import SessionDisconnectedError
from oec.terminal import Terminal, UnsupportedTerminalError
from oec.display import Dimensions
from oec.keyboard import KeyboardModifiers, Key
from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2
from oec.session import Session, SessionDisconnectedError
from mock_interface import MockInterface
class RunLoopTestCase(unittest.TestCase):
def setUp(self):
self.interface = Mock()
self.interface = MockInterface()
self.session_mock = Mock()
self.create_session_mock = Mock(return_value=self.session_mock)
self.session = create_autospec(Session, instance=True)
self.create_session = Mock(return_value=self.session)
self.controller = Controller(self.interface, lambda terminal_id, extended_id: KEYMAP_3278_2, self.create_session_mock)
self.controller = Controller(InterfaceWrapper(self.interface), lambda terminal_id, extended_id: KEYMAP_3278_2, self.create_session)
self.controller.logger = Mock()
self.controller.logger = create_autospec(Logger, instance=True)
self.controller.connected_poll_period = 1
self.controller.session_selector = Mock()
self.controller.session_selector = create_autospec(BaseSelector, instance=True)
self.controller._update_session = Mock()
self.terminal = Terminal(self.interface, TerminalId(0b11110100), 'c1348300', Dimensions(24, 80), { }, KEYMAP_3278_2, None)
self.terminal = Terminal(self.interface, None, TerminalId(0b11110100), 'c1348300', Dimensions(24, 80), { }, KEYMAP_3278_2)
self.terminal.setup = Mock()
@@ -40,33 +46,19 @@ class RunLoopTestCase(unittest.TestCase):
self.terminal.keyboard.toggle_clicker = Mock()
self.poll_mock = Mock()
patcher = patch('oec.controller.poll', self.poll_mock)
patcher.start()
patcher = patch('oec.terminal.poll', self.poll_mock)
patcher.start()
patcher = patch('oec.controller.poll_ack')
self.poll_ack_mock = patcher.start()
patcher = patch('oec.controller.time.perf_counter')
self.perf_counter_mock = patcher.start()
self.perf_counter = patcher.start()
patcher = patch('oec.controller.time.sleep')
self.sleep_mock = patcher.start()
self.sleep = patcher.start()
patcher = patch('oec.controller.create_terminal')
self.create_terminal_mock = patcher.start()
self.create_terminal = patcher.start()
self.create_terminal_mock.return_value = self.terminal
self.create_terminal.return_value = self.terminal
self.addCleanup(patch.stopall)
@@ -88,7 +80,7 @@ class RunLoopTestCase(unittest.TestCase):
self.controller._update_session.assert_called()
def test_unsupported_terminal_attached(self):
self.create_terminal_mock.side_effect = [UnsupportedTerminalError]
self.create_terminal.side_effect = [UnsupportedTerminalError]
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
@@ -114,7 +106,7 @@ class RunLoopTestCase(unittest.TestCase):
self.assertIsNone(self.controller.terminal)
self.assertIsNone(self.controller.session)
self.session_mock.terminate.assert_called()
self.session.terminate.assert_called()
def test_session_disconnected(self):
self.controller._update_session.side_effect = [None, SessionDisconnectedError, None]
@@ -127,7 +119,7 @@ class RunLoopTestCase(unittest.TestCase):
self.assertIsNotNone(self.controller.terminal)
self.assertIsNotNone(self.controller.session)
self.assertEqual(self.create_session_mock.call_count, 2)
self.assertEqual(self.create_session.call_count, 2)
def test_toggle_cursor_blink(self):
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
@@ -178,13 +170,13 @@ class RunLoopTestCase(unittest.TestCase):
# Arrange
self.controller._update_session.reset_mock()
self.poll_mock.side_effect = [poll_response]
self.interface.mock_responses = [(None, Poll, None, poll_response)]
self.poll_ack_mock.reset_mock()
self.interface.reset_mock()
self.perf_counter_mock.side_effect = [poll_time, poll_time + expected_poll_delay]
self.perf_counter.side_effect = [poll_time, poll_time + expected_poll_delay]
self.sleep_mock.reset_mock()
self.sleep.reset_mock()
# Act
self.controller._run_loop()
@@ -192,31 +184,31 @@ class RunLoopTestCase(unittest.TestCase):
# Assert
if expected_update_session:
self.controller._update_session.assert_called_once_with(expected_poll_delay)
self.sleep_mock.assert_not_called()
self.sleep.assert_not_called()
else:
self.controller._update_session.assert_not_called()
if expected_poll_delay > 0:
self.sleep_mock.assert_called_once_with(expected_poll_delay)
self.sleep.assert_called_once_with(expected_poll_delay)
else:
self.sleep_mock.assert_not_called()
self.sleep.assert_not_called()
if expected_poll_ack:
self.poll_ack_mock.assert_called_once()
self.interface.assert_command_executed(None, PollAck)
else:
self.poll_ack_mock.assert_not_called()
self.interface.assert_command_not_executed(None, PollAck)
class UpdateSessionTestCase(unittest.TestCase):
def setUp(self):
self.controller = Controller(None, None, None)
self.controller.session = Mock()
self.controller.session = create_autospec(Session, instance=True)
self.controller.session_selector = Mock()
self.controller.session_selector = create_autospec(BaseSelector, instance=True)
patcher = patch('oec.controller.time.perf_counter')
self.perf_counter_mock = patcher.start()
self.perf_counter = patcher.start()
def test_zero_duration(self):
# Act
@@ -243,7 +235,7 @@ class UpdateSessionTestCase(unittest.TestCase):
def test_select_available(self):
# Arrange
self.perf_counter_mock.side_effect = [0, 0.75, 0.75]
self.perf_counter.side_effect = [0, 0.75, 0.75]
selector_key = Mock(fileobj=self.controller.session)

View File

@@ -1,47 +1,27 @@
import unittest
from unittest.mock import Mock, patch
from unittest.mock import Mock, create_autospec
from coax import ReadAddressCounterHi, ReadAddressCounterLo, LoadAddressCounterHi, LoadAddressCounterLo, Feature
from coax.protocol import TerminalId
import context
from oec.display import Dimensions, Display, StatusLine, BufferedDisplay, encode_ascii_character, encode_ebcdic_character, encode_string
from oec.interface import InterfaceWrapper
from oec.terminal import Terminal
from oec.display import Display, Dimensions, StatusLine, BufferedDisplay, encode_ascii_character, encode_ebcdic_character, encode_string
from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2
from mock_interface import MockInterface
class DisplayClearTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.interface = MockInterface()
dimensions = Dimensions(24, 80)
self.display = Display(self.terminal, dimensions, None)
self.display = _create_display(self.interface)
self.display.write = Mock(wraps=self.display.write)
self.display._load_address_counter = Mock(wraps=self.display._load_address_counter)
patcher = patch('oec.display.read_address_counter_hi')
self.read_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.read_address_counter_lo')
self.read_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_hi')
self.load_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_lo')
self.load_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.write_data')
self.write_data_mock = patcher.start()
patcher = patch('oec.display.eab_write_alternate')
self.eab_write_alternate_mock = patcher.start()
self.addCleanup(patch.stopall)
def test_excluding_status_line_with_no_eab_feature(self):
# Act
self.display.clear(clear_status_line=False)
@@ -82,24 +62,12 @@ class DisplayClearTestCase(unittest.TestCase):
class DisplayMoveCursorTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.interface = MockInterface()
dimensions = Dimensions(24, 80)
self.display = Display(self.terminal, dimensions, None)
self.display = _create_display(self.interface)
self.display._load_address_counter = Mock(wraps=self.display._load_address_counter)
patcher = patch('oec.display.load_address_counter_hi')
self.load_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_lo')
self.load_address_counter_lo_mock = patcher.start()
self.addCleanup(patch.stopall)
def test_with_address(self):
# Act
self.display.move_cursor(address=895)
@@ -138,40 +106,14 @@ class DisplayMoveCursorTestCase(unittest.TestCase):
class DisplayWriteTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.interface = MockInterface()
dimensions = Dimensions(24, 80)
self.display = Display(self.terminal, dimensions, None)
self.display = _create_display(self.interface)
self.display._read_address_counter = Mock(wraps=self.display._read_address_counter)
self.display._load_address_counter = Mock(wraps=self.display._load_address_counter)
patcher = patch('oec.display.read_address_counter_hi')
self.read_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.read_address_counter_lo')
self.read_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_hi')
self.load_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_lo')
self.load_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.write_data')
self.write_data_mock = patcher.start()
patcher = patch('oec.display.eab_write_alternate')
self.eab_write_alternate_mock = patcher.start()
self.addCleanup(patch.stopall)
self.display._write_data = Mock(wraps=self.display._write_data)
self.display._eab_write_alternate = Mock(wraps=self.display._eab_write_alternate)
def test_no_eab_feature(self):
# Act and assert
@@ -266,8 +208,10 @@ class DisplayWriteTestCase(unittest.TestCase):
# Arrange
self.assertIsNone(self.display.address_counter)
self.read_address_counter_hi_mock.return_value = 0
self.read_address_counter_lo_mock.return_value = 160
self.interface.mock_responses = [
(None, ReadAddressCounterHi, None, 0),
(None, ReadAddressCounterLo, None, 160)
]
# Act
self.display.write(bytes.fromhex('01 02 03'), None, restore_original_address=True)
@@ -299,7 +243,7 @@ class DisplayWriteTestCase(unittest.TestCase):
self.display.write(bytes.fromhex('01 02 03'), None)
# Assert
self.write_data_mock.assert_called_with(self.terminal.interface, bytes.fromhex('01 02 03'), jumbo_write_strategy=None)
self.display._write_data.assert_called_with(bytes.fromhex('01 02 03'))
def test_regen_only_repeat(self):
# Arrange
@@ -309,7 +253,7 @@ class DisplayWriteTestCase(unittest.TestCase):
self.display.write((bytes.fromhex('01 02 03'), 2), None)
# Assert
self.write_data_mock.assert_called_with(self.terminal.interface, (bytes.fromhex('01 02 03'), 2), jumbo_write_strategy=None)
self.display._write_data.assert_called_with((bytes.fromhex('01 02 03'), 2))
def test_regen_eab(self):
# Arrange
@@ -320,7 +264,7 @@ class DisplayWriteTestCase(unittest.TestCase):
self.display.write(bytes.fromhex('01 02 03'), bytes.fromhex('04 05 06'))
# Assert
self.eab_write_alternate_mock.assert_called_with(self.terminal.interface, 7, bytes.fromhex('01 04 02 05 03 06'), jumbo_write_strategy=None)
self.display._eab_write_alternate.assert_called_with(bytes.fromhex('01 04 02 05 03 06'))
def test_regen_eab_repeat(self):
# Arrange
@@ -331,25 +275,13 @@ class DisplayWriteTestCase(unittest.TestCase):
self.display.write((bytes.fromhex('01 02 03'), 2), (bytes.fromhex('04 05 06'), 2))
# Assert
self.eab_write_alternate_mock.assert_called_with(self.terminal.interface, 7, (bytes.fromhex('01 04 02 05 03 06'), 2), jumbo_write_strategy=None)
self.display._eab_write_alternate.assert_called_with((bytes.fromhex('01 04 02 05 03 06'), 2))
class DisplayLoadAddressCounterTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.interface = MockInterface()
dimensions = Dimensions(24, 80)
self.display = Display(self.terminal, dimensions, None)
patcher = patch('oec.display.load_address_counter_hi')
self.load_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_lo')
self.load_address_counter_lo_mock = patcher.start()
self.addCleanup(patch.stopall)
self.display = _create_display(self.interface)
def test(self):
# Act
@@ -358,15 +290,14 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase):
# Assert
self.assertEqual(self.display.address_counter, 895)
self.load_address_counter_hi_mock.assert_called_with(self.terminal.interface, 3)
self.load_address_counter_lo_mock.assert_called_with(self.terminal.interface, 127)
self.assert_load_address_counter_hi(3)
self.assert_load_address_counter_lo(127)
def test_hi_change(self):
# Arrange
self.display._load_address_counter(895, force_load=False)
self.load_address_counter_hi_mock.reset_mock()
self.load_address_counter_lo_mock.reset_mock()
self.interface.reset_mock()
# Act
self.display._load_address_counter(1151, force_load=False)
@@ -374,15 +305,14 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase):
# Assert
self.assertEqual(self.display.address_counter, 1151)
self.load_address_counter_hi_mock.assert_called_with(self.terminal.interface, 4)
self.load_address_counter_lo_mock.assert_not_called()
self.assert_load_address_counter_hi(4)
self.interface.assert_command_not_executed(None, LoadAddressCounterLo)
def test_lo_change(self):
# Arrange
self.display._load_address_counter(895, force_load=False)
self.load_address_counter_hi_mock.reset_mock()
self.load_address_counter_lo_mock.reset_mock()
self.interface.reset_mock()
# Act
self.display._load_address_counter(896, force_load=False)
@@ -390,15 +320,14 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase):
# Assert
self.assertEqual(self.display.address_counter, 896)
self.load_address_counter_hi_mock.assert_not_called()
self.load_address_counter_lo_mock.assert_called_with(self.terminal.interface, 128)
self.interface.assert_command_not_executed(None, LoadAddressCounterHi)
self.assert_load_address_counter_lo(128)
def test_hi_lo_change(self):
# Arrange
self.display._load_address_counter(895, force_load=False)
self.load_address_counter_hi_mock.reset_mock()
self.load_address_counter_lo_mock.reset_mock()
self.interface.reset_mock()
# Act
self.display._load_address_counter(1152, force_load=False)
@@ -406,15 +335,14 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase):
# Assert
self.assertEqual(self.display.address_counter, 1152)
self.load_address_counter_hi_mock.assert_called_with(self.terminal.interface, 4)
self.load_address_counter_lo_mock.assert_called_with(self.terminal.interface, 128)
self.assert_load_address_counter_hi(4)
self.assert_load_address_counter_lo(128)
def test_no_change(self):
# Arrange
self.display._load_address_counter(80, force_load=False)
self.load_address_counter_hi_mock.reset_mock()
self.load_address_counter_lo_mock.reset_mock()
self.interface.reset_mock()
# Act
self.display._load_address_counter(80, force_load=False)
@@ -422,15 +350,14 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase):
# Assert
self.assertEqual(self.display.address_counter, 80)
self.load_address_counter_hi_mock.assert_not_called()
self.load_address_counter_lo_mock.assert_not_called()
self.interface.assert_command_not_executed(None, LoadAddressCounterHi)
self.interface.assert_command_not_executed(None, LoadAddressCounterLo)
def test_no_change_force(self):
# Arrange
self.display._load_address_counter(80, force_load=False)
self.load_address_counter_hi_mock.reset_mock()
self.load_address_counter_lo_mock.reset_mock()
self.interface.reset_mock()
# Act
self.display._load_address_counter(80, force_load=True)
@@ -438,12 +365,18 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase):
# Assert
self.assertEqual(self.display.address_counter, 80)
self.load_address_counter_hi_mock.assert_called_with(self.terminal.interface, 0)
self.load_address_counter_lo_mock.assert_called_with(self.terminal.interface, 80)
self.assert_load_address_counter_hi(0)
self.assert_load_address_counter_lo(80)
def assert_load_address_counter_hi(self, address):
self.interface.assert_command_executed(None, LoadAddressCounterHi, lambda command: command.address == address)
def assert_load_address_counter_lo(self, address):
self.interface.assert_command_executed(None, LoadAddressCounterLo, lambda command: command.address == address)
class StatusLineWriteTestCase(unittest.TestCase):
def setUp(self):
self.display = Mock()
self.display = create_autospec(Display, instance=True)
self.display.dimensions = Dimensions(24, 80)
@@ -462,7 +395,7 @@ class StatusLineWriteTestCase(unittest.TestCase):
class BufferedDisplayBufferedWriteByteTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.terminal = create_autospec(Terminal, instance=True)
dimensions = Dimensions(24, 80)
@@ -561,40 +494,12 @@ class BufferedDisplayBufferedWriteByteTestCase(unittest.TestCase):
class BufferedDisplayFlushTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.interface = MockInterface()
dimensions = Dimensions(24, 80)
self.buffered_display = BufferedDisplay(self.terminal, dimensions, None)
self.buffered_display = _create_buffered_display(self.interface)
self.buffered_display.write = Mock(wraps=self.buffered_display.write)
patcher = patch('oec.display.read_address_counter_hi')
self.read_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.read_address_counter_lo')
self.read_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_hi')
self.load_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_lo')
self.load_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.write_data')
self.write_data_mock = patcher.start()
patcher = patch('oec.display.eab_write_alternate')
self.eab_write_alternate_mock = patcher.start()
self.addCleanup(patch.stopall)
def test_no_changes(self):
# Arrange
self.assertFalse(self.buffered_display.dirty)
@@ -624,9 +529,7 @@ class BufferedDisplayFlushTestCase(unittest.TestCase):
def test_single_range_with_eab_feature(self):
# Arrange
dimensions = Dimensions(24, 80)
self.buffered_display = BufferedDisplay(self.terminal, dimensions, 7)
self.buffered_display = _create_buffered_display(self.interface, has_eab=True)
self.buffered_display.write = Mock(wraps=self.buffered_display.write)
@@ -669,9 +572,7 @@ class BufferedDisplayFlushTestCase(unittest.TestCase):
def test_multiple_ranges_with_eab_feature(self):
# Arrange
dimensions = Dimensions(24, 80)
self.buffered_display = BufferedDisplay(self.terminal, dimensions, 7)
self.buffered_display = _create_buffered_display(self.interface, has_eab=True)
self.buffered_display.write = Mock(wraps=self.buffered_display.write)
@@ -700,41 +601,13 @@ class BufferedDisplayFlushTestCase(unittest.TestCase):
class BufferedDisplayClearTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.interface = MockInterface()
dimensions = Dimensions(24, 80)
self.buffered_display = BufferedDisplay(self.terminal, dimensions, None)
self.buffered_display = _create_buffered_display(self.interface)
self.buffered_display.write = Mock(wraps=self.buffered_display.write)
self.buffered_display._load_address_counter = Mock(wraps=self.buffered_display._load_address_counter)
patcher = patch('oec.display.read_address_counter_hi')
self.read_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.read_address_counter_lo')
self.read_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_hi')
self.load_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_lo')
self.load_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.write_data')
self.write_data_mock = patcher.start()
patcher = patch('oec.display.eab_write_alternate')
self.eab_write_alternate_mock = patcher.start()
self.addCleanup(patch.stopall)
def test_excluding_status_line_with_no_eab_feature(self):
# Arrange
self.buffered_display.buffered_write_byte(0x01, None, address=80)
@@ -753,9 +626,7 @@ class BufferedDisplayClearTestCase(unittest.TestCase):
def test_excluding_status_line_with_eab_feature(self):
# Arrange
dimensions = Dimensions(24, 80)
self.buffered_display = BufferedDisplay(self.terminal, dimensions, 7)
self.buffered_display = _create_buffered_display(self.interface, has_eab=True)
self.buffered_display.write = Mock(wraps=self.buffered_display.write)
self.buffered_display._load_address_counter = Mock(wraps=self.buffered_display._load_address_counter)
@@ -793,9 +664,7 @@ class BufferedDisplayClearTestCase(unittest.TestCase):
def test_including_status_line_with_eab_feature(self):
# Arrange
dimensions = Dimensions(24, 80)
self.buffered_display = BufferedDisplay(self.terminal, dimensions, 7)
self.buffered_display = _create_buffered_display(self.interface, has_eab=True)
self.buffered_display.write = Mock(wraps=self.buffered_display.write)
self.buffered_display._load_address_counter = Mock(wraps=self.buffered_display._load_address_counter)
@@ -817,47 +686,23 @@ class BufferedDisplayClearTestCase(unittest.TestCase):
class BufferedDisplayWriteTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.interface = MockInterface()
dimensions = Dimensions(24, 80)
self.buffered_display = BufferedDisplay(self.terminal, dimensions, None)
self.buffered_display = _create_buffered_display(self.interface)
self.buffered_display._read_address_counter = Mock(wraps=self.buffered_display._read_address_counter)
self.buffered_display._load_address_counter = Mock(wraps=self.buffered_display._load_address_counter)
patcher = patch('oec.display.read_address_counter_hi')
self.read_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.read_address_counter_lo')
self.read_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_hi')
self.load_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_lo')
self.load_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.write_data')
self.write_data_mock = patcher.start()
patcher = patch('oec.display.eab_write_alternate')
self.eab_write_alternate_mock = patcher.start()
self.addCleanup(patch.stopall)
self.buffered_display._write_data = Mock(wraps=self.buffered_display._write_data)
self.buffered_display._eab_write_alternate = Mock(wraps=self.buffered_display._eab_write_alternate)
def test_if_current_address_unknown(self):
# Arrange
self.assertIsNone(self.buffered_display.address_counter)
self.read_address_counter_hi_mock.return_value = 0
self.read_address_counter_lo_mock.return_value = 160
self.interface.mock_responses = [
(None, ReadAddressCounterHi, None, 0),
(None, ReadAddressCounterLo, None, 160)
]
# Act
self.buffered_display.write(bytes.fromhex('01 02 03'), None)
@@ -895,7 +740,7 @@ class BufferedDisplayWriteTestCase(unittest.TestCase):
# Assert
self.assertEqual(self.buffered_display.regen_buffer[80:83], bytes.fromhex('01 02 03'))
self.write_data_mock.assert_called_with(self.terminal.interface, bytes.fromhex('01 02 03'), jumbo_write_strategy=None)
self.buffered_display._write_data.assert_called_with(bytes.fromhex('01 02 03'))
def test_regen_only_repeat(self):
# Arrange
@@ -907,16 +752,16 @@ class BufferedDisplayWriteTestCase(unittest.TestCase):
# Assert
self.assertEqual(self.buffered_display.regen_buffer[80:86], bytes.fromhex('01 02 03 01 02 03'))
self.write_data_mock.assert_called_with(self.terminal.interface, (bytes.fromhex('01 02 03'), 2), jumbo_write_strategy=None)
self.buffered_display._write_data.assert_called_with((bytes.fromhex('01 02 03'), 2))
def test_regen_eab(self):
# Arrange
dimensions = Dimensions(24, 80)
self.buffered_display = BufferedDisplay(self.terminal, dimensions, 7)
self.buffered_display = _create_buffered_display(self.interface, has_eab=True)
self.buffered_display._read_address_counter = Mock(wraps=self.buffered_display._read_address_counter)
self.buffered_display._load_address_counter = Mock(wraps=self.buffered_display._load_address_counter)
self.buffered_display._write_data = Mock(wraps=self.buffered_display._write_data)
self.buffered_display._eab_write_alternate = Mock(wraps=self.buffered_display._eab_write_alternate)
self.buffered_display.address_counter = 80
@@ -927,16 +772,16 @@ class BufferedDisplayWriteTestCase(unittest.TestCase):
self.assertEqual(self.buffered_display.regen_buffer[80:83], bytes.fromhex('01 02 03'))
self.assertEqual(self.buffered_display.eab_buffer[80:83], bytes.fromhex('04 05 06'))
self.eab_write_alternate_mock.assert_called_with(self.terminal.interface, 7, bytes.fromhex('01 04 02 05 03 06'), jumbo_write_strategy=None)
self.buffered_display._eab_write_alternate.assert_called_with(bytes.fromhex('01 04 02 05 03 06'))
def test_regen_eab_repeat(self):
# Arrange
dimensions = Dimensions(24, 80)
self.buffered_display = BufferedDisplay(self.terminal, dimensions, 7)
self.buffered_display = _create_buffered_display(self.interface, has_eab=True)
self.buffered_display._read_address_counter = Mock(wraps=self.buffered_display._read_address_counter)
self.buffered_display._load_address_counter = Mock(wraps=self.buffered_display._load_address_counter)
self.buffered_display._write_data = Mock(wraps=self.buffered_display._write_data)
self.buffered_display._eab_write_alternate = Mock(wraps=self.buffered_display._eab_write_alternate)
self.buffered_display.address_counter = 80
@@ -947,7 +792,7 @@ class BufferedDisplayWriteTestCase(unittest.TestCase):
self.assertEqual(self.buffered_display.regen_buffer[80:86], bytes.fromhex('01 02 03 01 02 03'))
self.assertEqual(self.buffered_display.eab_buffer[80:86], bytes.fromhex('04 05 06 04 05 06'))
self.eab_write_alternate_mock.assert_called_with(self.terminal.interface, 7, (bytes.fromhex('01 04 02 05 03 06'), 2), jumbo_write_strategy=None)
self.buffered_display._eab_write_alternate.assert_called_with((bytes.fromhex('01 04 02 05 03 06'), 2))
def test_dirty_cleared(self):
# Arrange
@@ -993,3 +838,29 @@ class EncodeStringTestCase(unittest.TestCase):
def test_unmapped_characters(self):
self.assertEqual(encode_string('Everything ✓'), bytes.fromhex('a4 95 84 91 98 93 87 88 8d 86 00 18'))
def _create_display(interface):
terminal_id = TerminalId(0b11110100)
extended_id = 'c1348300'
dimensions = Dimensions(24, 80)
features = { }
keymap = KEYMAP_3278_2
terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap)
display = Display(terminal, dimensions, features.get(Feature.EAB))
return display
def _create_buffered_display(interface, has_eab=False):
terminal_id = TerminalId(0b11110100)
extended_id = 'c1348300'
dimensions = Dimensions(24, 80)
features = { Feature.EAB: 7 } if has_eab else { }
keymap = KEYMAP_3278_2
terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap)
buffered_display = BufferedDisplay(terminal, dimensions, features.get(Feature.EAB))
return buffered_display

202
tests/test_interface.py Normal file
View File

@@ -0,0 +1,202 @@
import unittest
from unittest.mock import Mock, patch
from coax import ReadAddressCounterHi, ReadAddressCounterLo, ProtocolError
import context
from oec.interface import InterfaceWrapper, AggregateExecuteError, address_commands
from mock_interface import MockInterface
class InterfaceWrapperInitTestCase(unittest.TestCase):
def setUp(self):
self.interface = MockInterface()
patcher = patch('oec.interface._get_jumbo_write_strategy')
self.get_jumbo_write_strategy = patcher.start()
patcher = patch('oec.interface._print_i1_jumbo_write_notice')
self.print_i1_jumbo_write_notice = patcher.start()
self.addCleanup(patch.stopall)
def test_no_jumbo_write_strategy(self):
# Arrange
self.get_jumbo_write_strategy.return_value = None
# Act
interface_wrapper = InterfaceWrapper(self.interface)
# Assert
self.assertIsNone(interface_wrapper.jumbo_write_strategy)
self.assertIsNone(interface_wrapper.jumbo_write_max_length)
self.print_i1_jumbo_write_notice.assert_not_called()
def test_split_jumbo_write_strategy(self):
# Arrange
self.get_jumbo_write_strategy.return_value = 'split'
# Act
interface_wrapper = InterfaceWrapper(self.interface)
# Assert
self.assertEqual(interface_wrapper.jumbo_write_strategy, 'split')
self.assertEqual(interface_wrapper.jumbo_write_max_length, 1024)
self.print_i1_jumbo_write_notice.assert_not_called()
def test_i1_no_jumbo_write_strategy(self):
# Arrange
self.interface.legacy_firmware_detected = True
self.get_jumbo_write_strategy.return_value = None
# Act
interface_wrapper = InterfaceWrapper(self.interface)
# Assert
self.assertEqual(interface_wrapper.jumbo_write_strategy, 'split')
self.assertEqual(interface_wrapper.jumbo_write_max_length, 1024)
self.print_i1_jumbo_write_notice.assert_called()
class InterfaceWrapperExecuteTestCase(unittest.TestCase):
def setUp(self):
self.interface = MockInterface()
self.interface_wrapper = InterfaceWrapper(self.interface)
def test_single_command(self):
# Arrange
self.interface.mock_responses = [(None, ReadAddressCounterHi, None, 0x00)]
# Act
response = self.interface_wrapper.execute((None, ReadAddressCounterHi()))
# Assert
self.assertEqual(response, 0x00)
def test_single_command_that_raises_error(self):
# Arrange
self.interface.mock_responses = [(None, ReadAddressCounterHi, None, Mock(side_effect=ProtocolError))]
# Act and assert
with self.assertRaises(ProtocolError):
self.interface_wrapper.execute((None, ReadAddressCounterHi()))
def test_multiple_commands(self):
# Arrange
self.interface.mock_responses = [
(None, ReadAddressCounterHi, None, 0x00),
(None, ReadAddressCounterLo, None, 0xff)
]
# Act
responses = self.interface_wrapper.execute([(None, ReadAddressCounterHi()), (None, ReadAddressCounterLo())])
# Assert
self.assertEqual(responses, [0x00, 0xff])
def test_multiple_commands_that_returns_error(self):
# Arrange
self.interface.mock_responses = [
(None, ReadAddressCounterHi, None, 0x00),
(None, ReadAddressCounterLo, None, Mock(side_effect=ProtocolError))
]
# Act and assert
with self.assertRaises(AggregateExecuteError) as context:
self.interface_wrapper.execute([(None, ReadAddressCounterHi()), (None, ReadAddressCounterLo())])
error = context.exception
self.assertEqual(len(error.errors), 1)
self.assertIsInstance(error.errors[0], ProtocolError)
self.assertEqual(len(error.responses), 2)
class InterfaceWrapperJumboWriteSplitDataTestCase(unittest.TestCase):
def setUp(self):
self.interface = MockInterface()
self.interface_wrapper = InterfaceWrapper(self.interface)
def test_no_split_strategy(self):
# Arrange
self.interface_wrapper.jumbo_write_strategy = None
self.interface_wrapper.jumbo_write_max_length = 32
# Act and assert
for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64)]:
with self.subTest(data=data):
result = self.interface_wrapper.jumbo_write_split_data(data)
self.assertEqual(len(result), 1)
self.assertEqual(result[0], data)
def test_split_strategy_one_chunk(self):
# Arrange
self.interface_wrapper.jumbo_write_strategy = 'split'
self.interface_wrapper.jumbo_write_max_length = 32
# Act and assert
for data in [bytes(range(0, 16)), (bytes.fromhex('00'), 16), bytes(range(0, 31)), (bytes.fromhex('00'), 31)]:
with self.subTest(data=data):
result = self.interface_wrapper.jumbo_write_split_data(data)
self.assertEqual(len(result), 1)
self.assertEqual(result[0], data)
def test_split_strategy_two_chunks(self):
# Arrange
self.interface_wrapper.jumbo_write_strategy = 'split'
self.interface_wrapper.jumbo_write_max_length = 32
# Act and assert
for data in [bytes(range(0, 32)), (bytes.fromhex('00'), 32), bytes(range(0, 63)), (bytes.fromhex('00'), 63)]:
with self.subTest(data=data):
result = self.interface_wrapper.jumbo_write_split_data(data)
self.assertEqual(len(result), 2)
self.assertEqual(len(result[0]), 31)
def test_split_strategy_three_chunks(self):
# Arrange
self.interface_wrapper.jumbo_write_strategy = 'split'
self.interface_wrapper.jumbo_write_max_length = 32
# Act and assert
for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64), bytes(range(0, 95)), (bytes.fromhex('00'), 95)]:
with self.subTest(data=data):
result = self.interface_wrapper.jumbo_write_split_data(data)
self.assertEqual(len(result), 3)
self.assertEqual(len(result[0]), 31)
self.assertEqual(len(result[1]), 32)
class AddressCommandsTestCase(unittest.TestCase):
def test_single_command(self):
# Arrange
command = ReadAddressCounterHi()
# Act
result = address_commands(0b111000, command)
# Assert
self.assertEqual(result, (0b111000, command))
def test_multiple_commands(self):
# Arrange
commands = [ReadAddressCounterHi(), ReadAddressCounterLo()]
# Act
result = address_commands(0b111000, commands)
# Assert
self.assertEqual(result, [(0b111000, commands[0]), (0b111000, commands[1])])

View File

@@ -1,68 +1,49 @@
import unittest
from unittest.mock import Mock, patch
from coax import Feature, PollAction
from coax.protocol import TerminalId, TerminalType
from unittest.mock import Mock, create_autospec
from coax import Poll, PollAction, TerminalType, Feature, ReadTerminalId, ReadExtendedId, ReadFeatureId
from coax.protocol import TerminalId
import context
from oec.interface import InterfaceWrapper
from oec.terminal import create_terminal, Terminal, UnsupportedTerminalError
from oec.display import Dimensions
from oec.display import Display, Dimensions
from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2
from mock_interface import MockInterface
class TerminalSetupTestCase(unittest.TestCase):
def setUp(self):
self.interface = Mock()
self.interface = MockInterface()
terminal_id = TerminalId(0b11110100)
extended_id = 'c1348300'
dimensions = Dimensions(24, 80)
features = { }
keymap = KEYMAP_3278_2
self.terminal = _create_terminal(self.interface)
self.terminal = Terminal(self.interface, terminal_id, extended_id, dimensions, features, keymap)
self.terminal.display = Mock()
patcher = patch('oec.terminal.load_control_register')
self.load_control_register_mock = patcher.start()
self.addCleanup(patch.stopall)
self.terminal.display = create_autospec(Display, instance=True)
def test(self):
self.terminal.setup()
class TerminalPollTestCase(unittest.TestCase):
def setUp(self):
self.interface = Mock()
self.interface = MockInterface()
terminal_id = TerminalId(0b11110100)
extended_id = 'c1348300'
dimensions = Dimensions(24, 80)
features = { }
keymap = KEYMAP_3278_2
self.terminal = _create_terminal(self.interface)
self.terminal = Terminal(self.interface, terminal_id, extended_id, dimensions, features, keymap)
patcher = patch('oec.terminal.poll')
self.poll_mock = patcher.start()
self.addCleanup(patch.stopall)
self.terminal.display = create_autospec(Display, instance=True)
# The terminal will be initialized in a state where the terminal keyboard clicker
# state is unknown, and this cannot be read. Therefore the first POLL will always
# attempt to set the keyboard clicker state...
self.terminal.poll()
self.poll_mock.reset_mock()
self.interface.reset_mock()
def test_with_no_queued_actions(self):
# Act
self.terminal.poll()
# Assert
self.poll_mock.assert_called_with(self.interface, PollAction.NONE)
self.assert_poll_with_poll_action(PollAction.NONE)
def test_with_sound_alarm_queued(self):
# Arrange
@@ -72,7 +53,7 @@ class TerminalPollTestCase(unittest.TestCase):
self.terminal.poll()
# Assert
self.poll_mock.assert_called_with(self.interface, PollAction.ALARM)
self.assert_poll_with_poll_action(PollAction.ALARM)
def test_with_enable_keyboard_clicker_queued(self):
# Arrange
@@ -84,38 +65,29 @@ class TerminalPollTestCase(unittest.TestCase):
self.terminal.poll()
# Assert
self.poll_mock.assert_called_with(self.interface, PollAction.ENABLE_KEYBOARD_CLICKER)
self.assert_poll_with_poll_action(PollAction.ENABLE_KEYBOARD_CLICKER)
def assert_poll_with_poll_action(self, action):
self.interface.assert_command_executed(None, Poll, lambda command: command.action == action)
class CreateTerminalTestCase(unittest.TestCase):
def setUp(self):
self.interface = Mock()
self.interface.legacy_firmware_detected = False
self.interface = MockInterface()
self.get_keymap = lambda terminal_id, extended_id: KEYMAP_3278_2
patcher = patch('oec.terminal.read_terminal_id')
self.read_terminal_id_mock = patcher.start()
patcher = patch('oec.terminal.read_extended_id')
self.read_extended_id_mock = patcher.start()
patcher = patch('oec.terminal.get_features')
self.get_features_mock = patcher.start()
self.addCleanup(patch.stopall)
def test_supported_terminal(self):
# Arrange
self.read_terminal_id_mock.return_value = TerminalId(0b11110100)
self.read_extended_id_mock.return_value = bytes.fromhex('c1 34 83 00')
self.get_features_mock.return_value = { Feature.EAB: 7 }
interface = InterfaceWrapper(self.interface)
self.interface.mock_responses = [
(None, ReadTerminalId, None, TerminalId(0b11110100)),
(None, ReadExtendedId, None, bytes.fromhex('c1 34 83 00')),
(None, ReadFeatureId, lambda command: command.feature_address == 7, Feature.EAB.value)
]
# Act
terminal = create_terminal(self.interface, None, self.get_keymap)
terminal = create_terminal(interface, None, None, self.get_keymap)
# Assert
self.assertEqual(terminal.terminal_id.type, TerminalType.CUT)
@@ -128,46 +100,34 @@ class CreateTerminalTestCase(unittest.TestCase):
def test_unsupported_terminal_type(self):
# Arrange
self.read_terminal_id_mock.return_value = TerminalId(0b00000001)
interface = InterfaceWrapper(self.interface)
self.interface.mock_responses = [(None, ReadTerminalId, None, TerminalId(0b00000001))]
# Act and assert
with self.assertRaises(UnsupportedTerminalError):
create_terminal(self.interface, None, self.get_keymap)
create_terminal(interface, None, None, self.get_keymap)
def test_unsupported_terminal_model(self):
# Arrange
interface = InterfaceWrapper(self.interface)
terminal_id = TerminalId(0b11110100)
terminal_id.model = 1
self.read_terminal_id_mock.return_value = terminal_id
self.interface.mock_responses = [(None, ReadTerminalId, None, terminal_id)]
# Act and assert
with self.assertRaises(UnsupportedTerminalError):
create_terminal(self.interface, None, self.get_keymap)
create_terminal(interface, None, None, self.get_keymap)
def test_eab_feature_removed_on_legacy_interface_without_strategy(self):
# Arrange
self.interface.legacy_firmware_detected = True
def _create_terminal(interface):
terminal_id = TerminalId(0b11110100)
extended_id = 'c1348300'
dimensions = Dimensions(24, 80)
features = { }
keymap = KEYMAP_3278_2
self.read_terminal_id_mock.return_value = TerminalId(0b11110100)
self.read_extended_id_mock.return_value = bytes.fromhex('c1 34 83 00')
self.get_features_mock.return_value = { Feature.EAB: 7 }
terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap)
patcher = patch('oec.terminal._print_no_i1_eab_notice')
print_no_i1_eab_notice_mock = patcher.start()
# Act
terminal = create_terminal(self.interface, None, self.get_keymap)
# Assert
self.assertEqual(terminal.terminal_id.type, TerminalType.CUT)
self.assertEqual(terminal.terminal_id.model, 2)
self.assertEqual(terminal.terminal_id.keyboard, 15)
self.assertEqual(terminal.extended_id, 'c1348300')
self.assertEqual(terminal.display.dimensions, Dimensions(24, 80))
self.assertEqual(terminal.features, { })
self.assertEqual(terminal.keyboard.keymap.name, '3278-2')
print_no_i1_eab_notice_mock.assert_called_once()
return terminal

View File

@@ -1,26 +1,35 @@
import unittest
from unittest.mock import Mock, patch
from unittest.mock import Mock, create_autospec
import context
from oec.session import SessionDisconnectedError
from oec.display import Dimensions, BufferedDisplay
from oec.keyboard import Key, KeyboardModifiers
from oec.tn3270 import TN3270Session
from tn3270 import AttributeCell, CharacterCell, AID, Color, ProtectedCellOperatorError, FieldOverflowOperatorError
from coax.protocol import TerminalId
from tn3270 import Telnet, Emulator, AttributeCell, CharacterCell, AID, Color, ProtectedCellOperatorError, FieldOverflowOperatorError
from tn3270.attributes import Attribute
from tn3270.emulator import CellFormatting
import context
from oec.interface import InterfaceWrapper
from oec.terminal import Terminal
from oec.display import Dimensions, BufferedDisplay, StatusLine
from oec.keyboard import Key, KeyboardModifiers
from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2
from oec.session import SessionDisconnectedError
from oec.tn3270 import TN3270Session
from mock_interface import MockInterface
class SessionHandleHostTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.interface = MockInterface()
self.terminal = _create_terminal(self.interface)
self.session = TN3270Session(self.terminal, 'mainframe', 23)
self.telnet = Mock()
self.telnet = create_autospec(Telnet, instance=True)
self.session.telnet = self.telnet
self.session.emulator = Mock()
self.session.emulator = create_autospec(Emulator, instance=True)
def test_no_changes(self):
# Arrange
@@ -58,11 +67,13 @@ class SessionHandleHostTestCase(unittest.TestCase):
class SessionHandleKeyTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.interface = MockInterface()
self.terminal = _create_terminal(self.interface)
self.session = TN3270Session(self.terminal, 'mainframe', 23)
self.session.emulator = Mock()
self.session.emulator = create_autospec(Emulator, instance=True)
self.session.emulator.cells = []
self.session.emulator.dirty = set()
@@ -206,9 +217,9 @@ class SessionHandleKeyTestCase(unittest.TestCase):
class SessionRenderTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.interface = MockInterface()
self.terminal.display = BufferedDisplay(self.terminal, Dimensions(24, 80), None)
self.terminal = _create_terminal(self.interface)
self.terminal.display.buffered_write_byte = Mock(wraps=self.terminal.display.buffered_write_byte)
self.terminal.display.move_cursor = Mock(wraps=self.terminal.display.move_cursor)
@@ -217,36 +228,10 @@ class SessionRenderTestCase(unittest.TestCase):
self.session = TN3270Session(self.terminal, 'mainframe', 23)
self.telnet = Mock()
self.session.telnet = create_autospec(Telnet, instance=True)
self.session.emulator = create_autospec(Emulator, instance=True)
self.session.telnet = self.telnet
self.session.emulator = Mock()
patcher = patch('oec.display.read_address_counter_hi')
self.read_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.read_address_counter_lo')
self.read_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_hi')
self.load_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_lo')
self.load_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.write_data')
self.write_data_mock = patcher.start()
patcher = patch('oec.display.eab_write_alternate')
self.eab_write_alternate_mock = patcher.start()
self.addCleanup(patch.stopall)
self.session.emulator.keyboard_locked = False
def test_with_no_eab_feature(self):
# Arrange
@@ -382,6 +367,19 @@ class SessionRenderTestCase(unittest.TestCase):
# Assert
self.terminal.display.status_line.write.assert_called_with(8, bytes.fromhex('f600db080000000000'))
def _create_terminal(interface):
terminal_id = TerminalId(0b11110100)
extended_id = 'c1348300'
dimensions = Dimensions(24, 80)
features = { }
keymap = KEYMAP_3278_2
terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap)
terminal.display.status_line = create_autospec(StatusLine, instance=True)
return terminal
def _create_screen_cells(rows, columns):
return [CharacterCell(0x00) for address in range(rows * columns)]

View File

@@ -1,22 +1,33 @@
import unittest
from unittest.mock import Mock, patch
from unittest.mock import Mock, create_autospec
from logging import Logger
from ptyprocess import PtyProcess
from coax.protocol import TerminalId
import context
from oec.session import SessionDisconnectedError
from oec.interface import InterfaceWrapper
from oec.terminal import Terminal
from oec.display import Dimensions, BufferedDisplay
from oec.keyboard import Key, KeyboardModifiers
from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2
from oec.session import SessionDisconnectedError
from oec.vt100 import VT100Session
from mock_interface import MockInterface
class SessionHandleHostTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.interface = MockInterface()
self.terminal.display.dimensions = Dimensions(24, 80)
self.terminal = _create_terminal(self.interface)
self.terminal.sound_alarm = Mock(wraps=self.terminal.sound_alarm)
self.session = VT100Session(self.terminal, None)
self.session.host_process = Mock()
self.session.host_process = create_autospec(PtyProcess, instance=True)
def test(self):
# Arrange
@@ -54,13 +65,13 @@ class SessionHandleHostTestCase(unittest.TestCase):
class SessionHandleKeyTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.interface = MockInterface()
self.terminal.display.dimensions = Dimensions(24, 80)
self.terminal = _create_terminal(self.interface)
self.session = VT100Session(self.terminal, None)
self.session.host_process = Mock()
self.session.host_process = create_autospec(PtyProcess, instance=True)
def test_printable(self):
# Act
@@ -87,7 +98,7 @@ class SessionHandleKeyTestCase(unittest.TestCase):
def test_unmapped_alt_modifier(self):
# Arrange
self.session.logger = Mock()
self.session.logger = create_autospec(Logger, instance=True)
# Act
self.session.handle_key(Key.THREE, KeyboardModifiers.LEFT_ALT, None)
@@ -106,9 +117,9 @@ class SessionHandleKeyTestCase(unittest.TestCase):
class SessionRenderTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.interface = MockInterface()
self.terminal.display = BufferedDisplay(self.terminal, Dimensions(24, 80), None)
self.terminal = _create_terminal(self.interface)
self.terminal.display.buffered_write_byte = Mock(wraps=self.terminal.display.buffered_write_byte)
self.terminal.display.move_cursor = Mock(wraps=self.terminal.display.move_cursor)
@@ -116,33 +127,7 @@ class SessionRenderTestCase(unittest.TestCase):
self.session = VT100Session(self.terminal, None)
self.session.host_process = Mock()
patcher = patch('oec.display.read_address_counter_hi')
self.read_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.read_address_counter_lo')
self.read_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_hi')
self.load_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_lo')
self.load_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.write_data')
self.write_data_mock = patcher.start()
patcher = patch('oec.display.eab_write_alternate')
self.eab_write_alternate_mock = patcher.start()
self.addCleanup(patch.stopall)
self.session.host_process = create_autospec(PtyProcess, instance=True)
def test_with_no_eab_feature(self):
# Arrange
@@ -189,3 +174,14 @@ class SessionRenderTestCase(unittest.TestCase):
self.terminal.display.move_cursor.assert_called_with(row=0, column=3)
self.assertFalse(self.session.vt100_screen.dirty)
def _create_terminal(interface):
terminal_id = TerminalId(0b11110100)
extended_id = 'c1348300'
dimensions = Dimensions(24, 80)
features = { }
keymap = KEYMAP_3278_2
terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap)
return terminal