diff --git a/oec/__main__.py b/oec/__main__.py index d03e02c..875172a 100644 --- a/oec/__main__.py +++ b/oec/__main__.py @@ -4,6 +4,7 @@ import logging import argparse from coax import open_serial_interface +from .interface import InterfaceWrapper from .controller import Controller from .tn3270 import TN3270Session @@ -87,7 +88,7 @@ def main(): with open_serial_interface(args.serial_port) as interface: create_session = lambda terminal: _create_session(args, terminal) - controller = Controller(interface, _get_keymap, create_session) + controller = Controller(InterfaceWrapper(interface), _get_keymap, create_session) print('Starting controller...') diff --git a/oec/controller.py b/oec/controller.py index 51b7ad7..2670814 100644 --- a/oec/controller.py +++ b/oec/controller.py @@ -6,9 +6,10 @@ oec.controller import time import logging import selectors -from coax import poll, poll_ack, KeystrokePollResponse, ReceiveTimeout, \ +from coax import Poll, PollAck, KeystrokePollResponse, ReceiveTimeout, \ ReceiveError, ProtocolError +from .interface import address_commands from .terminal import create_terminal, UnsupportedTerminalError from .keyboard import Key from .session import SessionDisconnectedError @@ -64,6 +65,8 @@ class Controller: self.running = False def _run_loop(self): + device_address = None + poll_delay = self._calculate_poll_delay(time.perf_counter()) # If POLLing is delayed, handle the host output, otherwise just sleep. @@ -77,7 +80,7 @@ class Controller: time.sleep(poll_delay) try: - poll_response = self._poll() + poll_response = self._poll(device_address) except ReceiveTimeout: if self.terminal: self._handle_terminal_detached() @@ -92,7 +95,7 @@ class Controller: if not self.terminal: try: - self._handle_terminal_attached(poll_response) + self._handle_terminal_attached(device_address, poll_response) except UnsupportedTerminalError as error: self.logger.error(f'Unsupported terminal: {error}') return @@ -100,10 +103,11 @@ class Controller: if poll_response: self._handle_poll_response(poll_response) - def _handle_terminal_attached(self, poll_response): + def _handle_terminal_attached(self, device_address, poll_response): self.logger.info('Terminal attached') - self.terminal = create_terminal(self.interface, poll_response, self.get_keymap) + self.terminal = create_terminal(self.interface, device_address, poll_response, + self.get_keymap) self.terminal.setup() @@ -198,21 +202,19 @@ class Controller: self.session.render() - def _poll(self): + def _poll(self, device_address): self.last_poll_time = time.perf_counter() # If a terminal is connected, use the terminal method to ensure that # any queued POLL action is applied. if self.terminal: - poll_response = self.terminal.poll(receive_timeout=1) + poll_response = self.terminal.poll() else: - poll_response = poll(self.interface, receive_timeout=1) + poll_response = self.interface.execute(address_commands(device_address, Poll())) if poll_response: try: - poll_ack(self.interface) - except ReceiveError as error: - self.logger.warning(f'POLL_ACK receive error: {error}', exc_info=error) + self.interface.execute(address_commands(device_address, PollAck())) except ProtocolError as error: self.logger.warning(f'POLL_ACK protocol error: {error}', exc_info=error) diff --git a/oec/display.py b/oec/display.py index a5cb3e2..851731d 100644 --- a/oec/display.py +++ b/oec/display.py @@ -8,21 +8,19 @@ from itertools import zip_longest import logging from more_itertools import interleave from sortedcontainers import SortedSet -from coax import read_address_counter_hi, read_address_counter_lo, \ - load_address_counter_hi, load_address_counter_lo, write_data, \ - eab_load_mask, eab_write_alternate +from coax import ReadAddressCounterHi, ReadAddressCounterLo, LoadAddressCounterHi, \ + LoadAddressCounterLo, WriteData, EABLoadMask, EABWriteAlternate, Data # Does not include the status line row. Dimensions = namedtuple('Dimensions', ['rows', 'columns']) class Display: - def __init__(self, terminal, dimensions, eab_address, jumbo_write_strategy=None): + def __init__(self, terminal, dimensions, eab_address): self.logger = logging.getLogger(__name__) self.terminal = terminal self.dimensions = dimensions self.eab_address = eab_address - self.jumbo_write_strategy = jumbo_write_strategy self.address_counter = None self.last_address = ((dimensions.rows + 1) * dimensions.columns) - 1 @@ -109,7 +107,7 @@ class Display: if not self.has_eab: raise RuntimeError('No EAB feature') - eab_load_mask(self.terminal.interface, self.eab_address, mask) + self.terminal.execute(EABLoadMask(self.eab_address, mask)) def toggle_cursor_blink(self): self.terminal.control.cursor_blink = not self.terminal.control.cursor_blink @@ -149,8 +147,7 @@ class Display: return address def _read_address_counter(self): - hi = read_address_counter_hi(self.terminal.interface) - lo = read_address_counter_lo(self.terminal.interface) + [hi, lo] = self.terminal.execute([ReadAddressCounterHi(), ReadAddressCounterLo()]) self.address_counter = (hi << 8) | lo @@ -163,21 +160,41 @@ class Display: (hi, lo) = _split_address(address) (current_hi, current_lo) = _split_address(self.address_counter) + commands = [] + if hi != current_hi or force_load: - load_address_counter_hi(self.terminal.interface, hi) + commands.append(LoadAddressCounterHi(hi)) if lo != current_lo or force_load: - load_address_counter_lo(self.terminal.interface, lo) + commands.append(LoadAddressCounterLo(lo)) + + self.terminal.execute(commands) self.address_counter = address return True def _write_data(self, data): - write_data(self.terminal.interface, data, jumbo_write_strategy=self.jumbo_write_strategy) + chunks = self.terminal.interface.jumbo_write_split_data(data, -1) + + commands = [WriteData(chunks[0])] + + for chunk in chunks[1:]: + commands.append(Data(chunk)) + + self.terminal.execute(WriteData(data)) def _eab_write_alternate(self, data): - eab_write_alternate(self.terminal.interface, self.eab_address, data, jumbo_write_strategy=self.jumbo_write_strategy) + # The EAB_WRITE_ALTERNATE command data must be split so that the two bytes + # do not get separated, otherwise the write will be incorrect. + chunks = self.terminal.interface.jumbo_write_split_data(data, -2) + + commands = [EABWriteAlternate(self.eab_address, chunks[0])] + + for chunk in chunks[1:]: + commands.append(Data(chunk)) + + self.terminal.execute(commands) def _split_address(address): if address is None: @@ -220,8 +237,8 @@ class StatusLine: self.write(45, indicators) class BufferedDisplay(Display): - def __init__(self, terminal, dimensions, eab_address, jumbo_write_strategy=None): - super().__init__(terminal, dimensions, eab_address, jumbo_write_strategy) + def __init__(self, terminal, dimensions, eab_address): + super().__init__(terminal, dimensions, eab_address) length = (self.dimensions.rows + 1) * self.dimensions.columns diff --git a/oec/interface.py b/oec/interface.py new file mode 100644 index 0000000..14e9207 --- /dev/null +++ b/oec/interface.py @@ -0,0 +1,116 @@ +""" +oec.interface +~~~~~~~~~~~~~ +""" + +import os +import logging +from textwrap import dedent + +from more_itertools import chunked + +logger = logging.getLogger(__name__) + +class AggregateExecuteError(Exception): + def __init__(self, errors, responses): + super().__init__('One or more errors occurred') + + self.errors = errors + self.responses = responses + +class InterfaceWrapper: + def __init__(self, interface): + self.interface = interface + + self.timeout = 0.1 + + self.jumbo_write_strategy = _get_jumbo_write_strategy() + self.jumbo_write_max_length = None + + if self.legacy_firmware_detected and self.jumbo_write_strategy is None: + _print_i1_jumbo_write_notice() + + self.jumbo_write_strategy = 'split' + + if self.jumbo_write_strategy == 'split': + self.jumbo_write_max_length = 1024 + + def __getattr__(self, attr): + return getattr(self.interface, attr) + + def execute(self, commands): + if not isinstance(commands, list): + return self.interface.execute(commands, self.timeout) + + responses = self.interface.execute(commands, self.timeout) + + errors = [response for response in responses if isinstance(response, BaseException)] + + if any(errors): + raise AggregateExecuteError(errors, responses) + + return responses + + def jumbo_write_split_data(self, data, first_chunk_max_length_adjustment=-1): + if self.jumbo_write_strategy != 'split': + return [data] + + if isinstance(data, tuple): + length = len(data[0]) * data[1] + else: + length = len(data) + + first_chunk_max_length = self.jumbo_write_max_length + first_chunk_max_length_adjustment + + if length <= first_chunk_max_length: + return [data] + + if isinstance(data, tuple): + data = data[0] * data[1] + + return [data[:first_chunk_max_length], *chunked(data[first_chunk_max_length:], self.jumbo_write_max_length)] + +def address_commands(device_address, commands): + if isinstance(commands, list): + return [(device_address, command) for command in commands] + + return (device_address, commands) + +def _get_jumbo_write_strategy(): + value = os.environ.get('COAX_JUMBO') + + if value is None: + return None + + if value in ['split', 'ignore']: + return value + + logger.warning(f'Unsupported COAX_JUMBO option: {value}') + + return None + +def _print_i1_jumbo_write_notice(): + notice = ''' + **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** + + I think you are using an older firmware on the 1st generation, Arduino Mega + based, interface which does not support the "jumbo write" required to write a + full screen to the regen and EAB buffers. + + I'm going to split large writes into multiple smaller 1024-byte writes... + + If you want to override this behavior, you can set the COAX_JUMBO environment + variable as follows: + + - COAX_JUMBO=split - split large writes into multiple smaller 32-byte writes + before sending to the interface, this will result in + additional round trips to the interface which may + manifest as visible incremental changes being applied + to the screen + - COAX_JUMBO=ignore - try a jumbo write, anyway, use this option if you + believe you are seeing this behavior in error + + **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** + ''' + + print(dedent(notice)) diff --git a/oec/terminal.py b/oec/terminal.py index 7456d4f..2e107f2 100644 --- a/oec/terminal.py +++ b/oec/terminal.py @@ -3,14 +3,13 @@ oec.terminal ~~~~~~~~~~~~ """ -import os import time import logging -from textwrap import dedent -from coax import poll, read_terminal_id, read_extended_id, get_features, \ - load_control_register, TerminalType, Feature, PollAction, Control, \ - ReceiveError, ProtocolError +from coax import read_feature_ids, parse_features, Poll, ReadTerminalId, ReadExtendedId, \ + LoadControlRegister, TerminalType, Feature, PollAction, Control, \ + ProtocolError +from .interface import address_commands from .display import Dimensions, BufferedDisplay from .keyboard import Keyboard @@ -26,9 +25,10 @@ MODEL_DIMENSIONS = { class Terminal: """The terminal.""" - def __init__(self, interface, terminal_id, extended_id, dimensions, features, - keymap, jumbo_write_strategy=None): + def __init__(self, interface, device_address, terminal_id, extended_id, dimensions, + features, keymap): self.interface = interface + self.device_address = device_address self.terminal_id = terminal_id self.extended_id = extended_id self.features = features @@ -37,8 +37,7 @@ class Terminal: cursor_inhibit=False, cursor_reverse=False, cursor_blink=False) - self.display = BufferedDisplay(self, dimensions, features.get(Feature.EAB), - jumbo_write_strategy=jumbo_write_strategy) + self.display = BufferedDisplay(self, dimensions, features.get(Feature.EAB)) self.keyboard = Keyboard(keymap) self.alarm = False @@ -53,7 +52,7 @@ class Terminal: self.display.clear(clear_status_line=True) - def poll(self, **kwargs): + def poll(self): """Execute a POLL command with queued actions.""" poll_action = PollAction.NONE @@ -66,7 +65,7 @@ class Terminal: else: poll_action = PollAction.DISABLE_KEYBOARD_CLICKER - poll_response = poll(self.interface, poll_action, **kwargs) + poll_response = self.execute(Poll(poll_action)) # Clear the queued alarm and keyboard clicker change if the POLL was # successful. @@ -84,17 +83,18 @@ class Terminal: def load_control_register(self): """Execute a LOAD_CONTROL_REGISTER command.""" - load_control_register(self.interface, self.control) + self.execute(LoadControlRegister(self.control)) + + def execute(self, commands): + return self.interface.execute(address_commands(self.device_address, commands)) class UnsupportedTerminalError(Exception): """Unsupported terminal.""" -def create_terminal(interface, poll_response, get_keymap): +def create_terminal(interface, device_address, poll_response, get_keymap): """Terminal factory.""" - jumbo_write_strategy = _get_jumbo_write_strategy() - # Read the terminal identifiers. - (terminal_id, extended_id) = _read_terminal_ids(interface) + (terminal_id, extended_id) = _read_terminal_ids(interface, device_address) logger.info(f'Terminal ID = {terminal_id}, Extended ID = {extended_id}') @@ -110,35 +110,27 @@ def create_terminal(interface, poll_response, get_keymap): logger.info(f'Rows = {dimensions.rows}, Columns = {dimensions.columns}') # Get the terminal features. - features = get_features(interface) + features = _get_features(interface, device_address) logger.info(f'Features = {features}') - if Feature.EAB in features: - if interface.legacy_firmware_detected and jumbo_write_strategy is None: - del features[Feature.EAB] - - _print_no_i1_eab_notice() - # Get the keymap. keymap = get_keymap(terminal_id, extended_id) logger.info(f'Keymap = {keymap.name}') # Create the terminal. - terminal = Terminal(interface, terminal_id, extended_id, dimensions, features, - keymap, jumbo_write_strategy=jumbo_write_strategy) + terminal = Terminal(interface, device_address, terminal_id, extended_id, dimensions, + features, keymap) return terminal -def _read_terminal_ids(interface, extended_id_retry_attempts=3): +def _read_terminal_ids(interface, device_address, extended_id_retry_attempts=3): terminal_id = None extended_id = None try: - terminal_id = read_terminal_id(interface) - except ReceiveError as error: - logger.warning(f'READ_TERMINAL_ID receive error: {error}', exc_info=error) + terminal_id = interface.execute(address_commands(device_address, ReadTerminalId())) except ProtocolError as error: logger.warning(f'READ_TERMINAL_ID protocol error: {error}', exc_info=error) @@ -148,11 +140,9 @@ def _read_terminal_ids(interface, extended_id_retry_attempts=3): for attempt in range(extended_id_retry_attempts): try: - extended_id = read_extended_id(interface) + extended_id = interface.execute(address_commands(device_address, ReadExtendedId())) break - except ReceiveError as error: - logger.warning(f'READ_EXTENDED_ID receive error: {error}', exc_info=error) except ProtocolError as error: logger.warning(f'READ_EXTENDED_ID protocol error: {error}', exc_info=error) @@ -160,44 +150,9 @@ def _read_terminal_ids(interface, extended_id_retry_attempts=3): return (terminal_id, extended_id.hex() if extended_id is not None else None) -def _get_jumbo_write_strategy(): - value = os.environ.get('COAX_JUMBO') +def _get_features(interface, device_address): + commands = read_feature_ids() - if value is None: - return None + ids = interface.execute([address_commands(device_address, command) for command in commands]) - if value in ['split', 'ignore']: - return value - - logger.warning(f'Unsupported COAX_JUMBO option: {value}') - - return None - -def _print_no_i1_eab_notice(): - notice = ''' - **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** - - Your terminal is reporting the existence of an EAB feature that allows extended - colors and formatting, however... - - I think you are using an older firmware on the 1st generation, Arduino Mega - based, interface which does not support the "jumbo write" required to write a - full screen to the regen and EAB buffers. - - I'm going to continue as if the EAB feature did not exist... - - If you want to override this behavior, you can set the COAX_JUMBO environment - variable as follows: - - - COAX_JUMBO=split - split large writes into multiple smaller 32-byte writes - before sending to the interface, this will result in - additional round trips to the interface which may - manifest as visible incremental changes being applied - to the screen - - COAX_JUMBO=ignore - try a jumbo write, anyway, use this option if you - believe you are seeing this behavior in error - - **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** - ''' - - print(dedent(notice)) + return parse_features(ids, commands) diff --git a/requirements.txt b/requirements.txt index 79ca977..5e2a76d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ more-itertools==8.7.0 ptyprocess==0.7.0 -pycoax==0.9.0 +pycoax==0.10.0 pyserial==3.5 pyte==0.8.0 pytn3270==0.10.0 diff --git a/tests/mock_interface.py b/tests/mock_interface.py new file mode 100644 index 0000000..fd737bd --- /dev/null +++ b/tests/mock_interface.py @@ -0,0 +1,59 @@ +from unittest.mock import Mock + +from coax import ProtocolError, ReceiveError, ReceiveTimeout +from coax.interface import Interface + +class MockInterface(Interface): + def __init__(self, responses=[]): + self.mock_responses = responses + + self.legacy_firmware_detected = None + self.legacy_firmware_version = None + + # Wrap the reset and execute methods so calls can be asserted. + self.reset = Mock(wraps=self.reset) + self._execute = Mock(wraps=self._execute) + + def _execute(self, commands, timeout): + return [self._mock_get_response(device_address, command) for (device_address, command) in commands] + + def reset_mock(self): + self.reset.reset_mock() + self._execute.reset_mock() + + def assert_command_executed(self, device_address, command_type, predicate=None): + if not self._mock_get_execute_commands(device_address, command_type, predicate): + raise AssertionError('Expected command to be executed') + + def assert_command_not_executed(self, device_address, command_type, predicate=None): + if self._mock_get_execute_commands(device_address, command_type, predicate): + raise AssertionError('Expected command not to be executed') + + def _mock_get_execute_commands(self, device_address, command_type, predicate): + calls = self._execute.call_args_list + + commands = [] + + for call in calls: + for command in call[0][0]: + (call_device_address, call_command) = command + + if call_device_address == device_address and isinstance(call_command, command_type): + if predicate is None or predicate(call_command): + commands.append(command) + + return commands + + def _mock_get_response(self, device_address, command): + for (mock_device_address, mock_command_type, mock_predicate, mock_response) in self.mock_responses: + if mock_device_address == device_address and isinstance(command, mock_command_type): + if mock_predicate is None or mock_predicate(command): + if callable(mock_response): + try: + return mock_response() + except (ProtocolError, ReceiveError, ReceiveTimeout) as error: + return error + + return mock_response + + return None diff --git a/tests/test_controller.py b/tests/test_controller.py index 7f1520f..16be1b2 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -1,36 +1,42 @@ -import selectors import unittest -from unittest.mock import Mock, patch -from coax import PollAction, PowerOnResetCompletePollResponse, KeystrokePollResponse, ReceiveTimeout +from unittest.mock import Mock, create_autospec, patch + +import selectors +from selectors import BaseSelector +from logging import Logger +from coax import Poll, PollAction, PowerOnResetCompletePollResponse, KeystrokePollResponse, PollAck, ReceiveTimeout from coax.protocol import TerminalId import context +from oec.interface import InterfaceWrapper from oec.controller import Controller -from oec.session import SessionDisconnectedError from oec.terminal import Terminal, UnsupportedTerminalError from oec.display import Dimensions from oec.keyboard import KeyboardModifiers, Key from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2 +from oec.session import Session, SessionDisconnectedError + +from mock_interface import MockInterface class RunLoopTestCase(unittest.TestCase): def setUp(self): - self.interface = Mock() + self.interface = MockInterface() - self.session_mock = Mock() - self.create_session_mock = Mock(return_value=self.session_mock) + self.session = create_autospec(Session, instance=True) + self.create_session = Mock(return_value=self.session) - self.controller = Controller(self.interface, lambda terminal_id, extended_id: KEYMAP_3278_2, self.create_session_mock) + self.controller = Controller(InterfaceWrapper(self.interface), lambda terminal_id, extended_id: KEYMAP_3278_2, self.create_session) - self.controller.logger = Mock() + self.controller.logger = create_autospec(Logger, instance=True) self.controller.connected_poll_period = 1 - self.controller.session_selector = Mock() + self.controller.session_selector = create_autospec(BaseSelector, instance=True) self.controller._update_session = Mock() - self.terminal = Terminal(self.interface, TerminalId(0b11110100), 'c1348300', Dimensions(24, 80), { }, KEYMAP_3278_2, None) + self.terminal = Terminal(self.interface, None, TerminalId(0b11110100), 'c1348300', Dimensions(24, 80), { }, KEYMAP_3278_2) self.terminal.setup = Mock() @@ -40,33 +46,19 @@ class RunLoopTestCase(unittest.TestCase): self.terminal.keyboard.toggle_clicker = Mock() - self.poll_mock = Mock() - - patcher = patch('oec.controller.poll', self.poll_mock) - - patcher.start() - - patcher = patch('oec.terminal.poll', self.poll_mock) - - patcher.start() - - patcher = patch('oec.controller.poll_ack') - - self.poll_ack_mock = patcher.start() - patcher = patch('oec.controller.time.perf_counter') - self.perf_counter_mock = patcher.start() + self.perf_counter = patcher.start() patcher = patch('oec.controller.time.sleep') - self.sleep_mock = patcher.start() + self.sleep = patcher.start() patcher = patch('oec.controller.create_terminal') - self.create_terminal_mock = patcher.start() + self.create_terminal = patcher.start() - self.create_terminal_mock.return_value = self.terminal + self.create_terminal.return_value = self.terminal self.addCleanup(patch.stopall) @@ -88,7 +80,7 @@ class RunLoopTestCase(unittest.TestCase): self.controller._update_session.assert_called() def test_unsupported_terminal_attached(self): - self.create_terminal_mock.side_effect = [UnsupportedTerminalError] + self.create_terminal.side_effect = [UnsupportedTerminalError] self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) @@ -114,7 +106,7 @@ class RunLoopTestCase(unittest.TestCase): self.assertIsNone(self.controller.terminal) self.assertIsNone(self.controller.session) - self.session_mock.terminate.assert_called() + self.session.terminate.assert_called() def test_session_disconnected(self): self.controller._update_session.side_effect = [None, SessionDisconnectedError, None] @@ -127,7 +119,7 @@ class RunLoopTestCase(unittest.TestCase): self.assertIsNotNone(self.controller.terminal) self.assertIsNotNone(self.controller.session) - self.assertEqual(self.create_session_mock.call_count, 2) + self.assertEqual(self.create_session.call_count, 2) def test_toggle_cursor_blink(self): self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) @@ -178,13 +170,13 @@ class RunLoopTestCase(unittest.TestCase): # Arrange self.controller._update_session.reset_mock() - self.poll_mock.side_effect = [poll_response] + self.interface.mock_responses = [(None, Poll, None, poll_response)] - self.poll_ack_mock.reset_mock() + self.interface.reset_mock() - self.perf_counter_mock.side_effect = [poll_time, poll_time + expected_poll_delay] + self.perf_counter.side_effect = [poll_time, poll_time + expected_poll_delay] - self.sleep_mock.reset_mock() + self.sleep.reset_mock() # Act self.controller._run_loop() @@ -192,31 +184,31 @@ class RunLoopTestCase(unittest.TestCase): # Assert if expected_update_session: self.controller._update_session.assert_called_once_with(expected_poll_delay) - self.sleep_mock.assert_not_called() + self.sleep.assert_not_called() else: self.controller._update_session.assert_not_called() if expected_poll_delay > 0: - self.sleep_mock.assert_called_once_with(expected_poll_delay) + self.sleep.assert_called_once_with(expected_poll_delay) else: - self.sleep_mock.assert_not_called() + self.sleep.assert_not_called() if expected_poll_ack: - self.poll_ack_mock.assert_called_once() + self.interface.assert_command_executed(None, PollAck) else: - self.poll_ack_mock.assert_not_called() + self.interface.assert_command_not_executed(None, PollAck) class UpdateSessionTestCase(unittest.TestCase): def setUp(self): self.controller = Controller(None, None, None) - self.controller.session = Mock() + self.controller.session = create_autospec(Session, instance=True) - self.controller.session_selector = Mock() + self.controller.session_selector = create_autospec(BaseSelector, instance=True) patcher = patch('oec.controller.time.perf_counter') - self.perf_counter_mock = patcher.start() + self.perf_counter = patcher.start() def test_zero_duration(self): # Act @@ -243,7 +235,7 @@ class UpdateSessionTestCase(unittest.TestCase): def test_select_available(self): # Arrange - self.perf_counter_mock.side_effect = [0, 0.75, 0.75] + self.perf_counter.side_effect = [0, 0.75, 0.75] selector_key = Mock(fileobj=self.controller.session) diff --git a/tests/test_display.py b/tests/test_display.py index 216ed38..c256efe 100644 --- a/tests/test_display.py +++ b/tests/test_display.py @@ -1,47 +1,27 @@ import unittest -from unittest.mock import Mock, patch +from unittest.mock import Mock, create_autospec + +from coax import ReadAddressCounterHi, ReadAddressCounterLo, LoadAddressCounterHi, LoadAddressCounterLo, Feature +from coax.protocol import TerminalId import context -from oec.display import Dimensions, Display, StatusLine, BufferedDisplay, encode_ascii_character, encode_ebcdic_character, encode_string +from oec.interface import InterfaceWrapper +from oec.terminal import Terminal +from oec.display import Display, Dimensions, StatusLine, BufferedDisplay, encode_ascii_character, encode_ebcdic_character, encode_string +from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2 + +from mock_interface import MockInterface class DisplayClearTestCase(unittest.TestCase): def setUp(self): - self.terminal = Mock() + self.interface = MockInterface() - dimensions = Dimensions(24, 80) - - self.display = Display(self.terminal, dimensions, None) + self.display = _create_display(self.interface) self.display.write = Mock(wraps=self.display.write) self.display._load_address_counter = Mock(wraps=self.display._load_address_counter) - patcher = patch('oec.display.read_address_counter_hi') - - self.read_address_counter_hi_mock = patcher.start() - - patcher = patch('oec.display.read_address_counter_lo') - - self.read_address_counter_lo_mock = patcher.start() - - patcher = patch('oec.display.load_address_counter_hi') - - self.load_address_counter_hi_mock = patcher.start() - - patcher = patch('oec.display.load_address_counter_lo') - - self.load_address_counter_lo_mock = patcher.start() - - patcher = patch('oec.display.write_data') - - self.write_data_mock = patcher.start() - - patcher = patch('oec.display.eab_write_alternate') - - self.eab_write_alternate_mock = patcher.start() - - self.addCleanup(patch.stopall) - def test_excluding_status_line_with_no_eab_feature(self): # Act self.display.clear(clear_status_line=False) @@ -82,24 +62,12 @@ class DisplayClearTestCase(unittest.TestCase): class DisplayMoveCursorTestCase(unittest.TestCase): def setUp(self): - self.terminal = Mock() + self.interface = MockInterface() - dimensions = Dimensions(24, 80) - - self.display = Display(self.terminal, dimensions, None) + self.display = _create_display(self.interface) self.display._load_address_counter = Mock(wraps=self.display._load_address_counter) - patcher = patch('oec.display.load_address_counter_hi') - - self.load_address_counter_hi_mock = patcher.start() - - patcher = patch('oec.display.load_address_counter_lo') - - self.load_address_counter_lo_mock = patcher.start() - - self.addCleanup(patch.stopall) - def test_with_address(self): # Act self.display.move_cursor(address=895) @@ -138,40 +106,14 @@ class DisplayMoveCursorTestCase(unittest.TestCase): class DisplayWriteTestCase(unittest.TestCase): def setUp(self): - self.terminal = Mock() + self.interface = MockInterface() - dimensions = Dimensions(24, 80) - - self.display = Display(self.terminal, dimensions, None) + self.display = _create_display(self.interface) self.display._read_address_counter = Mock(wraps=self.display._read_address_counter) self.display._load_address_counter = Mock(wraps=self.display._load_address_counter) - - patcher = patch('oec.display.read_address_counter_hi') - - self.read_address_counter_hi_mock = patcher.start() - - patcher = patch('oec.display.read_address_counter_lo') - - self.read_address_counter_lo_mock = patcher.start() - - patcher = patch('oec.display.load_address_counter_hi') - - self.load_address_counter_hi_mock = patcher.start() - - patcher = patch('oec.display.load_address_counter_lo') - - self.load_address_counter_lo_mock = patcher.start() - - patcher = patch('oec.display.write_data') - - self.write_data_mock = patcher.start() - - patcher = patch('oec.display.eab_write_alternate') - - self.eab_write_alternate_mock = patcher.start() - - self.addCleanup(patch.stopall) + self.display._write_data = Mock(wraps=self.display._write_data) + self.display._eab_write_alternate = Mock(wraps=self.display._eab_write_alternate) def test_no_eab_feature(self): # Act and assert @@ -266,8 +208,10 @@ class DisplayWriteTestCase(unittest.TestCase): # Arrange self.assertIsNone(self.display.address_counter) - self.read_address_counter_hi_mock.return_value = 0 - self.read_address_counter_lo_mock.return_value = 160 + self.interface.mock_responses = [ + (None, ReadAddressCounterHi, None, 0), + (None, ReadAddressCounterLo, None, 160) + ] # Act self.display.write(bytes.fromhex('01 02 03'), None, restore_original_address=True) @@ -299,7 +243,7 @@ class DisplayWriteTestCase(unittest.TestCase): self.display.write(bytes.fromhex('01 02 03'), None) # Assert - self.write_data_mock.assert_called_with(self.terminal.interface, bytes.fromhex('01 02 03'), jumbo_write_strategy=None) + self.display._write_data.assert_called_with(bytes.fromhex('01 02 03')) def test_regen_only_repeat(self): # Arrange @@ -309,7 +253,7 @@ class DisplayWriteTestCase(unittest.TestCase): self.display.write((bytes.fromhex('01 02 03'), 2), None) # Assert - self.write_data_mock.assert_called_with(self.terminal.interface, (bytes.fromhex('01 02 03'), 2), jumbo_write_strategy=None) + self.display._write_data.assert_called_with((bytes.fromhex('01 02 03'), 2)) def test_regen_eab(self): # Arrange @@ -320,7 +264,7 @@ class DisplayWriteTestCase(unittest.TestCase): self.display.write(bytes.fromhex('01 02 03'), bytes.fromhex('04 05 06')) # Assert - self.eab_write_alternate_mock.assert_called_with(self.terminal.interface, 7, bytes.fromhex('01 04 02 05 03 06'), jumbo_write_strategy=None) + self.display._eab_write_alternate.assert_called_with(bytes.fromhex('01 04 02 05 03 06')) def test_regen_eab_repeat(self): # Arrange @@ -331,25 +275,13 @@ class DisplayWriteTestCase(unittest.TestCase): self.display.write((bytes.fromhex('01 02 03'), 2), (bytes.fromhex('04 05 06'), 2)) # Assert - self.eab_write_alternate_mock.assert_called_with(self.terminal.interface, 7, (bytes.fromhex('01 04 02 05 03 06'), 2), jumbo_write_strategy=None) + self.display._eab_write_alternate.assert_called_with((bytes.fromhex('01 04 02 05 03 06'), 2)) class DisplayLoadAddressCounterTestCase(unittest.TestCase): def setUp(self): - self.terminal = Mock() + self.interface = MockInterface() - dimensions = Dimensions(24, 80) - - self.display = Display(self.terminal, dimensions, None) - - patcher = patch('oec.display.load_address_counter_hi') - - self.load_address_counter_hi_mock = patcher.start() - - patcher = patch('oec.display.load_address_counter_lo') - - self.load_address_counter_lo_mock = patcher.start() - - self.addCleanup(patch.stopall) + self.display = _create_display(self.interface) def test(self): # Act @@ -358,15 +290,14 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase): # Assert self.assertEqual(self.display.address_counter, 895) - self.load_address_counter_hi_mock.assert_called_with(self.terminal.interface, 3) - self.load_address_counter_lo_mock.assert_called_with(self.terminal.interface, 127) + self.assert_load_address_counter_hi(3) + self.assert_load_address_counter_lo(127) def test_hi_change(self): # Arrange self.display._load_address_counter(895, force_load=False) - self.load_address_counter_hi_mock.reset_mock() - self.load_address_counter_lo_mock.reset_mock() + self.interface.reset_mock() # Act self.display._load_address_counter(1151, force_load=False) @@ -374,15 +305,14 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase): # Assert self.assertEqual(self.display.address_counter, 1151) - self.load_address_counter_hi_mock.assert_called_with(self.terminal.interface, 4) - self.load_address_counter_lo_mock.assert_not_called() + self.assert_load_address_counter_hi(4) + self.interface.assert_command_not_executed(None, LoadAddressCounterLo) def test_lo_change(self): # Arrange self.display._load_address_counter(895, force_load=False) - self.load_address_counter_hi_mock.reset_mock() - self.load_address_counter_lo_mock.reset_mock() + self.interface.reset_mock() # Act self.display._load_address_counter(896, force_load=False) @@ -390,15 +320,14 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase): # Assert self.assertEqual(self.display.address_counter, 896) - self.load_address_counter_hi_mock.assert_not_called() - self.load_address_counter_lo_mock.assert_called_with(self.terminal.interface, 128) + self.interface.assert_command_not_executed(None, LoadAddressCounterHi) + self.assert_load_address_counter_lo(128) def test_hi_lo_change(self): # Arrange self.display._load_address_counter(895, force_load=False) - self.load_address_counter_hi_mock.reset_mock() - self.load_address_counter_lo_mock.reset_mock() + self.interface.reset_mock() # Act self.display._load_address_counter(1152, force_load=False) @@ -406,15 +335,14 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase): # Assert self.assertEqual(self.display.address_counter, 1152) - self.load_address_counter_hi_mock.assert_called_with(self.terminal.interface, 4) - self.load_address_counter_lo_mock.assert_called_with(self.terminal.interface, 128) + self.assert_load_address_counter_hi(4) + self.assert_load_address_counter_lo(128) def test_no_change(self): # Arrange self.display._load_address_counter(80, force_load=False) - self.load_address_counter_hi_mock.reset_mock() - self.load_address_counter_lo_mock.reset_mock() + self.interface.reset_mock() # Act self.display._load_address_counter(80, force_load=False) @@ -422,15 +350,14 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase): # Assert self.assertEqual(self.display.address_counter, 80) - self.load_address_counter_hi_mock.assert_not_called() - self.load_address_counter_lo_mock.assert_not_called() + self.interface.assert_command_not_executed(None, LoadAddressCounterHi) + self.interface.assert_command_not_executed(None, LoadAddressCounterLo) def test_no_change_force(self): # Arrange self.display._load_address_counter(80, force_load=False) - self.load_address_counter_hi_mock.reset_mock() - self.load_address_counter_lo_mock.reset_mock() + self.interface.reset_mock() # Act self.display._load_address_counter(80, force_load=True) @@ -438,12 +365,18 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase): # Assert self.assertEqual(self.display.address_counter, 80) - self.load_address_counter_hi_mock.assert_called_with(self.terminal.interface, 0) - self.load_address_counter_lo_mock.assert_called_with(self.terminal.interface, 80) + self.assert_load_address_counter_hi(0) + self.assert_load_address_counter_lo(80) + + def assert_load_address_counter_hi(self, address): + self.interface.assert_command_executed(None, LoadAddressCounterHi, lambda command: command.address == address) + + def assert_load_address_counter_lo(self, address): + self.interface.assert_command_executed(None, LoadAddressCounterLo, lambda command: command.address == address) class StatusLineWriteTestCase(unittest.TestCase): def setUp(self): - self.display = Mock() + self.display = create_autospec(Display, instance=True) self.display.dimensions = Dimensions(24, 80) @@ -462,7 +395,7 @@ class StatusLineWriteTestCase(unittest.TestCase): class BufferedDisplayBufferedWriteByteTestCase(unittest.TestCase): def setUp(self): - self.terminal = Mock() + self.terminal = create_autospec(Terminal, instance=True) dimensions = Dimensions(24, 80) @@ -561,40 +494,12 @@ class BufferedDisplayBufferedWriteByteTestCase(unittest.TestCase): class BufferedDisplayFlushTestCase(unittest.TestCase): def setUp(self): - self.terminal = Mock() + self.interface = MockInterface() - dimensions = Dimensions(24, 80) - - self.buffered_display = BufferedDisplay(self.terminal, dimensions, None) + self.buffered_display = _create_buffered_display(self.interface) self.buffered_display.write = Mock(wraps=self.buffered_display.write) - patcher = patch('oec.display.read_address_counter_hi') - - self.read_address_counter_hi_mock = patcher.start() - - patcher = patch('oec.display.read_address_counter_lo') - - self.read_address_counter_lo_mock = patcher.start() - - patcher = patch('oec.display.load_address_counter_hi') - - self.load_address_counter_hi_mock = patcher.start() - - patcher = patch('oec.display.load_address_counter_lo') - - self.load_address_counter_lo_mock = patcher.start() - - patcher = patch('oec.display.write_data') - - self.write_data_mock = patcher.start() - - patcher = patch('oec.display.eab_write_alternate') - - self.eab_write_alternate_mock = patcher.start() - - self.addCleanup(patch.stopall) - def test_no_changes(self): # Arrange self.assertFalse(self.buffered_display.dirty) @@ -624,9 +529,7 @@ class BufferedDisplayFlushTestCase(unittest.TestCase): def test_single_range_with_eab_feature(self): # Arrange - dimensions = Dimensions(24, 80) - - self.buffered_display = BufferedDisplay(self.terminal, dimensions, 7) + self.buffered_display = _create_buffered_display(self.interface, has_eab=True) self.buffered_display.write = Mock(wraps=self.buffered_display.write) @@ -669,9 +572,7 @@ class BufferedDisplayFlushTestCase(unittest.TestCase): def test_multiple_ranges_with_eab_feature(self): # Arrange - dimensions = Dimensions(24, 80) - - self.buffered_display = BufferedDisplay(self.terminal, dimensions, 7) + self.buffered_display = _create_buffered_display(self.interface, has_eab=True) self.buffered_display.write = Mock(wraps=self.buffered_display.write) @@ -700,41 +601,13 @@ class BufferedDisplayFlushTestCase(unittest.TestCase): class BufferedDisplayClearTestCase(unittest.TestCase): def setUp(self): - self.terminal = Mock() + self.interface = MockInterface() - dimensions = Dimensions(24, 80) - - self.buffered_display = BufferedDisplay(self.terminal, dimensions, None) + self.buffered_display = _create_buffered_display(self.interface) self.buffered_display.write = Mock(wraps=self.buffered_display.write) self.buffered_display._load_address_counter = Mock(wraps=self.buffered_display._load_address_counter) - patcher = patch('oec.display.read_address_counter_hi') - - self.read_address_counter_hi_mock = patcher.start() - - patcher = patch('oec.display.read_address_counter_lo') - - self.read_address_counter_lo_mock = patcher.start() - - patcher = patch('oec.display.load_address_counter_hi') - - self.load_address_counter_hi_mock = patcher.start() - - patcher = patch('oec.display.load_address_counter_lo') - - self.load_address_counter_lo_mock = patcher.start() - - patcher = patch('oec.display.write_data') - - self.write_data_mock = patcher.start() - - patcher = patch('oec.display.eab_write_alternate') - - self.eab_write_alternate_mock = patcher.start() - - self.addCleanup(patch.stopall) - def test_excluding_status_line_with_no_eab_feature(self): # Arrange self.buffered_display.buffered_write_byte(0x01, None, address=80) @@ -753,9 +626,7 @@ class BufferedDisplayClearTestCase(unittest.TestCase): def test_excluding_status_line_with_eab_feature(self): # Arrange - dimensions = Dimensions(24, 80) - - self.buffered_display = BufferedDisplay(self.terminal, dimensions, 7) + self.buffered_display = _create_buffered_display(self.interface, has_eab=True) self.buffered_display.write = Mock(wraps=self.buffered_display.write) self.buffered_display._load_address_counter = Mock(wraps=self.buffered_display._load_address_counter) @@ -793,9 +664,7 @@ class BufferedDisplayClearTestCase(unittest.TestCase): def test_including_status_line_with_eab_feature(self): # Arrange - dimensions = Dimensions(24, 80) - - self.buffered_display = BufferedDisplay(self.terminal, dimensions, 7) + self.buffered_display = _create_buffered_display(self.interface, has_eab=True) self.buffered_display.write = Mock(wraps=self.buffered_display.write) self.buffered_display._load_address_counter = Mock(wraps=self.buffered_display._load_address_counter) @@ -817,47 +686,23 @@ class BufferedDisplayClearTestCase(unittest.TestCase): class BufferedDisplayWriteTestCase(unittest.TestCase): def setUp(self): - self.terminal = Mock() + self.interface = MockInterface() - dimensions = Dimensions(24, 80) - - self.buffered_display = BufferedDisplay(self.terminal, dimensions, None) + self.buffered_display = _create_buffered_display(self.interface) self.buffered_display._read_address_counter = Mock(wraps=self.buffered_display._read_address_counter) self.buffered_display._load_address_counter = Mock(wraps=self.buffered_display._load_address_counter) - - patcher = patch('oec.display.read_address_counter_hi') - - self.read_address_counter_hi_mock = patcher.start() - - patcher = patch('oec.display.read_address_counter_lo') - - self.read_address_counter_lo_mock = patcher.start() - - patcher = patch('oec.display.load_address_counter_hi') - - self.load_address_counter_hi_mock = patcher.start() - - patcher = patch('oec.display.load_address_counter_lo') - - self.load_address_counter_lo_mock = patcher.start() - - patcher = patch('oec.display.write_data') - - self.write_data_mock = patcher.start() - - patcher = patch('oec.display.eab_write_alternate') - - self.eab_write_alternate_mock = patcher.start() - - self.addCleanup(patch.stopall) + self.buffered_display._write_data = Mock(wraps=self.buffered_display._write_data) + self.buffered_display._eab_write_alternate = Mock(wraps=self.buffered_display._eab_write_alternate) def test_if_current_address_unknown(self): # Arrange self.assertIsNone(self.buffered_display.address_counter) - self.read_address_counter_hi_mock.return_value = 0 - self.read_address_counter_lo_mock.return_value = 160 + self.interface.mock_responses = [ + (None, ReadAddressCounterHi, None, 0), + (None, ReadAddressCounterLo, None, 160) + ] # Act self.buffered_display.write(bytes.fromhex('01 02 03'), None) @@ -895,7 +740,7 @@ class BufferedDisplayWriteTestCase(unittest.TestCase): # Assert self.assertEqual(self.buffered_display.regen_buffer[80:83], bytes.fromhex('01 02 03')) - self.write_data_mock.assert_called_with(self.terminal.interface, bytes.fromhex('01 02 03'), jumbo_write_strategy=None) + self.buffered_display._write_data.assert_called_with(bytes.fromhex('01 02 03')) def test_regen_only_repeat(self): # Arrange @@ -907,16 +752,16 @@ class BufferedDisplayWriteTestCase(unittest.TestCase): # Assert self.assertEqual(self.buffered_display.regen_buffer[80:86], bytes.fromhex('01 02 03 01 02 03')) - self.write_data_mock.assert_called_with(self.terminal.interface, (bytes.fromhex('01 02 03'), 2), jumbo_write_strategy=None) + self.buffered_display._write_data.assert_called_with((bytes.fromhex('01 02 03'), 2)) def test_regen_eab(self): # Arrange - dimensions = Dimensions(24, 80) - - self.buffered_display = BufferedDisplay(self.terminal, dimensions, 7) + self.buffered_display = _create_buffered_display(self.interface, has_eab=True) self.buffered_display._read_address_counter = Mock(wraps=self.buffered_display._read_address_counter) self.buffered_display._load_address_counter = Mock(wraps=self.buffered_display._load_address_counter) + self.buffered_display._write_data = Mock(wraps=self.buffered_display._write_data) + self.buffered_display._eab_write_alternate = Mock(wraps=self.buffered_display._eab_write_alternate) self.buffered_display.address_counter = 80 @@ -927,16 +772,16 @@ class BufferedDisplayWriteTestCase(unittest.TestCase): self.assertEqual(self.buffered_display.regen_buffer[80:83], bytes.fromhex('01 02 03')) self.assertEqual(self.buffered_display.eab_buffer[80:83], bytes.fromhex('04 05 06')) - self.eab_write_alternate_mock.assert_called_with(self.terminal.interface, 7, bytes.fromhex('01 04 02 05 03 06'), jumbo_write_strategy=None) + self.buffered_display._eab_write_alternate.assert_called_with(bytes.fromhex('01 04 02 05 03 06')) def test_regen_eab_repeat(self): # Arrange - dimensions = Dimensions(24, 80) - - self.buffered_display = BufferedDisplay(self.terminal, dimensions, 7) + self.buffered_display = _create_buffered_display(self.interface, has_eab=True) self.buffered_display._read_address_counter = Mock(wraps=self.buffered_display._read_address_counter) self.buffered_display._load_address_counter = Mock(wraps=self.buffered_display._load_address_counter) + self.buffered_display._write_data = Mock(wraps=self.buffered_display._write_data) + self.buffered_display._eab_write_alternate = Mock(wraps=self.buffered_display._eab_write_alternate) self.buffered_display.address_counter = 80 @@ -947,7 +792,7 @@ class BufferedDisplayWriteTestCase(unittest.TestCase): self.assertEqual(self.buffered_display.regen_buffer[80:86], bytes.fromhex('01 02 03 01 02 03')) self.assertEqual(self.buffered_display.eab_buffer[80:86], bytes.fromhex('04 05 06 04 05 06')) - self.eab_write_alternate_mock.assert_called_with(self.terminal.interface, 7, (bytes.fromhex('01 04 02 05 03 06'), 2), jumbo_write_strategy=None) + self.buffered_display._eab_write_alternate.assert_called_with((bytes.fromhex('01 04 02 05 03 06'), 2)) def test_dirty_cleared(self): # Arrange @@ -993,3 +838,29 @@ class EncodeStringTestCase(unittest.TestCase): def test_unmapped_characters(self): self.assertEqual(encode_string('Everything ✓'), bytes.fromhex('a4 95 84 91 98 93 87 88 8d 86 00 18')) + +def _create_display(interface): + terminal_id = TerminalId(0b11110100) + extended_id = 'c1348300' + dimensions = Dimensions(24, 80) + features = { } + keymap = KEYMAP_3278_2 + + terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap) + + display = Display(terminal, dimensions, features.get(Feature.EAB)) + + return display + +def _create_buffered_display(interface, has_eab=False): + terminal_id = TerminalId(0b11110100) + extended_id = 'c1348300' + dimensions = Dimensions(24, 80) + features = { Feature.EAB: 7 } if has_eab else { } + keymap = KEYMAP_3278_2 + + terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap) + + buffered_display = BufferedDisplay(terminal, dimensions, features.get(Feature.EAB)) + + return buffered_display diff --git a/tests/test_interface.py b/tests/test_interface.py new file mode 100644 index 0000000..c8b4da1 --- /dev/null +++ b/tests/test_interface.py @@ -0,0 +1,202 @@ +import unittest +from unittest.mock import Mock, patch + +from coax import ReadAddressCounterHi, ReadAddressCounterLo, ProtocolError + +import context + +from oec.interface import InterfaceWrapper, AggregateExecuteError, address_commands + +from mock_interface import MockInterface + +class InterfaceWrapperInitTestCase(unittest.TestCase): + def setUp(self): + self.interface = MockInterface() + + patcher = patch('oec.interface._get_jumbo_write_strategy') + + self.get_jumbo_write_strategy = patcher.start() + + patcher = patch('oec.interface._print_i1_jumbo_write_notice') + + self.print_i1_jumbo_write_notice = patcher.start() + + self.addCleanup(patch.stopall) + + def test_no_jumbo_write_strategy(self): + # Arrange + self.get_jumbo_write_strategy.return_value = None + + # Act + interface_wrapper = InterfaceWrapper(self.interface) + + # Assert + self.assertIsNone(interface_wrapper.jumbo_write_strategy) + self.assertIsNone(interface_wrapper.jumbo_write_max_length) + + self.print_i1_jumbo_write_notice.assert_not_called() + + def test_split_jumbo_write_strategy(self): + # Arrange + self.get_jumbo_write_strategy.return_value = 'split' + + # Act + interface_wrapper = InterfaceWrapper(self.interface) + + # Assert + self.assertEqual(interface_wrapper.jumbo_write_strategy, 'split') + self.assertEqual(interface_wrapper.jumbo_write_max_length, 1024) + + self.print_i1_jumbo_write_notice.assert_not_called() + + def test_i1_no_jumbo_write_strategy(self): + # Arrange + self.interface.legacy_firmware_detected = True + + self.get_jumbo_write_strategy.return_value = None + + # Act + interface_wrapper = InterfaceWrapper(self.interface) + + # Assert + self.assertEqual(interface_wrapper.jumbo_write_strategy, 'split') + self.assertEqual(interface_wrapper.jumbo_write_max_length, 1024) + + self.print_i1_jumbo_write_notice.assert_called() + +class InterfaceWrapperExecuteTestCase(unittest.TestCase): + def setUp(self): + self.interface = MockInterface() + + self.interface_wrapper = InterfaceWrapper(self.interface) + + def test_single_command(self): + # Arrange + self.interface.mock_responses = [(None, ReadAddressCounterHi, None, 0x00)] + + # Act + response = self.interface_wrapper.execute((None, ReadAddressCounterHi())) + + # Assert + self.assertEqual(response, 0x00) + + def test_single_command_that_raises_error(self): + # Arrange + self.interface.mock_responses = [(None, ReadAddressCounterHi, None, Mock(side_effect=ProtocolError))] + + # Act and assert + with self.assertRaises(ProtocolError): + self.interface_wrapper.execute((None, ReadAddressCounterHi())) + + def test_multiple_commands(self): + # Arrange + self.interface.mock_responses = [ + (None, ReadAddressCounterHi, None, 0x00), + (None, ReadAddressCounterLo, None, 0xff) + ] + + # Act + responses = self.interface_wrapper.execute([(None, ReadAddressCounterHi()), (None, ReadAddressCounterLo())]) + + # Assert + self.assertEqual(responses, [0x00, 0xff]) + + def test_multiple_commands_that_returns_error(self): + # Arrange + self.interface.mock_responses = [ + (None, ReadAddressCounterHi, None, 0x00), + (None, ReadAddressCounterLo, None, Mock(side_effect=ProtocolError)) + ] + + # Act and assert + with self.assertRaises(AggregateExecuteError) as context: + self.interface_wrapper.execute([(None, ReadAddressCounterHi()), (None, ReadAddressCounterLo())]) + + error = context.exception + + self.assertEqual(len(error.errors), 1) + self.assertIsInstance(error.errors[0], ProtocolError) + + self.assertEqual(len(error.responses), 2) + +class InterfaceWrapperJumboWriteSplitDataTestCase(unittest.TestCase): + def setUp(self): + self.interface = MockInterface() + + self.interface_wrapper = InterfaceWrapper(self.interface) + + def test_no_split_strategy(self): + # Arrange + self.interface_wrapper.jumbo_write_strategy = None + self.interface_wrapper.jumbo_write_max_length = 32 + + # Act and assert + for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64)]: + with self.subTest(data=data): + result = self.interface_wrapper.jumbo_write_split_data(data) + + self.assertEqual(len(result), 1) + + self.assertEqual(result[0], data) + + def test_split_strategy_one_chunk(self): + # Arrange + self.interface_wrapper.jumbo_write_strategy = 'split' + self.interface_wrapper.jumbo_write_max_length = 32 + + # Act and assert + for data in [bytes(range(0, 16)), (bytes.fromhex('00'), 16), bytes(range(0, 31)), (bytes.fromhex('00'), 31)]: + with self.subTest(data=data): + result = self.interface_wrapper.jumbo_write_split_data(data) + + self.assertEqual(len(result), 1) + + self.assertEqual(result[0], data) + + def test_split_strategy_two_chunks(self): + # Arrange + self.interface_wrapper.jumbo_write_strategy = 'split' + self.interface_wrapper.jumbo_write_max_length = 32 + + # Act and assert + for data in [bytes(range(0, 32)), (bytes.fromhex('00'), 32), bytes(range(0, 63)), (bytes.fromhex('00'), 63)]: + with self.subTest(data=data): + result = self.interface_wrapper.jumbo_write_split_data(data) + + self.assertEqual(len(result), 2) + self.assertEqual(len(result[0]), 31) + + def test_split_strategy_three_chunks(self): + # Arrange + self.interface_wrapper.jumbo_write_strategy = 'split' + self.interface_wrapper.jumbo_write_max_length = 32 + + # Act and assert + for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64), bytes(range(0, 95)), (bytes.fromhex('00'), 95)]: + with self.subTest(data=data): + result = self.interface_wrapper.jumbo_write_split_data(data) + + self.assertEqual(len(result), 3) + self.assertEqual(len(result[0]), 31) + self.assertEqual(len(result[1]), 32) + +class AddressCommandsTestCase(unittest.TestCase): + def test_single_command(self): + # Arrange + command = ReadAddressCounterHi() + + # Act + result = address_commands(0b111000, command) + + # Assert + self.assertEqual(result, (0b111000, command)) + + def test_multiple_commands(self): + # Arrange + commands = [ReadAddressCounterHi(), ReadAddressCounterLo()] + + # Act + result = address_commands(0b111000, commands) + + # Assert + self.assertEqual(result, [(0b111000, commands[0]), (0b111000, commands[1])]) diff --git a/tests/test_terminal.py b/tests/test_terminal.py index 7f66850..da1ec72 100644 --- a/tests/test_terminal.py +++ b/tests/test_terminal.py @@ -1,68 +1,49 @@ import unittest -from unittest.mock import Mock, patch -from coax import Feature, PollAction -from coax.protocol import TerminalId, TerminalType +from unittest.mock import Mock, create_autospec +from coax import Poll, PollAction, TerminalType, Feature, ReadTerminalId, ReadExtendedId, ReadFeatureId +from coax.protocol import TerminalId import context +from oec.interface import InterfaceWrapper from oec.terminal import create_terminal, Terminal, UnsupportedTerminalError -from oec.display import Dimensions +from oec.display import Display, Dimensions from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2 +from mock_interface import MockInterface + class TerminalSetupTestCase(unittest.TestCase): def setUp(self): - self.interface = Mock() + self.interface = MockInterface() - terminal_id = TerminalId(0b11110100) - extended_id = 'c1348300' - dimensions = Dimensions(24, 80) - features = { } - keymap = KEYMAP_3278_2 + self.terminal = _create_terminal(self.interface) - self.terminal = Terminal(self.interface, terminal_id, extended_id, dimensions, features, keymap) - - self.terminal.display = Mock() - - patcher = patch('oec.terminal.load_control_register') - - self.load_control_register_mock = patcher.start() - - self.addCleanup(patch.stopall) + self.terminal.display = create_autospec(Display, instance=True) def test(self): self.terminal.setup() class TerminalPollTestCase(unittest.TestCase): def setUp(self): - self.interface = Mock() + self.interface = MockInterface() - terminal_id = TerminalId(0b11110100) - extended_id = 'c1348300' - dimensions = Dimensions(24, 80) - features = { } - keymap = KEYMAP_3278_2 + self.terminal = _create_terminal(self.interface) - self.terminal = Terminal(self.interface, terminal_id, extended_id, dimensions, features, keymap) - - patcher = patch('oec.terminal.poll') - - self.poll_mock = patcher.start() - - self.addCleanup(patch.stopall) + self.terminal.display = create_autospec(Display, instance=True) # The terminal will be initialized in a state where the terminal keyboard clicker # state is unknown, and this cannot be read. Therefore the first POLL will always # attempt to set the keyboard clicker state... self.terminal.poll() - self.poll_mock.reset_mock() + self.interface.reset_mock() def test_with_no_queued_actions(self): # Act self.terminal.poll() # Assert - self.poll_mock.assert_called_with(self.interface, PollAction.NONE) + self.assert_poll_with_poll_action(PollAction.NONE) def test_with_sound_alarm_queued(self): # Arrange @@ -72,7 +53,7 @@ class TerminalPollTestCase(unittest.TestCase): self.terminal.poll() # Assert - self.poll_mock.assert_called_with(self.interface, PollAction.ALARM) + self.assert_poll_with_poll_action(PollAction.ALARM) def test_with_enable_keyboard_clicker_queued(self): # Arrange @@ -84,38 +65,29 @@ class TerminalPollTestCase(unittest.TestCase): self.terminal.poll() # Assert - self.poll_mock.assert_called_with(self.interface, PollAction.ENABLE_KEYBOARD_CLICKER) + self.assert_poll_with_poll_action(PollAction.ENABLE_KEYBOARD_CLICKER) + + def assert_poll_with_poll_action(self, action): + self.interface.assert_command_executed(None, Poll, lambda command: command.action == action) class CreateTerminalTestCase(unittest.TestCase): def setUp(self): - self.interface = Mock() - - self.interface.legacy_firmware_detected = False + self.interface = MockInterface() self.get_keymap = lambda terminal_id, extended_id: KEYMAP_3278_2 - patcher = patch('oec.terminal.read_terminal_id') - - self.read_terminal_id_mock = patcher.start() - - patcher = patch('oec.terminal.read_extended_id') - - self.read_extended_id_mock = patcher.start() - - patcher = patch('oec.terminal.get_features') - - self.get_features_mock = patcher.start() - - self.addCleanup(patch.stopall) - def test_supported_terminal(self): # Arrange - self.read_terminal_id_mock.return_value = TerminalId(0b11110100) - self.read_extended_id_mock.return_value = bytes.fromhex('c1 34 83 00') - self.get_features_mock.return_value = { Feature.EAB: 7 } + interface = InterfaceWrapper(self.interface) + + self.interface.mock_responses = [ + (None, ReadTerminalId, None, TerminalId(0b11110100)), + (None, ReadExtendedId, None, bytes.fromhex('c1 34 83 00')), + (None, ReadFeatureId, lambda command: command.feature_address == 7, Feature.EAB.value) + ] # Act - terminal = create_terminal(self.interface, None, self.get_keymap) + terminal = create_terminal(interface, None, None, self.get_keymap) # Assert self.assertEqual(terminal.terminal_id.type, TerminalType.CUT) @@ -128,46 +100,34 @@ class CreateTerminalTestCase(unittest.TestCase): def test_unsupported_terminal_type(self): # Arrange - self.read_terminal_id_mock.return_value = TerminalId(0b00000001) + interface = InterfaceWrapper(self.interface) + + self.interface.mock_responses = [(None, ReadTerminalId, None, TerminalId(0b00000001))] # Act and assert with self.assertRaises(UnsupportedTerminalError): - create_terminal(self.interface, None, self.get_keymap) + create_terminal(interface, None, None, self.get_keymap) def test_unsupported_terminal_model(self): # Arrange + interface = InterfaceWrapper(self.interface) terminal_id = TerminalId(0b11110100) terminal_id.model = 1 - self.read_terminal_id_mock.return_value = terminal_id + self.interface.mock_responses = [(None, ReadTerminalId, None, terminal_id)] # Act and assert with self.assertRaises(UnsupportedTerminalError): - create_terminal(self.interface, None, self.get_keymap) + create_terminal(interface, None, None, self.get_keymap) - def test_eab_feature_removed_on_legacy_interface_without_strategy(self): - # Arrange - self.interface.legacy_firmware_detected = True +def _create_terminal(interface): + terminal_id = TerminalId(0b11110100) + extended_id = 'c1348300' + dimensions = Dimensions(24, 80) + features = { } + keymap = KEYMAP_3278_2 - self.read_terminal_id_mock.return_value = TerminalId(0b11110100) - self.read_extended_id_mock.return_value = bytes.fromhex('c1 34 83 00') - self.get_features_mock.return_value = { Feature.EAB: 7 } + terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap) - patcher = patch('oec.terminal._print_no_i1_eab_notice') - - print_no_i1_eab_notice_mock = patcher.start() - - # Act - terminal = create_terminal(self.interface, None, self.get_keymap) - - # Assert - self.assertEqual(terminal.terminal_id.type, TerminalType.CUT) - self.assertEqual(terminal.terminal_id.model, 2) - self.assertEqual(terminal.terminal_id.keyboard, 15) - self.assertEqual(terminal.extended_id, 'c1348300') - self.assertEqual(terminal.display.dimensions, Dimensions(24, 80)) - self.assertEqual(terminal.features, { }) - self.assertEqual(terminal.keyboard.keymap.name, '3278-2') - - print_no_i1_eab_notice_mock.assert_called_once() + return terminal diff --git a/tests/test_tn3270.py b/tests/test_tn3270.py index c34049f..40a90d0 100644 --- a/tests/test_tn3270.py +++ b/tests/test_tn3270.py @@ -1,26 +1,35 @@ import unittest -from unittest.mock import Mock, patch +from unittest.mock import Mock, create_autospec -import context - -from oec.session import SessionDisconnectedError -from oec.display import Dimensions, BufferedDisplay -from oec.keyboard import Key, KeyboardModifiers -from oec.tn3270 import TN3270Session -from tn3270 import AttributeCell, CharacterCell, AID, Color, ProtectedCellOperatorError, FieldOverflowOperatorError +from coax.protocol import TerminalId +from tn3270 import Telnet, Emulator, AttributeCell, CharacterCell, AID, Color, ProtectedCellOperatorError, FieldOverflowOperatorError from tn3270.attributes import Attribute from tn3270.emulator import CellFormatting +import context + +from oec.interface import InterfaceWrapper +from oec.terminal import Terminal +from oec.display import Dimensions, BufferedDisplay, StatusLine +from oec.keyboard import Key, KeyboardModifiers +from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2 +from oec.session import SessionDisconnectedError +from oec.tn3270 import TN3270Session + +from mock_interface import MockInterface + class SessionHandleHostTestCase(unittest.TestCase): def setUp(self): - self.terminal = Mock() + self.interface = MockInterface() + + self.terminal = _create_terminal(self.interface) self.session = TN3270Session(self.terminal, 'mainframe', 23) - self.telnet = Mock() + self.telnet = create_autospec(Telnet, instance=True) self.session.telnet = self.telnet - self.session.emulator = Mock() + self.session.emulator = create_autospec(Emulator, instance=True) def test_no_changes(self): # Arrange @@ -58,11 +67,13 @@ class SessionHandleHostTestCase(unittest.TestCase): class SessionHandleKeyTestCase(unittest.TestCase): def setUp(self): - self.terminal = Mock() + self.interface = MockInterface() + + self.terminal = _create_terminal(self.interface) self.session = TN3270Session(self.terminal, 'mainframe', 23) - self.session.emulator = Mock() + self.session.emulator = create_autospec(Emulator, instance=True) self.session.emulator.cells = [] self.session.emulator.dirty = set() @@ -206,9 +217,9 @@ class SessionHandleKeyTestCase(unittest.TestCase): class SessionRenderTestCase(unittest.TestCase): def setUp(self): - self.terminal = Mock() + self.interface = MockInterface() - self.terminal.display = BufferedDisplay(self.terminal, Dimensions(24, 80), None) + self.terminal = _create_terminal(self.interface) self.terminal.display.buffered_write_byte = Mock(wraps=self.terminal.display.buffered_write_byte) self.terminal.display.move_cursor = Mock(wraps=self.terminal.display.move_cursor) @@ -217,36 +228,10 @@ class SessionRenderTestCase(unittest.TestCase): self.session = TN3270Session(self.terminal, 'mainframe', 23) - self.telnet = Mock() + self.session.telnet = create_autospec(Telnet, instance=True) + self.session.emulator = create_autospec(Emulator, instance=True) - self.session.telnet = self.telnet - self.session.emulator = Mock() - - patcher = patch('oec.display.read_address_counter_hi') - - self.read_address_counter_hi_mock = patcher.start() - - patcher = patch('oec.display.read_address_counter_lo') - - self.read_address_counter_lo_mock = patcher.start() - - patcher = patch('oec.display.load_address_counter_hi') - - self.load_address_counter_hi_mock = patcher.start() - - patcher = patch('oec.display.load_address_counter_lo') - - self.load_address_counter_lo_mock = patcher.start() - - patcher = patch('oec.display.write_data') - - self.write_data_mock = patcher.start() - - patcher = patch('oec.display.eab_write_alternate') - - self.eab_write_alternate_mock = patcher.start() - - self.addCleanup(patch.stopall) + self.session.emulator.keyboard_locked = False def test_with_no_eab_feature(self): # Arrange @@ -382,6 +367,19 @@ class SessionRenderTestCase(unittest.TestCase): # Assert self.terminal.display.status_line.write.assert_called_with(8, bytes.fromhex('f600db080000000000')) +def _create_terminal(interface): + terminal_id = TerminalId(0b11110100) + extended_id = 'c1348300' + dimensions = Dimensions(24, 80) + features = { } + keymap = KEYMAP_3278_2 + + terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap) + + terminal.display.status_line = create_autospec(StatusLine, instance=True) + + return terminal + def _create_screen_cells(rows, columns): return [CharacterCell(0x00) for address in range(rows * columns)] diff --git a/tests/test_vt100.py b/tests/test_vt100.py index ed2dd67..2ee70e0 100644 --- a/tests/test_vt100.py +++ b/tests/test_vt100.py @@ -1,22 +1,33 @@ import unittest -from unittest.mock import Mock, patch +from unittest.mock import Mock, create_autospec + +from logging import Logger +from ptyprocess import PtyProcess +from coax.protocol import TerminalId import context -from oec.session import SessionDisconnectedError +from oec.interface import InterfaceWrapper +from oec.terminal import Terminal from oec.display import Dimensions, BufferedDisplay from oec.keyboard import Key, KeyboardModifiers +from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2 +from oec.session import SessionDisconnectedError from oec.vt100 import VT100Session +from mock_interface import MockInterface + class SessionHandleHostTestCase(unittest.TestCase): def setUp(self): - self.terminal = Mock() + self.interface = MockInterface() - self.terminal.display.dimensions = Dimensions(24, 80) + self.terminal = _create_terminal(self.interface) + + self.terminal.sound_alarm = Mock(wraps=self.terminal.sound_alarm) self.session = VT100Session(self.terminal, None) - self.session.host_process = Mock() + self.session.host_process = create_autospec(PtyProcess, instance=True) def test(self): # Arrange @@ -54,13 +65,13 @@ class SessionHandleHostTestCase(unittest.TestCase): class SessionHandleKeyTestCase(unittest.TestCase): def setUp(self): - self.terminal = Mock() + self.interface = MockInterface() - self.terminal.display.dimensions = Dimensions(24, 80) + self.terminal = _create_terminal(self.interface) self.session = VT100Session(self.terminal, None) - self.session.host_process = Mock() + self.session.host_process = create_autospec(PtyProcess, instance=True) def test_printable(self): # Act @@ -87,7 +98,7 @@ class SessionHandleKeyTestCase(unittest.TestCase): def test_unmapped_alt_modifier(self): # Arrange - self.session.logger = Mock() + self.session.logger = create_autospec(Logger, instance=True) # Act self.session.handle_key(Key.THREE, KeyboardModifiers.LEFT_ALT, None) @@ -106,9 +117,9 @@ class SessionHandleKeyTestCase(unittest.TestCase): class SessionRenderTestCase(unittest.TestCase): def setUp(self): - self.terminal = Mock() + self.interface = MockInterface() - self.terminal.display = BufferedDisplay(self.terminal, Dimensions(24, 80), None) + self.terminal = _create_terminal(self.interface) self.terminal.display.buffered_write_byte = Mock(wraps=self.terminal.display.buffered_write_byte) self.terminal.display.move_cursor = Mock(wraps=self.terminal.display.move_cursor) @@ -116,33 +127,7 @@ class SessionRenderTestCase(unittest.TestCase): self.session = VT100Session(self.terminal, None) - self.session.host_process = Mock() - - patcher = patch('oec.display.read_address_counter_hi') - - self.read_address_counter_hi_mock = patcher.start() - - patcher = patch('oec.display.read_address_counter_lo') - - self.read_address_counter_lo_mock = patcher.start() - - patcher = patch('oec.display.load_address_counter_hi') - - self.load_address_counter_hi_mock = patcher.start() - - patcher = patch('oec.display.load_address_counter_lo') - - self.load_address_counter_lo_mock = patcher.start() - - patcher = patch('oec.display.write_data') - - self.write_data_mock = patcher.start() - - patcher = patch('oec.display.eab_write_alternate') - - self.eab_write_alternate_mock = patcher.start() - - self.addCleanup(patch.stopall) + self.session.host_process = create_autospec(PtyProcess, instance=True) def test_with_no_eab_feature(self): # Arrange @@ -189,3 +174,14 @@ class SessionRenderTestCase(unittest.TestCase): self.terminal.display.move_cursor.assert_called_with(row=0, column=3) self.assertFalse(self.session.vt100_screen.dirty) + +def _create_terminal(interface): + terminal_id = TerminalId(0b11110100) + extended_id = 'c1348300' + dimensions = Dimensions(24, 80) + features = { } + keymap = KEYMAP_3278_2 + + terminal = Terminal(InterfaceWrapper(interface), None, terminal_id, extended_id, dimensions, features, keymap) + + return terminal