diff --git a/oec/controller.py b/oec/controller.py index 60d091c..7b2ac5b 100644 --- a/oec/controller.py +++ b/oec/controller.py @@ -3,16 +3,13 @@ oec.controller ~~~~~~~~~~~~~~ """ -import os import time import logging import selectors -from textwrap import dedent -from coax import poll, poll_ack, load_control_register, get_features, PollAction, \ - KeystrokePollResponse, TerminalType, Feature, ReceiveTimeout, \ +from coax import poll, poll_ack, KeystrokePollResponse, ReceiveTimeout, \ ReceiveError, ProtocolError -from .terminal import Terminal, UnsupportedTerminalError, read_terminal_ids +from .terminal import create_terminal, UnsupportedTerminalError from .keyboard import Key from .session import SessionDisconnectedError @@ -106,44 +103,9 @@ class Controller: def _handle_terminal_attached(self, poll_response): self.logger.info('Terminal attached') - jumbo_write_strategy = _get_jumbo_write_strategy() + self.terminal = create_terminal(self.interface, poll_response, self.get_keymap) - # Read the terminal identifiers. - (terminal_id, extended_id) = read_terminal_ids(self.interface) - - self.logger.info(f'Terminal ID = {terminal_id}, Extended ID = {extended_id}') - - if terminal_id.type != TerminalType.CUT: - raise UnsupportedTerminalError('Only CUT type terminals are supported') - - # Get the terminal features. - features = get_features(self.interface) - - self.logger.info(f'Features = {features}') - - if Feature.EAB in features: - if self.interface.legacy_firmware_detected and jumbo_write_strategy is None: - del features[Feature.EAB] - - _print_no_i1_eab_notice() - - # Get the keymap. - keymap = self.get_keymap(terminal_id, extended_id) - - # Initialize the terminal. - self.terminal = Terminal(self.interface, terminal_id, extended_id, - features, keymap, - jumbo_write_strategy=jumbo_write_strategy) - - (rows, columns) = self.terminal.display.dimensions - keymap_name = self.terminal.keyboard.keymap.name - - self.logger.info(f'Rows = {rows}, Columns = {columns}, Keymap = {keymap_name}') - - if self.terminal.display.has_eab: - self.terminal.display.load_eab_mask(0xff) - - self.terminal.display.clear(clear_status_line=True) + self.terminal.setup() # Show the attached indicator on the status line. self.terminal.display.status_line.write_string(0, 'S') @@ -221,12 +183,8 @@ class Controller: if key == Key.CURSOR_BLINK: self.terminal.display.toggle_cursor_blink() - - self._load_control_register() elif key == Key.ALT_CURSOR: self.terminal.display.toggle_cursor_reverse() - - self._load_control_register() elif key == Key.CLICKER: self.terminal.keyboard.toggle_clicker() elif self.session: @@ -235,9 +193,12 @@ class Controller: def _poll(self): self.last_poll_time = time.perf_counter() - poll_action = self.terminal.get_poll_action() if self.terminal else PollAction.NONE - - poll_response = poll(self.interface, poll_action, receive_timeout=1) + # If a terminal is connected, use the terminal method to ensure that + # any queued POLL action is applied. + if self.terminal: + poll_response = self.terminal.poll(receive_timeout=1) + else: + poll_response = poll(self.interface, receive_timeout=1) if poll_response: try: @@ -264,48 +225,3 @@ class Controller: period = self.disconnected_poll_period return max((self.last_poll_time + period) - current_time, 0) - - def _load_control_register(self): - load_control_register(self.interface, self.terminal.get_control_register()) - -def _get_jumbo_write_strategy(): - value = os.environ.get('COAX_JUMBO') - - if value is None: - return None - - if value in ['split', 'ignore']: - return value - - self.logger.warning(f'Unsupported COAX_JUMBO option: {value}') - - return None - -def _print_no_i1_eab_notice(): - notice = ''' - **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** - - Your terminal is reporting the existence of an EAB feature that allows extended - colors and formatting, however... - - I think you are using an older firmware on the 1st generation, Arduino Mega - based, interface which does not support the "jumbo write" required to write a - full screen to the regen and EAB buffers. - - I'm going to continue as if the EAB feature did not exist... - - If you want to override this behavior, you can set the COAX_JUMBO environment - variable as follows: - - - COAX_JUMBO=split - split large writes into multiple smaller 32-byte writes - before sending to the interface, this will result in - additional round trips to the interface which may - manifest as visible incremental changes being applied - to the screen - - COAX_JUMBO=ignore - try a jumbo write, anyway, use this option if you - believe you are seeing this behavior in error - - **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** - ''' - - print(dedent(notice)) diff --git a/oec/display.py b/oec/display.py index 2eb47ed..174f8e6 100644 --- a/oec/display.py +++ b/oec/display.py @@ -170,10 +170,10 @@ def encode_string(string, errors='replace'): Dimensions = namedtuple('Dimensions', ['rows', 'columns']) class Display: - def __init__(self, interface, dimensions, eab_address, jumbo_write_strategy=None): + def __init__(self, terminal, dimensions, eab_address, jumbo_write_strategy=None): self.logger = logging.getLogger(__name__) - self.interface = interface + self.terminal = terminal self.dimensions = dimensions self.eab_address = eab_address @@ -187,9 +187,6 @@ class Display: self.status_line = StatusLine(self) - self.cursor_reverse = False - self.cursor_blink = False - self.jumbo_write_strategy = jumbo_write_strategy @property @@ -209,7 +206,7 @@ class Display: if not self.has_eab: raise RuntimeError('No EAB feature') - eab_load_mask(self.interface, self.eab_address, mask) + eab_load_mask(self.terminal.interface, self.eab_address, mask) def buffered_write(self, regen_byte, eab_byte, index=None, row=None, column=None): if index is None: @@ -257,10 +254,14 @@ class Display: self.move_cursor(row=0, column=0, force_load=True) def toggle_cursor_blink(self): - self.cursor_blink = not self.cursor_blink + self.terminal.control.cursor_blink = not self.terminal.control.cursor_blink + + self.terminal.load_control_register() def toggle_cursor_reverse(self): - self.cursor_reverse = not self.cursor_reverse + self.terminal.control.cursor_reverse = not self.terminal.control.cursor_reverse + + self.terminal.load_control_register() def _get_index(self, row, column): return (row * self.dimensions.columns) + column @@ -289,8 +290,8 @@ class Display: return address def _read_address_counter(self): - hi = read_address_counter_hi(self.interface) - lo = read_address_counter_lo(self.interface) + hi = read_address_counter_hi(self.terminal.interface) + lo = read_address_counter_lo(self.terminal.interface) return (hi << 8) | lo @@ -302,10 +303,10 @@ class Display: (current_hi, current_lo) = _split_address(self.address_counter) if hi != current_hi or force_load: - load_address_counter_hi(self.interface, hi) + load_address_counter_hi(self.terminal.interface, hi) if lo != current_lo or force_load: - load_address_counter_lo(self.interface, lo) + load_address_counter_lo(self.terminal.interface, lo) self.address_counter = address @@ -371,10 +372,10 @@ class Display: else: data = bytes(interleave(regen_data, eab_data)) - eab_write_alternate(self.interface, self.eab_address, data, + eab_write_alternate(self.terminal.interface, self.eab_address, data, jumbo_write_strategy=self.jumbo_write_strategy) else: - write_data(self.interface, regen_data, + write_data(self.terminal.interface, regen_data, jumbo_write_strategy=self.jumbo_write_strategy) if isinstance(regen_data, tuple): diff --git a/oec/terminal.py b/oec/terminal.py index 30048f0..2e1b2b5 100644 --- a/oec/terminal.py +++ b/oec/terminal.py @@ -3,12 +3,15 @@ oec.terminal ~~~~~~~~~~~~ """ +import os import time import logging -from coax import read_terminal_id, read_extended_id, PollAction, Control, Feature, \ +from textwrap import dedent +from coax import poll, read_terminal_id, read_extended_id, get_features, \ + load_control_register, TerminalType, Feature, PollAction, Control, \ ReceiveError, ProtocolError -from .display import Dimensions, Display +from .display import Dimensions, Display from .keyboard import Keyboard logger = logging.getLogger(__name__) @@ -20,14 +23,115 @@ MODEL_DIMENSIONS = { 5: Dimensions(27, 132) } -def get_dimensions(terminal_id, extended_id): - """Get terminal display dimensions.""" - if not terminal_id.model in MODEL_DIMENSIONS: - raise ValueError(f'Model {terminal_id.model} is not supported') +class Terminal: + """The terminal.""" - return MODEL_DIMENSIONS[terminal_id.model] + def __init__(self, interface, terminal_id, extended_id, dimensions, features, + keymap, jumbo_write_strategy=None): + self.interface = interface + self.terminal_id = terminal_id + self.extended_id = extended_id + self.features = features -def read_terminal_ids(interface, extended_id_retry_attempts=3): + self.control = Control(step_inhibit=False, display_inhibit=False, + cursor_inhibit=False, cursor_reverse=False, + cursor_blink=False) + + self.display = Display(self, dimensions, features.get(Feature.EAB), + jumbo_write_strategy=jumbo_write_strategy) + self.keyboard = Keyboard(keymap) + + self.alarm = False + self.last_poll_keyboard_clicker = None + + def setup(self): + """Load registers and clear the display.""" + self.load_control_register() + + if self.display.has_eab: + self.display.load_eab_mask(0xff) + + self.display.clear(clear_status_line=True) + + def poll(self, **kwargs): + """Execute a POLL command with queued actions.""" + poll_action = PollAction.NONE + + # Convert a queued alarm or keyboard clicker change to POLL action. + if self.alarm: + poll_action = PollAction.ALARM + elif self.keyboard.clicker != self.last_poll_keyboard_clicker: + if self.keyboard.clicker: + poll_action = PollAction.ENABLE_KEYBOARD_CLICKER + else: + poll_action = PollAction.DISABLE_KEYBOARD_CLICKER + + poll_response = poll(self.interface, poll_action, **kwargs) + + # Clear the queued alarm and keyboard clicker change if the POLL was + # successful. + if poll_action == PollAction.ALARM: + self.alarm = False + elif poll_action in [PollAction.ENABLE_KEYBOARD_CLICKER, + PollAction.DISABLE_KEYBOARD_CLICKER]: + self.last_poll_keyboard_clicker = self.keyboard.clicker + + return poll_response + + def sound_alarm(self): + """Queue an alarm on next POLL command.""" + self.alarm = True + + def load_control_register(self): + """Execute a LOAD_CONTROL_REGISTER command.""" + load_control_register(self.interface, self.control) + +class UnsupportedTerminalError(Exception): + """Unsupported terminal.""" + +def create_terminal(interface, poll_response, get_keymap): + """Terminal factory.""" + jumbo_write_strategy = _get_jumbo_write_strategy() + + # Read the terminal identifiers. + (terminal_id, extended_id) = _read_terminal_ids(interface) + + logger.info(f'Terminal ID = {terminal_id}, Extended ID = {extended_id}') + + if terminal_id.type != TerminalType.CUT: + raise UnsupportedTerminalError('Only CUT type terminals are supported') + + # Get the terminal dimensions. + dimensions = MODEL_DIMENSIONS.get(terminal_id.model) + + if dimensions is None: + raise UnsupportedTerminalError(f'Model {terminal_id.model} is not supported') + + logger.info(f'Rows = {dimensions.rows}, Columns = {dimensions.columns}') + + # Get the terminal features. + features = get_features(interface) + + logger.info(f'Features = {features}') + + if Feature.EAB in features: + if interface.legacy_firmware_detected and jumbo_write_strategy is None: + del features[Feature.EAB] + + _print_no_i1_eab_notice() + + # Get the keymap. + keymap = get_keymap(terminal_id, extended_id) + + logger.info(f'Keymap = {keymap.name}') + + # Create the terminal. + terminal = Terminal(interface, terminal_id, extended_id, dimensions, features, + keymap, jumbo_write_strategy=jumbo_write_strategy) + + return terminal + +def _read_terminal_ids(interface, extended_id_retry_attempts=3): terminal_id = None extended_id = None @@ -56,46 +160,44 @@ def read_terminal_ids(interface, extended_id_retry_attempts=3): return (terminal_id, extended_id.hex() if extended_id is not None else None) -class Terminal: - """Terminal information and devices.""" +def _get_jumbo_write_strategy(): + value = os.environ.get('COAX_JUMBO') - def __init__(self, interface, terminal_id, extended_id, features, keymap, - jumbo_write_strategy=None): - self.interface = interface - self.terminal_id = terminal_id - self.extended_id = extended_id - self.features = features + if value is None: + return None - dimensions = get_dimensions(self.terminal_id, self.extended_id) + if value in ['split', 'ignore']: + return value - self.display = Display(interface, dimensions, features.get(Feature.EAB), - jumbo_write_strategy=jumbo_write_strategy) - self.keyboard = Keyboard(keymap) + logger.warning(f'Unsupported COAX_JUMBO option: {value}') - self.alarm = False - self.last_poll_keyboard_clicker = None + return None - def sound_alarm(self): - self.alarm = True +def _print_no_i1_eab_notice(): + notice = ''' + **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** - def get_poll_action(self): - if self.alarm: - self.alarm = False + Your terminal is reporting the existence of an EAB feature that allows extended + colors and formatting, however... - return PollAction.ALARM + I think you are using an older firmware on the 1st generation, Arduino Mega + based, interface which does not support the "jumbo write" required to write a + full screen to the regen and EAB buffers. - if self.keyboard.clicker != self.last_poll_keyboard_clicker: - self.last_poll_keyboard_clicker = self.keyboard.clicker + I'm going to continue as if the EAB feature did not exist... - return PollAction.ENABLE_KEYBOARD_CLICKER if self.keyboard.clicker else PollAction.DISABLE_KEYBOARD_CLICKER + If you want to override this behavior, you can set the COAX_JUMBO environment + variable as follows: - return PollAction.NONE + - COAX_JUMBO=split - split large writes into multiple smaller 32-byte writes + before sending to the interface, this will result in + additional round trips to the interface which may + manifest as visible incremental changes being applied + to the screen + - COAX_JUMBO=ignore - try a jumbo write, anyway, use this option if you + believe you are seeing this behavior in error - def get_control_register(self): - control = Control(cursor_reverse=self.display.cursor_reverse, - cursor_blink=self.display.cursor_blink) + **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** + ''' - return control - -class UnsupportedTerminalError(Exception): - """Unsupported terminal.""" + print(dedent(notice)) diff --git a/tests/test_controller.py b/tests/test_controller.py index 62fdf2d..db40283 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -1,6 +1,6 @@ import selectors import unittest -from unittest.mock import Mock, PropertyMock, patch +from unittest.mock import Mock, patch from coax import PollAction, PowerOnResetCompletePollResponse, KeystrokePollResponse, ReceiveTimeout from coax.protocol import TerminalId @@ -8,12 +8,11 @@ import context from oec.controller import Controller from oec.session import SessionDisconnectedError +from oec.terminal import Terminal, UnsupportedTerminalError +from oec.display import Dimensions from oec.keyboard import KeyboardModifiers, Key from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2 -CUT_TERMINAL_IDS = (TerminalId(0b11110100), 'c1348300') -DFT_TERMINAL_IDS = (TerminalId(0b00000001), None) - class RunLoopTestCase(unittest.TestCase): def setUp(self): self.interface = Mock() @@ -31,30 +30,30 @@ class RunLoopTestCase(unittest.TestCase): self.controller._update_session = Mock() - patcher = patch('oec.controller.poll') + self.terminal = Terminal(self.interface, TerminalId(0b11110100), 'c1348300', Dimensions(24, 80), { }, KEYMAP_3278_2, None) - self.poll_mock = patcher.start() + self.terminal.setup = Mock() + + self.terminal.display.toggle_cursor_blink = Mock() + self.terminal.display.toggle_cursor_reverse = Mock() + self.terminal.display._write = Mock() + + self.terminal.keyboard.toggle_clicker = Mock() + + self.poll_mock = Mock() + + patcher = patch('oec.controller.poll', self.poll_mock) + + patcher.start() + + patcher = patch('oec.terminal.poll', self.poll_mock) + + patcher.start() patcher = patch('oec.controller.poll_ack') self.poll_ack_mock = patcher.start() - patcher = patch('oec.controller.read_terminal_ids') - - self.read_terminal_ids_mock = patcher.start() - - self.read_terminal_ids_mock.return_value = CUT_TERMINAL_IDS - - patcher = patch('oec.controller.load_control_register') - - self.load_control_register_mock = patcher.start() - - patcher = patch('oec.controller.get_features') - - self.get_features_mock = patcher.start() - - self.get_features_mock.return_value = { } - patcher = patch('oec.controller.time.perf_counter') self.perf_counter_mock = patcher.start() @@ -63,17 +62,11 @@ class RunLoopTestCase(unittest.TestCase): self.sleep_mock = patcher.start() - patcher = patch('oec.display.load_address_counter_hi') + patcher = patch('oec.controller.create_terminal') - patcher.start() + self.create_terminal_mock = patcher.start() - patcher = patch('oec.display.load_address_counter_lo') - - patcher.start() - - patcher = patch('oec.display.write_data') - - patcher.start() + self.create_terminal_mock.return_value = self.terminal self.addCleanup(patch.stopall) @@ -95,7 +88,7 @@ class RunLoopTestCase(unittest.TestCase): self.controller._update_session.assert_called() def test_unsupported_terminal_attached(self): - self.read_terminal_ids_mock.return_value = DFT_TERMINAL_IDS + self.create_terminal_mock.side_effect = [UnsupportedTerminalError] self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) @@ -136,91 +129,50 @@ class RunLoopTestCase(unittest.TestCase): self.assertEqual(self.create_session_mock.call_count, 2) - def test_alarm(self): - # Arrange - self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) - self._assert_run_loop(0, None, False, 0, False) - - self.assertIsNotNone(self.controller.terminal) - - # Act - self.controller.terminal.sound_alarm() - - # Assert - self._assert_run_loop(0.5, None, True, 0.5, False) - - self.assertEqual(self.poll_mock.call_args[0][1], PollAction.ALARM) - - self.assertFalse(self.controller.terminal.alarm) - def test_toggle_cursor_blink(self): self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) - self.assertFalse(self.controller.terminal.display.cursor_blink) + self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True) + + self.terminal.display.toggle_cursor_blink.assert_called_once() + + self.terminal.display.toggle_cursor_blink.reset_mock() self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True) - self.assertTrue(self.controller.terminal.display.cursor_blink) - - self.load_control_register_mock.assert_called() - - self.assertTrue(self.load_control_register_mock.call_args[0][1].cursor_blink) - - self.load_control_register_mock.reset_mock() - - self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True) - - self.assertFalse(self.controller.terminal.display.cursor_blink) - - self.load_control_register_mock.assert_called() - - self.assertFalse(self.load_control_register_mock.call_args[0][1].cursor_blink) + self.terminal.display.toggle_cursor_blink.assert_called_once() def test_toggle_cursor_reverse(self): self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) - self.assertFalse(self.controller.terminal.display.cursor_reverse) + self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True) + self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True) + self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True) + + self.terminal.display.toggle_cursor_reverse.assert_called_once() + + self.terminal.display.toggle_cursor_reverse.reset_mock() self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True) self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True) self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True) - self.assertTrue(self.controller.terminal.display.cursor_reverse) - - self.load_control_register_mock.assert_called() - - self.assertTrue(self.load_control_register_mock.call_args[0][1].cursor_reverse) - - self.load_control_register_mock.reset_mock() - - self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True) - self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True) - self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True) - - self.assertFalse(self.controller.terminal.display.cursor_reverse) - - self.load_control_register_mock.assert_called() - - self.assertFalse(self.load_control_register_mock.call_args[0][1].cursor_reverse) + self.terminal.display.toggle_cursor_reverse.assert_called_once() def test_toggle_clicker(self): self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) - self.assertFalse(self.controller.terminal.keyboard.clicker) - self._assert_run_loop(0, KeystrokePollResponse(0b0101011110), False, 0, True) self._assert_run_loop(0, None, False, 0, False) - self.assertTrue(self.controller.terminal.keyboard.clicker) + self.terminal.keyboard.toggle_clicker.assert_called_once() - self.assertEqual(self.poll_mock.call_args[0][1], PollAction.ENABLE_KEYBOARD_CLICKER) + self.terminal.keyboard.toggle_clicker.reset_mock() self._assert_run_loop(0.5, KeystrokePollResponse(0b0101011110), True, 0.5, True) self._assert_run_loop(1, None, False, 0, False) - self.assertFalse(self.controller.terminal.keyboard.clicker) - - self.assertEqual(self.poll_mock.call_args[0][1], PollAction.DISABLE_KEYBOARD_CLICKER) + self.terminal.keyboard.toggle_clicker.assert_called_once() def _assert_run_loop(self, poll_time, poll_response, expected_update_session, expected_poll_delay, expected_poll_ack): # Arrange diff --git a/tests/test_display.py b/tests/test_display.py index d97554a..904dcbb 100644 --- a/tests/test_display.py +++ b/tests/test_display.py @@ -7,11 +7,11 @@ from oec.display import Dimensions, Display, encode_ascii_character, encode_ebcd class DisplayMoveCursorTestCase(unittest.TestCase): def setUp(self): - self.interface = Mock() + self.terminal = Mock() dimensions = Dimensions(24, 80) - self.display = Display(self.interface, dimensions, None) + self.display = Display(self.terminal, dimensions, None) self.display._load_address_counter = Mock(wraps=self.display._load_address_counter) @@ -54,11 +54,11 @@ class DisplayMoveCursorTestCase(unittest.TestCase): class DisplayBufferedWriteTestCase(unittest.TestCase): def setUp(self): - self.interface = Mock() + self.terminal = Mock() dimensions = Dimensions(24, 80) - self.display = Display(self.interface, Dimensions(24, 80), None) + self.display = Display(self.terminal, Dimensions(24, 80), None) def test_with_no_eab(self): # Act @@ -113,11 +113,11 @@ class DisplayBufferedWriteTestCase(unittest.TestCase): class DisplayFlushTestCase(unittest.TestCase): def setUp(self): - self.interface = Mock() + self.terminal = Mock() dimensions = Dimensions(24, 80) - self.display = Display(self.interface, dimensions, None) + self.display = Display(self.terminal, dimensions, None) self.display._flush_range = Mock() @@ -196,11 +196,11 @@ class DisplayFlushTestCase(unittest.TestCase): class DisplayClearTestCase(unittest.TestCase): def setUp(self): - self.interface = Mock() + self.terminal = Mock() dimensions = Dimensions(24, 80) - self.display = Display(self.interface, dimensions, None) + self.display = Display(self.terminal, dimensions, None) self.display._load_address_counter = Mock(wraps=self.display._load_address_counter) self.display._write = Mock(wraps=self.display._write) @@ -305,11 +305,11 @@ class DisplayClearTestCase(unittest.TestCase): class DisplayFlushRangeTestCase(unittest.TestCase): def setUp(self): - self.interface = Mock() + self.terminal = Mock() dimensions = Dimensions(24, 80) - self.display = Display(self.interface, dimensions, None) + self.display = Display(self.terminal, dimensions, None) self.display._write = Mock(wraps=self.display._write) @@ -363,11 +363,11 @@ class DisplayFlushRangeTestCase(unittest.TestCase): class DisplayLoadAddressCounterTestCase(unittest.TestCase): def setUp(self): - self.interface = Mock() + self.terminal = Mock() dimensions = Dimensions(24, 80) - self.display = Display(self.interface, dimensions, None) + self.display = Display(self.terminal, dimensions, None) patcher = patch('oec.display.load_address_counter_hi') @@ -386,8 +386,8 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase): # Assert self.assertEqual(self.display.address_counter, 895) - self.load_address_counter_hi_mock.assert_called_with(self.interface, 3) - self.load_address_counter_lo_mock.assert_called_with(self.interface, 127) + self.load_address_counter_hi_mock.assert_called_with(self.terminal.interface, 3) + self.load_address_counter_lo_mock.assert_called_with(self.terminal.interface, 127) def test_hi_change(self): # Arrange @@ -402,7 +402,7 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase): # Assert self.assertEqual(self.display.address_counter, 1151) - self.load_address_counter_hi_mock.assert_called_with(self.interface, 4) + self.load_address_counter_hi_mock.assert_called_with(self.terminal.interface, 4) self.load_address_counter_lo_mock.assert_not_called() def test_lo_change(self): @@ -419,7 +419,7 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase): self.assertEqual(self.display.address_counter, 896) self.load_address_counter_hi_mock.assert_not_called() - self.load_address_counter_lo_mock.assert_called_with(self.interface, 128) + self.load_address_counter_lo_mock.assert_called_with(self.terminal.interface, 128) def test_hi_lo_change(self): # Arrange @@ -434,8 +434,8 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase): # Assert self.assertEqual(self.display.address_counter, 1152) - self.load_address_counter_hi_mock.assert_called_with(self.interface, 4) - self.load_address_counter_lo_mock.assert_called_with(self.interface, 128) + self.load_address_counter_hi_mock.assert_called_with(self.terminal.interface, 4) + self.load_address_counter_lo_mock.assert_called_with(self.terminal.interface, 128) def test_no_change(self): # Arrange @@ -466,16 +466,16 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase): # Assert self.assertEqual(self.display.address_counter, 80) - self.load_address_counter_hi_mock.assert_called_with(self.interface, 0) - self.load_address_counter_lo_mock.assert_called_with(self.interface, 80) + self.load_address_counter_hi_mock.assert_called_with(self.terminal.interface, 0) + self.load_address_counter_lo_mock.assert_called_with(self.terminal.interface, 80) class DisplayWriteTestCase(unittest.TestCase): def setUp(self): - self.interface = Mock() + self.terminal = Mock() dimensions = Dimensions(24, 80) - self.display = Display(self.interface, dimensions, None) + self.display = Display(self.terminal, dimensions, None) self.display._load_address_counter = Mock(wraps=self.display._load_address_counter) @@ -504,7 +504,7 @@ class DisplayWriteTestCase(unittest.TestCase): # Assert self.assertIsNone(self.display.address_counter) - self.write_data_mock.assert_called_with(self.interface, bytes.fromhex('01 02 03'), jumbo_write_strategy=None) + self.write_data_mock.assert_called_with(self.terminal.interface, bytes.fromhex('01 02 03'), jumbo_write_strategy=None) def test_with_eab_data(self): # Arrange @@ -516,7 +516,7 @@ class DisplayWriteTestCase(unittest.TestCase): # Assert self.assertIsNone(self.display.address_counter) - self.eab_write_alternate_mock.assert_called_with(self.interface, 7, bytes.fromhex('01 04 02 05 03 06'), jumbo_write_strategy=None) + self.eab_write_alternate_mock.assert_called_with(self.terminal.interface, 7, bytes.fromhex('01 04 02 05 03 06'), jumbo_write_strategy=None) def test_repeat_with_no_eab_data(self): # Act @@ -525,7 +525,7 @@ class DisplayWriteTestCase(unittest.TestCase): # Assert self.assertIsNone(self.display.address_counter) - self.write_data_mock.assert_called_with(self.interface, (bytes.fromhex('01 02 03'), 3), jumbo_write_strategy=None) + self.write_data_mock.assert_called_with(self.terminal.interface, (bytes.fromhex('01 02 03'), 3), jumbo_write_strategy=None) def test_repeat_with_eab_data(self): # Arrange @@ -537,7 +537,7 @@ class DisplayWriteTestCase(unittest.TestCase): # Assert self.assertIsNone(self.display.address_counter) - self.eab_write_alternate_mock.assert_called_with(self.interface, 7, (bytes.fromhex('01 04 02 05 03 06'), 3), jumbo_write_strategy=None) + self.eab_write_alternate_mock.assert_called_with(self.terminal.interface, 7, (bytes.fromhex('01 04 02 05 03 06'), 3), jumbo_write_strategy=None) def test_regen_eab_data_mismatch_format(self): # Arrange diff --git a/tests/test_terminal.py b/tests/test_terminal.py index 517399c..7f66850 100644 --- a/tests/test_terminal.py +++ b/tests/test_terminal.py @@ -1,11 +1,173 @@ import unittest +from unittest.mock import Mock, patch +from coax import Feature, PollAction +from coax.protocol import TerminalId, TerminalType import context +from oec.terminal import create_terminal, Terminal, UnsupportedTerminalError +from oec.display import Dimensions from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2 -class TerminalGetPollActionTestCase(unittest.TestCase): +class TerminalSetupTestCase(unittest.TestCase): def setUp(self): self.interface = Mock() - self.terminal = Terminal(self.interface, TerminalId(0b11110100), 'c1348300', { }, KEYMAP_3278_2) + terminal_id = TerminalId(0b11110100) + extended_id = 'c1348300' + dimensions = Dimensions(24, 80) + features = { } + keymap = KEYMAP_3278_2 + + self.terminal = Terminal(self.interface, terminal_id, extended_id, dimensions, features, keymap) + + self.terminal.display = Mock() + + patcher = patch('oec.terminal.load_control_register') + + self.load_control_register_mock = patcher.start() + + self.addCleanup(patch.stopall) + + def test(self): + self.terminal.setup() + +class TerminalPollTestCase(unittest.TestCase): + def setUp(self): + self.interface = Mock() + + terminal_id = TerminalId(0b11110100) + extended_id = 'c1348300' + dimensions = Dimensions(24, 80) + features = { } + keymap = KEYMAP_3278_2 + + self.terminal = Terminal(self.interface, terminal_id, extended_id, dimensions, features, keymap) + + patcher = patch('oec.terminal.poll') + + self.poll_mock = patcher.start() + + self.addCleanup(patch.stopall) + + # The terminal will be initialized in a state where the terminal keyboard clicker + # state is unknown, and this cannot be read. Therefore the first POLL will always + # attempt to set the keyboard clicker state... + self.terminal.poll() + + self.poll_mock.reset_mock() + + def test_with_no_queued_actions(self): + # Act + self.terminal.poll() + + # Assert + self.poll_mock.assert_called_with(self.interface, PollAction.NONE) + + def test_with_sound_alarm_queued(self): + # Arrange + self.terminal.sound_alarm() + + # Act + self.terminal.poll() + + # Assert + self.poll_mock.assert_called_with(self.interface, PollAction.ALARM) + + def test_with_enable_keyboard_clicker_queued(self): + # Arrange + self.assertFalse(self.terminal.keyboard.clicker) + + self.terminal.keyboard.toggle_clicker() + + # Act + self.terminal.poll() + + # Assert + self.poll_mock.assert_called_with(self.interface, PollAction.ENABLE_KEYBOARD_CLICKER) + +class CreateTerminalTestCase(unittest.TestCase): + def setUp(self): + self.interface = Mock() + + self.interface.legacy_firmware_detected = False + + self.get_keymap = lambda terminal_id, extended_id: KEYMAP_3278_2 + + patcher = patch('oec.terminal.read_terminal_id') + + self.read_terminal_id_mock = patcher.start() + + patcher = patch('oec.terminal.read_extended_id') + + self.read_extended_id_mock = patcher.start() + + patcher = patch('oec.terminal.get_features') + + self.get_features_mock = patcher.start() + + self.addCleanup(patch.stopall) + + def test_supported_terminal(self): + # Arrange + self.read_terminal_id_mock.return_value = TerminalId(0b11110100) + self.read_extended_id_mock.return_value = bytes.fromhex('c1 34 83 00') + self.get_features_mock.return_value = { Feature.EAB: 7 } + + # Act + terminal = create_terminal(self.interface, None, self.get_keymap) + + # Assert + self.assertEqual(terminal.terminal_id.type, TerminalType.CUT) + self.assertEqual(terminal.terminal_id.model, 2) + self.assertEqual(terminal.terminal_id.keyboard, 15) + self.assertEqual(terminal.extended_id, 'c1348300') + self.assertEqual(terminal.display.dimensions, Dimensions(24, 80)) + self.assertEqual(terminal.features, { Feature.EAB: 7 }) + self.assertEqual(terminal.keyboard.keymap.name, '3278-2') + + def test_unsupported_terminal_type(self): + # Arrange + self.read_terminal_id_mock.return_value = TerminalId(0b00000001) + + # Act and assert + with self.assertRaises(UnsupportedTerminalError): + create_terminal(self.interface, None, self.get_keymap) + + def test_unsupported_terminal_model(self): + # Arrange + terminal_id = TerminalId(0b11110100) + + terminal_id.model = 1 + + self.read_terminal_id_mock.return_value = terminal_id + + # Act and assert + with self.assertRaises(UnsupportedTerminalError): + create_terminal(self.interface, None, self.get_keymap) + + def test_eab_feature_removed_on_legacy_interface_without_strategy(self): + # Arrange + self.interface.legacy_firmware_detected = True + + self.read_terminal_id_mock.return_value = TerminalId(0b11110100) + self.read_extended_id_mock.return_value = bytes.fromhex('c1 34 83 00') + self.get_features_mock.return_value = { Feature.EAB: 7 } + + patcher = patch('oec.terminal._print_no_i1_eab_notice') + + print_no_i1_eab_notice_mock = patcher.start() + + # Act + terminal = create_terminal(self.interface, None, self.get_keymap) + + # Assert + self.assertEqual(terminal.terminal_id.type, TerminalType.CUT) + self.assertEqual(terminal.terminal_id.model, 2) + self.assertEqual(terminal.terminal_id.keyboard, 15) + self.assertEqual(terminal.extended_id, 'c1348300') + self.assertEqual(terminal.display.dimensions, Dimensions(24, 80)) + self.assertEqual(terminal.features, { }) + self.assertEqual(terminal.keyboard.keymap.name, '3278-2') + + print_no_i1_eab_notice_mock.assert_called_once()