mirror of
https://github.com/lowobservable/oec.git
synced 2026-01-11 23:53:04 +00:00
Refactor controller and terminal responsibilities
This commit is contained in:
parent
0c6dcf4cb9
commit
e10cf494d8
@ -3,16 +3,13 @@ oec.controller
|
||||
~~~~~~~~~~~~~~
|
||||
"""
|
||||
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
import selectors
|
||||
from textwrap import dedent
|
||||
from coax import poll, poll_ack, load_control_register, get_features, PollAction, \
|
||||
KeystrokePollResponse, TerminalType, Feature, ReceiveTimeout, \
|
||||
from coax import poll, poll_ack, KeystrokePollResponse, ReceiveTimeout, \
|
||||
ReceiveError, ProtocolError
|
||||
|
||||
from .terminal import Terminal, UnsupportedTerminalError, read_terminal_ids
|
||||
from .terminal import create_terminal, UnsupportedTerminalError
|
||||
from .keyboard import Key
|
||||
from .session import SessionDisconnectedError
|
||||
|
||||
@ -106,44 +103,9 @@ class Controller:
|
||||
def _handle_terminal_attached(self, poll_response):
|
||||
self.logger.info('Terminal attached')
|
||||
|
||||
jumbo_write_strategy = _get_jumbo_write_strategy()
|
||||
self.terminal = create_terminal(self.interface, poll_response, self.get_keymap)
|
||||
|
||||
# Read the terminal identifiers.
|
||||
(terminal_id, extended_id) = read_terminal_ids(self.interface)
|
||||
|
||||
self.logger.info(f'Terminal ID = {terminal_id}, Extended ID = {extended_id}')
|
||||
|
||||
if terminal_id.type != TerminalType.CUT:
|
||||
raise UnsupportedTerminalError('Only CUT type terminals are supported')
|
||||
|
||||
# Get the terminal features.
|
||||
features = get_features(self.interface)
|
||||
|
||||
self.logger.info(f'Features = {features}')
|
||||
|
||||
if Feature.EAB in features:
|
||||
if self.interface.legacy_firmware_detected and jumbo_write_strategy is None:
|
||||
del features[Feature.EAB]
|
||||
|
||||
_print_no_i1_eab_notice()
|
||||
|
||||
# Get the keymap.
|
||||
keymap = self.get_keymap(terminal_id, extended_id)
|
||||
|
||||
# Initialize the terminal.
|
||||
self.terminal = Terminal(self.interface, terminal_id, extended_id,
|
||||
features, keymap,
|
||||
jumbo_write_strategy=jumbo_write_strategy)
|
||||
|
||||
(rows, columns) = self.terminal.display.dimensions
|
||||
keymap_name = self.terminal.keyboard.keymap.name
|
||||
|
||||
self.logger.info(f'Rows = {rows}, Columns = {columns}, Keymap = {keymap_name}')
|
||||
|
||||
if self.terminal.display.has_eab:
|
||||
self.terminal.display.load_eab_mask(0xff)
|
||||
|
||||
self.terminal.display.clear(clear_status_line=True)
|
||||
self.terminal.setup()
|
||||
|
||||
# Show the attached indicator on the status line.
|
||||
self.terminal.display.status_line.write_string(0, 'S')
|
||||
@ -221,12 +183,8 @@ class Controller:
|
||||
|
||||
if key == Key.CURSOR_BLINK:
|
||||
self.terminal.display.toggle_cursor_blink()
|
||||
|
||||
self._load_control_register()
|
||||
elif key == Key.ALT_CURSOR:
|
||||
self.terminal.display.toggle_cursor_reverse()
|
||||
|
||||
self._load_control_register()
|
||||
elif key == Key.CLICKER:
|
||||
self.terminal.keyboard.toggle_clicker()
|
||||
elif self.session:
|
||||
@ -235,9 +193,12 @@ class Controller:
|
||||
def _poll(self):
|
||||
self.last_poll_time = time.perf_counter()
|
||||
|
||||
poll_action = self.terminal.get_poll_action() if self.terminal else PollAction.NONE
|
||||
|
||||
poll_response = poll(self.interface, poll_action, receive_timeout=1)
|
||||
# If a terminal is connected, use the terminal method to ensure that
|
||||
# any queued POLL action is applied.
|
||||
if self.terminal:
|
||||
poll_response = self.terminal.poll(receive_timeout=1)
|
||||
else:
|
||||
poll_response = poll(self.interface, receive_timeout=1)
|
||||
|
||||
if poll_response:
|
||||
try:
|
||||
@ -264,48 +225,3 @@ class Controller:
|
||||
period = self.disconnected_poll_period
|
||||
|
||||
return max((self.last_poll_time + period) - current_time, 0)
|
||||
|
||||
def _load_control_register(self):
|
||||
load_control_register(self.interface, self.terminal.get_control_register())
|
||||
|
||||
def _get_jumbo_write_strategy():
|
||||
value = os.environ.get('COAX_JUMBO')
|
||||
|
||||
if value is None:
|
||||
return None
|
||||
|
||||
if value in ['split', 'ignore']:
|
||||
return value
|
||||
|
||||
self.logger.warning(f'Unsupported COAX_JUMBO option: {value}')
|
||||
|
||||
return None
|
||||
|
||||
def _print_no_i1_eab_notice():
|
||||
notice = '''
|
||||
**** **** **** **** **** **** **** **** **** **** **** **** **** **** **** ****
|
||||
|
||||
Your terminal is reporting the existence of an EAB feature that allows extended
|
||||
colors and formatting, however...
|
||||
|
||||
I think you are using an older firmware on the 1st generation, Arduino Mega
|
||||
based, interface which does not support the "jumbo write" required to write a
|
||||
full screen to the regen and EAB buffers.
|
||||
|
||||
I'm going to continue as if the EAB feature did not exist...
|
||||
|
||||
If you want to override this behavior, you can set the COAX_JUMBO environment
|
||||
variable as follows:
|
||||
|
||||
- COAX_JUMBO=split - split large writes into multiple smaller 32-byte writes
|
||||
before sending to the interface, this will result in
|
||||
additional round trips to the interface which may
|
||||
manifest as visible incremental changes being applied
|
||||
to the screen
|
||||
- COAX_JUMBO=ignore - try a jumbo write, anyway, use this option if you
|
||||
believe you are seeing this behavior in error
|
||||
|
||||
**** **** **** **** **** **** **** **** **** **** **** **** **** **** **** ****
|
||||
'''
|
||||
|
||||
print(dedent(notice))
|
||||
|
||||
@ -170,10 +170,10 @@ def encode_string(string, errors='replace'):
|
||||
Dimensions = namedtuple('Dimensions', ['rows', 'columns'])
|
||||
|
||||
class Display:
|
||||
def __init__(self, interface, dimensions, eab_address, jumbo_write_strategy=None):
|
||||
def __init__(self, terminal, dimensions, eab_address, jumbo_write_strategy=None):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
||||
self.interface = interface
|
||||
self.terminal = terminal
|
||||
self.dimensions = dimensions
|
||||
self.eab_address = eab_address
|
||||
|
||||
@ -187,9 +187,6 @@ class Display:
|
||||
|
||||
self.status_line = StatusLine(self)
|
||||
|
||||
self.cursor_reverse = False
|
||||
self.cursor_blink = False
|
||||
|
||||
self.jumbo_write_strategy = jumbo_write_strategy
|
||||
|
||||
@property
|
||||
@ -209,7 +206,7 @@ class Display:
|
||||
if not self.has_eab:
|
||||
raise RuntimeError('No EAB feature')
|
||||
|
||||
eab_load_mask(self.interface, self.eab_address, mask)
|
||||
eab_load_mask(self.terminal.interface, self.eab_address, mask)
|
||||
|
||||
def buffered_write(self, regen_byte, eab_byte, index=None, row=None, column=None):
|
||||
if index is None:
|
||||
@ -257,10 +254,14 @@ class Display:
|
||||
self.move_cursor(row=0, column=0, force_load=True)
|
||||
|
||||
def toggle_cursor_blink(self):
|
||||
self.cursor_blink = not self.cursor_blink
|
||||
self.terminal.control.cursor_blink = not self.terminal.control.cursor_blink
|
||||
|
||||
self.terminal.load_control_register()
|
||||
|
||||
def toggle_cursor_reverse(self):
|
||||
self.cursor_reverse = not self.cursor_reverse
|
||||
self.terminal.control.cursor_reverse = not self.terminal.control.cursor_reverse
|
||||
|
||||
self.terminal.load_control_register()
|
||||
|
||||
def _get_index(self, row, column):
|
||||
return (row * self.dimensions.columns) + column
|
||||
@ -289,8 +290,8 @@ class Display:
|
||||
return address
|
||||
|
||||
def _read_address_counter(self):
|
||||
hi = read_address_counter_hi(self.interface)
|
||||
lo = read_address_counter_lo(self.interface)
|
||||
hi = read_address_counter_hi(self.terminal.interface)
|
||||
lo = read_address_counter_lo(self.terminal.interface)
|
||||
|
||||
return (hi << 8) | lo
|
||||
|
||||
@ -302,10 +303,10 @@ class Display:
|
||||
(current_hi, current_lo) = _split_address(self.address_counter)
|
||||
|
||||
if hi != current_hi or force_load:
|
||||
load_address_counter_hi(self.interface, hi)
|
||||
load_address_counter_hi(self.terminal.interface, hi)
|
||||
|
||||
if lo != current_lo or force_load:
|
||||
load_address_counter_lo(self.interface, lo)
|
||||
load_address_counter_lo(self.terminal.interface, lo)
|
||||
|
||||
self.address_counter = address
|
||||
|
||||
@ -371,10 +372,10 @@ class Display:
|
||||
else:
|
||||
data = bytes(interleave(regen_data, eab_data))
|
||||
|
||||
eab_write_alternate(self.interface, self.eab_address, data,
|
||||
eab_write_alternate(self.terminal.interface, self.eab_address, data,
|
||||
jumbo_write_strategy=self.jumbo_write_strategy)
|
||||
else:
|
||||
write_data(self.interface, regen_data,
|
||||
write_data(self.terminal.interface, regen_data,
|
||||
jumbo_write_strategy=self.jumbo_write_strategy)
|
||||
|
||||
if isinstance(regen_data, tuple):
|
||||
|
||||
180
oec/terminal.py
180
oec/terminal.py
@ -3,12 +3,15 @@ oec.terminal
|
||||
~~~~~~~~~~~~
|
||||
"""
|
||||
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
from coax import read_terminal_id, read_extended_id, PollAction, Control, Feature, \
|
||||
from textwrap import dedent
|
||||
from coax import poll, read_terminal_id, read_extended_id, get_features, \
|
||||
load_control_register, TerminalType, Feature, PollAction, Control, \
|
||||
ReceiveError, ProtocolError
|
||||
|
||||
from .display import Dimensions, Display
|
||||
from .display import Dimensions, Display
|
||||
from .keyboard import Keyboard
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -20,14 +23,115 @@ MODEL_DIMENSIONS = {
|
||||
5: Dimensions(27, 132)
|
||||
}
|
||||
|
||||
def get_dimensions(terminal_id, extended_id):
|
||||
"""Get terminal display dimensions."""
|
||||
if not terminal_id.model in MODEL_DIMENSIONS:
|
||||
raise ValueError(f'Model {terminal_id.model} is not supported')
|
||||
class Terminal:
|
||||
"""The terminal."""
|
||||
|
||||
return MODEL_DIMENSIONS[terminal_id.model]
|
||||
def __init__(self, interface, terminal_id, extended_id, dimensions, features,
|
||||
keymap, jumbo_write_strategy=None):
|
||||
self.interface = interface
|
||||
self.terminal_id = terminal_id
|
||||
self.extended_id = extended_id
|
||||
self.features = features
|
||||
|
||||
def read_terminal_ids(interface, extended_id_retry_attempts=3):
|
||||
self.control = Control(step_inhibit=False, display_inhibit=False,
|
||||
cursor_inhibit=False, cursor_reverse=False,
|
||||
cursor_blink=False)
|
||||
|
||||
self.display = Display(self, dimensions, features.get(Feature.EAB),
|
||||
jumbo_write_strategy=jumbo_write_strategy)
|
||||
self.keyboard = Keyboard(keymap)
|
||||
|
||||
self.alarm = False
|
||||
self.last_poll_keyboard_clicker = None
|
||||
|
||||
def setup(self):
|
||||
"""Load registers and clear the display."""
|
||||
self.load_control_register()
|
||||
|
||||
if self.display.has_eab:
|
||||
self.display.load_eab_mask(0xff)
|
||||
|
||||
self.display.clear(clear_status_line=True)
|
||||
|
||||
def poll(self, **kwargs):
|
||||
"""Execute a POLL command with queued actions."""
|
||||
poll_action = PollAction.NONE
|
||||
|
||||
# Convert a queued alarm or keyboard clicker change to POLL action.
|
||||
if self.alarm:
|
||||
poll_action = PollAction.ALARM
|
||||
elif self.keyboard.clicker != self.last_poll_keyboard_clicker:
|
||||
if self.keyboard.clicker:
|
||||
poll_action = PollAction.ENABLE_KEYBOARD_CLICKER
|
||||
else:
|
||||
poll_action = PollAction.DISABLE_KEYBOARD_CLICKER
|
||||
|
||||
poll_response = poll(self.interface, poll_action, **kwargs)
|
||||
|
||||
# Clear the queued alarm and keyboard clicker change if the POLL was
|
||||
# successful.
|
||||
if poll_action == PollAction.ALARM:
|
||||
self.alarm = False
|
||||
elif poll_action in [PollAction.ENABLE_KEYBOARD_CLICKER,
|
||||
PollAction.DISABLE_KEYBOARD_CLICKER]:
|
||||
self.last_poll_keyboard_clicker = self.keyboard.clicker
|
||||
|
||||
return poll_response
|
||||
|
||||
def sound_alarm(self):
|
||||
"""Queue an alarm on next POLL command."""
|
||||
self.alarm = True
|
||||
|
||||
def load_control_register(self):
|
||||
"""Execute a LOAD_CONTROL_REGISTER command."""
|
||||
load_control_register(self.interface, self.control)
|
||||
|
||||
class UnsupportedTerminalError(Exception):
|
||||
"""Unsupported terminal."""
|
||||
|
||||
def create_terminal(interface, poll_response, get_keymap):
|
||||
"""Terminal factory."""
|
||||
jumbo_write_strategy = _get_jumbo_write_strategy()
|
||||
|
||||
# Read the terminal identifiers.
|
||||
(terminal_id, extended_id) = _read_terminal_ids(interface)
|
||||
|
||||
logger.info(f'Terminal ID = {terminal_id}, Extended ID = {extended_id}')
|
||||
|
||||
if terminal_id.type != TerminalType.CUT:
|
||||
raise UnsupportedTerminalError('Only CUT type terminals are supported')
|
||||
|
||||
# Get the terminal dimensions.
|
||||
dimensions = MODEL_DIMENSIONS.get(terminal_id.model)
|
||||
|
||||
if dimensions is None:
|
||||
raise UnsupportedTerminalError(f'Model {terminal_id.model} is not supported')
|
||||
|
||||
logger.info(f'Rows = {dimensions.rows}, Columns = {dimensions.columns}')
|
||||
|
||||
# Get the terminal features.
|
||||
features = get_features(interface)
|
||||
|
||||
logger.info(f'Features = {features}')
|
||||
|
||||
if Feature.EAB in features:
|
||||
if interface.legacy_firmware_detected and jumbo_write_strategy is None:
|
||||
del features[Feature.EAB]
|
||||
|
||||
_print_no_i1_eab_notice()
|
||||
|
||||
# Get the keymap.
|
||||
keymap = get_keymap(terminal_id, extended_id)
|
||||
|
||||
logger.info(f'Keymap = {keymap.name}')
|
||||
|
||||
# Create the terminal.
|
||||
terminal = Terminal(interface, terminal_id, extended_id, dimensions, features,
|
||||
keymap, jumbo_write_strategy=jumbo_write_strategy)
|
||||
|
||||
return terminal
|
||||
|
||||
def _read_terminal_ids(interface, extended_id_retry_attempts=3):
|
||||
terminal_id = None
|
||||
extended_id = None
|
||||
|
||||
@ -56,46 +160,44 @@ def read_terminal_ids(interface, extended_id_retry_attempts=3):
|
||||
|
||||
return (terminal_id, extended_id.hex() if extended_id is not None else None)
|
||||
|
||||
class Terminal:
|
||||
"""Terminal information and devices."""
|
||||
def _get_jumbo_write_strategy():
|
||||
value = os.environ.get('COAX_JUMBO')
|
||||
|
||||
def __init__(self, interface, terminal_id, extended_id, features, keymap,
|
||||
jumbo_write_strategy=None):
|
||||
self.interface = interface
|
||||
self.terminal_id = terminal_id
|
||||
self.extended_id = extended_id
|
||||
self.features = features
|
||||
if value is None:
|
||||
return None
|
||||
|
||||
dimensions = get_dimensions(self.terminal_id, self.extended_id)
|
||||
if value in ['split', 'ignore']:
|
||||
return value
|
||||
|
||||
self.display = Display(interface, dimensions, features.get(Feature.EAB),
|
||||
jumbo_write_strategy=jumbo_write_strategy)
|
||||
self.keyboard = Keyboard(keymap)
|
||||
logger.warning(f'Unsupported COAX_JUMBO option: {value}')
|
||||
|
||||
self.alarm = False
|
||||
self.last_poll_keyboard_clicker = None
|
||||
return None
|
||||
|
||||
def sound_alarm(self):
|
||||
self.alarm = True
|
||||
def _print_no_i1_eab_notice():
|
||||
notice = '''
|
||||
**** **** **** **** **** **** **** **** **** **** **** **** **** **** **** ****
|
||||
|
||||
def get_poll_action(self):
|
||||
if self.alarm:
|
||||
self.alarm = False
|
||||
Your terminal is reporting the existence of an EAB feature that allows extended
|
||||
colors and formatting, however...
|
||||
|
||||
return PollAction.ALARM
|
||||
I think you are using an older firmware on the 1st generation, Arduino Mega
|
||||
based, interface which does not support the "jumbo write" required to write a
|
||||
full screen to the regen and EAB buffers.
|
||||
|
||||
if self.keyboard.clicker != self.last_poll_keyboard_clicker:
|
||||
self.last_poll_keyboard_clicker = self.keyboard.clicker
|
||||
I'm going to continue as if the EAB feature did not exist...
|
||||
|
||||
return PollAction.ENABLE_KEYBOARD_CLICKER if self.keyboard.clicker else PollAction.DISABLE_KEYBOARD_CLICKER
|
||||
If you want to override this behavior, you can set the COAX_JUMBO environment
|
||||
variable as follows:
|
||||
|
||||
return PollAction.NONE
|
||||
- COAX_JUMBO=split - split large writes into multiple smaller 32-byte writes
|
||||
before sending to the interface, this will result in
|
||||
additional round trips to the interface which may
|
||||
manifest as visible incremental changes being applied
|
||||
to the screen
|
||||
- COAX_JUMBO=ignore - try a jumbo write, anyway, use this option if you
|
||||
believe you are seeing this behavior in error
|
||||
|
||||
def get_control_register(self):
|
||||
control = Control(cursor_reverse=self.display.cursor_reverse,
|
||||
cursor_blink=self.display.cursor_blink)
|
||||
**** **** **** **** **** **** **** **** **** **** **** **** **** **** **** ****
|
||||
'''
|
||||
|
||||
return control
|
||||
|
||||
class UnsupportedTerminalError(Exception):
|
||||
"""Unsupported terminal."""
|
||||
print(dedent(notice))
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import selectors
|
||||
import unittest
|
||||
from unittest.mock import Mock, PropertyMock, patch
|
||||
from unittest.mock import Mock, patch
|
||||
from coax import PollAction, PowerOnResetCompletePollResponse, KeystrokePollResponse, ReceiveTimeout
|
||||
from coax.protocol import TerminalId
|
||||
|
||||
@ -8,12 +8,11 @@ import context
|
||||
|
||||
from oec.controller import Controller
|
||||
from oec.session import SessionDisconnectedError
|
||||
from oec.terminal import Terminal, UnsupportedTerminalError
|
||||
from oec.display import Dimensions
|
||||
from oec.keyboard import KeyboardModifiers, Key
|
||||
from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2
|
||||
|
||||
CUT_TERMINAL_IDS = (TerminalId(0b11110100), 'c1348300')
|
||||
DFT_TERMINAL_IDS = (TerminalId(0b00000001), None)
|
||||
|
||||
class RunLoopTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.interface = Mock()
|
||||
@ -31,30 +30,30 @@ class RunLoopTestCase(unittest.TestCase):
|
||||
|
||||
self.controller._update_session = Mock()
|
||||
|
||||
patcher = patch('oec.controller.poll')
|
||||
self.terminal = Terminal(self.interface, TerminalId(0b11110100), 'c1348300', Dimensions(24, 80), { }, KEYMAP_3278_2, None)
|
||||
|
||||
self.poll_mock = patcher.start()
|
||||
self.terminal.setup = Mock()
|
||||
|
||||
self.terminal.display.toggle_cursor_blink = Mock()
|
||||
self.terminal.display.toggle_cursor_reverse = Mock()
|
||||
self.terminal.display._write = Mock()
|
||||
|
||||
self.terminal.keyboard.toggle_clicker = Mock()
|
||||
|
||||
self.poll_mock = Mock()
|
||||
|
||||
patcher = patch('oec.controller.poll', self.poll_mock)
|
||||
|
||||
patcher.start()
|
||||
|
||||
patcher = patch('oec.terminal.poll', self.poll_mock)
|
||||
|
||||
patcher.start()
|
||||
|
||||
patcher = patch('oec.controller.poll_ack')
|
||||
|
||||
self.poll_ack_mock = patcher.start()
|
||||
|
||||
patcher = patch('oec.controller.read_terminal_ids')
|
||||
|
||||
self.read_terminal_ids_mock = patcher.start()
|
||||
|
||||
self.read_terminal_ids_mock.return_value = CUT_TERMINAL_IDS
|
||||
|
||||
patcher = patch('oec.controller.load_control_register')
|
||||
|
||||
self.load_control_register_mock = patcher.start()
|
||||
|
||||
patcher = patch('oec.controller.get_features')
|
||||
|
||||
self.get_features_mock = patcher.start()
|
||||
|
||||
self.get_features_mock.return_value = { }
|
||||
|
||||
patcher = patch('oec.controller.time.perf_counter')
|
||||
|
||||
self.perf_counter_mock = patcher.start()
|
||||
@ -63,17 +62,11 @@ class RunLoopTestCase(unittest.TestCase):
|
||||
|
||||
self.sleep_mock = patcher.start()
|
||||
|
||||
patcher = patch('oec.display.load_address_counter_hi')
|
||||
patcher = patch('oec.controller.create_terminal')
|
||||
|
||||
patcher.start()
|
||||
self.create_terminal_mock = patcher.start()
|
||||
|
||||
patcher = patch('oec.display.load_address_counter_lo')
|
||||
|
||||
patcher.start()
|
||||
|
||||
patcher = patch('oec.display.write_data')
|
||||
|
||||
patcher.start()
|
||||
self.create_terminal_mock.return_value = self.terminal
|
||||
|
||||
self.addCleanup(patch.stopall)
|
||||
|
||||
@ -95,7 +88,7 @@ class RunLoopTestCase(unittest.TestCase):
|
||||
self.controller._update_session.assert_called()
|
||||
|
||||
def test_unsupported_terminal_attached(self):
|
||||
self.read_terminal_ids_mock.return_value = DFT_TERMINAL_IDS
|
||||
self.create_terminal_mock.side_effect = [UnsupportedTerminalError]
|
||||
|
||||
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
|
||||
|
||||
@ -136,91 +129,50 @@ class RunLoopTestCase(unittest.TestCase):
|
||||
|
||||
self.assertEqual(self.create_session_mock.call_count, 2)
|
||||
|
||||
def test_alarm(self):
|
||||
# Arrange
|
||||
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
|
||||
self._assert_run_loop(0, None, False, 0, False)
|
||||
|
||||
self.assertIsNotNone(self.controller.terminal)
|
||||
|
||||
# Act
|
||||
self.controller.terminal.sound_alarm()
|
||||
|
||||
# Assert
|
||||
self._assert_run_loop(0.5, None, True, 0.5, False)
|
||||
|
||||
self.assertEqual(self.poll_mock.call_args[0][1], PollAction.ALARM)
|
||||
|
||||
self.assertFalse(self.controller.terminal.alarm)
|
||||
|
||||
def test_toggle_cursor_blink(self):
|
||||
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
|
||||
|
||||
self.assertFalse(self.controller.terminal.display.cursor_blink)
|
||||
self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True)
|
||||
|
||||
self.terminal.display.toggle_cursor_blink.assert_called_once()
|
||||
|
||||
self.terminal.display.toggle_cursor_blink.reset_mock()
|
||||
|
||||
self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True)
|
||||
|
||||
self.assertTrue(self.controller.terminal.display.cursor_blink)
|
||||
|
||||
self.load_control_register_mock.assert_called()
|
||||
|
||||
self.assertTrue(self.load_control_register_mock.call_args[0][1].cursor_blink)
|
||||
|
||||
self.load_control_register_mock.reset_mock()
|
||||
|
||||
self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True)
|
||||
|
||||
self.assertFalse(self.controller.terminal.display.cursor_blink)
|
||||
|
||||
self.load_control_register_mock.assert_called()
|
||||
|
||||
self.assertFalse(self.load_control_register_mock.call_args[0][1].cursor_blink)
|
||||
self.terminal.display.toggle_cursor_blink.assert_called_once()
|
||||
|
||||
def test_toggle_cursor_reverse(self):
|
||||
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
|
||||
|
||||
self.assertFalse(self.controller.terminal.display.cursor_reverse)
|
||||
self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True)
|
||||
self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True)
|
||||
self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True)
|
||||
|
||||
self.terminal.display.toggle_cursor_reverse.assert_called_once()
|
||||
|
||||
self.terminal.display.toggle_cursor_reverse.reset_mock()
|
||||
|
||||
self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True)
|
||||
self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True)
|
||||
self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True)
|
||||
|
||||
self.assertTrue(self.controller.terminal.display.cursor_reverse)
|
||||
|
||||
self.load_control_register_mock.assert_called()
|
||||
|
||||
self.assertTrue(self.load_control_register_mock.call_args[0][1].cursor_reverse)
|
||||
|
||||
self.load_control_register_mock.reset_mock()
|
||||
|
||||
self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True)
|
||||
self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True)
|
||||
self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True)
|
||||
|
||||
self.assertFalse(self.controller.terminal.display.cursor_reverse)
|
||||
|
||||
self.load_control_register_mock.assert_called()
|
||||
|
||||
self.assertFalse(self.load_control_register_mock.call_args[0][1].cursor_reverse)
|
||||
self.terminal.display.toggle_cursor_reverse.assert_called_once()
|
||||
|
||||
def test_toggle_clicker(self):
|
||||
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
|
||||
|
||||
self.assertFalse(self.controller.terminal.keyboard.clicker)
|
||||
|
||||
self._assert_run_loop(0, KeystrokePollResponse(0b0101011110), False, 0, True)
|
||||
self._assert_run_loop(0, None, False, 0, False)
|
||||
|
||||
self.assertTrue(self.controller.terminal.keyboard.clicker)
|
||||
self.terminal.keyboard.toggle_clicker.assert_called_once()
|
||||
|
||||
self.assertEqual(self.poll_mock.call_args[0][1], PollAction.ENABLE_KEYBOARD_CLICKER)
|
||||
self.terminal.keyboard.toggle_clicker.reset_mock()
|
||||
|
||||
self._assert_run_loop(0.5, KeystrokePollResponse(0b0101011110), True, 0.5, True)
|
||||
self._assert_run_loop(1, None, False, 0, False)
|
||||
|
||||
self.assertFalse(self.controller.terminal.keyboard.clicker)
|
||||
|
||||
self.assertEqual(self.poll_mock.call_args[0][1], PollAction.DISABLE_KEYBOARD_CLICKER)
|
||||
self.terminal.keyboard.toggle_clicker.assert_called_once()
|
||||
|
||||
def _assert_run_loop(self, poll_time, poll_response, expected_update_session, expected_poll_delay, expected_poll_ack):
|
||||
# Arrange
|
||||
|
||||
@ -7,11 +7,11 @@ from oec.display import Dimensions, Display, encode_ascii_character, encode_ebcd
|
||||
|
||||
class DisplayMoveCursorTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.interface = Mock()
|
||||
self.terminal = Mock()
|
||||
|
||||
dimensions = Dimensions(24, 80)
|
||||
|
||||
self.display = Display(self.interface, dimensions, None)
|
||||
self.display = Display(self.terminal, dimensions, None)
|
||||
|
||||
self.display._load_address_counter = Mock(wraps=self.display._load_address_counter)
|
||||
|
||||
@ -54,11 +54,11 @@ class DisplayMoveCursorTestCase(unittest.TestCase):
|
||||
|
||||
class DisplayBufferedWriteTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.interface = Mock()
|
||||
self.terminal = Mock()
|
||||
|
||||
dimensions = Dimensions(24, 80)
|
||||
|
||||
self.display = Display(self.interface, Dimensions(24, 80), None)
|
||||
self.display = Display(self.terminal, Dimensions(24, 80), None)
|
||||
|
||||
def test_with_no_eab(self):
|
||||
# Act
|
||||
@ -113,11 +113,11 @@ class DisplayBufferedWriteTestCase(unittest.TestCase):
|
||||
|
||||
class DisplayFlushTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.interface = Mock()
|
||||
self.terminal = Mock()
|
||||
|
||||
dimensions = Dimensions(24, 80)
|
||||
|
||||
self.display = Display(self.interface, dimensions, None)
|
||||
self.display = Display(self.terminal, dimensions, None)
|
||||
|
||||
self.display._flush_range = Mock()
|
||||
|
||||
@ -196,11 +196,11 @@ class DisplayFlushTestCase(unittest.TestCase):
|
||||
|
||||
class DisplayClearTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.interface = Mock()
|
||||
self.terminal = Mock()
|
||||
|
||||
dimensions = Dimensions(24, 80)
|
||||
|
||||
self.display = Display(self.interface, dimensions, None)
|
||||
self.display = Display(self.terminal, dimensions, None)
|
||||
|
||||
self.display._load_address_counter = Mock(wraps=self.display._load_address_counter)
|
||||
self.display._write = Mock(wraps=self.display._write)
|
||||
@ -305,11 +305,11 @@ class DisplayClearTestCase(unittest.TestCase):
|
||||
|
||||
class DisplayFlushRangeTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.interface = Mock()
|
||||
self.terminal = Mock()
|
||||
|
||||
dimensions = Dimensions(24, 80)
|
||||
|
||||
self.display = Display(self.interface, dimensions, None)
|
||||
self.display = Display(self.terminal, dimensions, None)
|
||||
|
||||
self.display._write = Mock(wraps=self.display._write)
|
||||
|
||||
@ -363,11 +363,11 @@ class DisplayFlushRangeTestCase(unittest.TestCase):
|
||||
|
||||
class DisplayLoadAddressCounterTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.interface = Mock()
|
||||
self.terminal = Mock()
|
||||
|
||||
dimensions = Dimensions(24, 80)
|
||||
|
||||
self.display = Display(self.interface, dimensions, None)
|
||||
self.display = Display(self.terminal, dimensions, None)
|
||||
|
||||
patcher = patch('oec.display.load_address_counter_hi')
|
||||
|
||||
@ -386,8 +386,8 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase):
|
||||
# Assert
|
||||
self.assertEqual(self.display.address_counter, 895)
|
||||
|
||||
self.load_address_counter_hi_mock.assert_called_with(self.interface, 3)
|
||||
self.load_address_counter_lo_mock.assert_called_with(self.interface, 127)
|
||||
self.load_address_counter_hi_mock.assert_called_with(self.terminal.interface, 3)
|
||||
self.load_address_counter_lo_mock.assert_called_with(self.terminal.interface, 127)
|
||||
|
||||
def test_hi_change(self):
|
||||
# Arrange
|
||||
@ -402,7 +402,7 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase):
|
||||
# Assert
|
||||
self.assertEqual(self.display.address_counter, 1151)
|
||||
|
||||
self.load_address_counter_hi_mock.assert_called_with(self.interface, 4)
|
||||
self.load_address_counter_hi_mock.assert_called_with(self.terminal.interface, 4)
|
||||
self.load_address_counter_lo_mock.assert_not_called()
|
||||
|
||||
def test_lo_change(self):
|
||||
@ -419,7 +419,7 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase):
|
||||
self.assertEqual(self.display.address_counter, 896)
|
||||
|
||||
self.load_address_counter_hi_mock.assert_not_called()
|
||||
self.load_address_counter_lo_mock.assert_called_with(self.interface, 128)
|
||||
self.load_address_counter_lo_mock.assert_called_with(self.terminal.interface, 128)
|
||||
|
||||
def test_hi_lo_change(self):
|
||||
# Arrange
|
||||
@ -434,8 +434,8 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase):
|
||||
# Assert
|
||||
self.assertEqual(self.display.address_counter, 1152)
|
||||
|
||||
self.load_address_counter_hi_mock.assert_called_with(self.interface, 4)
|
||||
self.load_address_counter_lo_mock.assert_called_with(self.interface, 128)
|
||||
self.load_address_counter_hi_mock.assert_called_with(self.terminal.interface, 4)
|
||||
self.load_address_counter_lo_mock.assert_called_with(self.terminal.interface, 128)
|
||||
|
||||
def test_no_change(self):
|
||||
# Arrange
|
||||
@ -466,16 +466,16 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase):
|
||||
# Assert
|
||||
self.assertEqual(self.display.address_counter, 80)
|
||||
|
||||
self.load_address_counter_hi_mock.assert_called_with(self.interface, 0)
|
||||
self.load_address_counter_lo_mock.assert_called_with(self.interface, 80)
|
||||
self.load_address_counter_hi_mock.assert_called_with(self.terminal.interface, 0)
|
||||
self.load_address_counter_lo_mock.assert_called_with(self.terminal.interface, 80)
|
||||
|
||||
class DisplayWriteTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.interface = Mock()
|
||||
self.terminal = Mock()
|
||||
|
||||
dimensions = Dimensions(24, 80)
|
||||
|
||||
self.display = Display(self.interface, dimensions, None)
|
||||
self.display = Display(self.terminal, dimensions, None)
|
||||
|
||||
self.display._load_address_counter = Mock(wraps=self.display._load_address_counter)
|
||||
|
||||
@ -504,7 +504,7 @@ class DisplayWriteTestCase(unittest.TestCase):
|
||||
# Assert
|
||||
self.assertIsNone(self.display.address_counter)
|
||||
|
||||
self.write_data_mock.assert_called_with(self.interface, bytes.fromhex('01 02 03'), jumbo_write_strategy=None)
|
||||
self.write_data_mock.assert_called_with(self.terminal.interface, bytes.fromhex('01 02 03'), jumbo_write_strategy=None)
|
||||
|
||||
def test_with_eab_data(self):
|
||||
# Arrange
|
||||
@ -516,7 +516,7 @@ class DisplayWriteTestCase(unittest.TestCase):
|
||||
# Assert
|
||||
self.assertIsNone(self.display.address_counter)
|
||||
|
||||
self.eab_write_alternate_mock.assert_called_with(self.interface, 7, bytes.fromhex('01 04 02 05 03 06'), jumbo_write_strategy=None)
|
||||
self.eab_write_alternate_mock.assert_called_with(self.terminal.interface, 7, bytes.fromhex('01 04 02 05 03 06'), jumbo_write_strategy=None)
|
||||
|
||||
def test_repeat_with_no_eab_data(self):
|
||||
# Act
|
||||
@ -525,7 +525,7 @@ class DisplayWriteTestCase(unittest.TestCase):
|
||||
# Assert
|
||||
self.assertIsNone(self.display.address_counter)
|
||||
|
||||
self.write_data_mock.assert_called_with(self.interface, (bytes.fromhex('01 02 03'), 3), jumbo_write_strategy=None)
|
||||
self.write_data_mock.assert_called_with(self.terminal.interface, (bytes.fromhex('01 02 03'), 3), jumbo_write_strategy=None)
|
||||
|
||||
def test_repeat_with_eab_data(self):
|
||||
# Arrange
|
||||
@ -537,7 +537,7 @@ class DisplayWriteTestCase(unittest.TestCase):
|
||||
# Assert
|
||||
self.assertIsNone(self.display.address_counter)
|
||||
|
||||
self.eab_write_alternate_mock.assert_called_with(self.interface, 7, (bytes.fromhex('01 04 02 05 03 06'), 3), jumbo_write_strategy=None)
|
||||
self.eab_write_alternate_mock.assert_called_with(self.terminal.interface, 7, (bytes.fromhex('01 04 02 05 03 06'), 3), jumbo_write_strategy=None)
|
||||
|
||||
def test_regen_eab_data_mismatch_format(self):
|
||||
# Arrange
|
||||
|
||||
@ -1,11 +1,173 @@
|
||||
import unittest
|
||||
from unittest.mock import Mock, patch
|
||||
from coax import Feature, PollAction
|
||||
from coax.protocol import TerminalId, TerminalType
|
||||
|
||||
import context
|
||||
|
||||
from oec.terminal import create_terminal, Terminal, UnsupportedTerminalError
|
||||
from oec.display import Dimensions
|
||||
from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2
|
||||
|
||||
class TerminalGetPollActionTestCase(unittest.TestCase):
|
||||
class TerminalSetupTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.interface = Mock()
|
||||
|
||||
self.terminal = Terminal(self.interface, TerminalId(0b11110100), 'c1348300', { }, KEYMAP_3278_2)
|
||||
terminal_id = TerminalId(0b11110100)
|
||||
extended_id = 'c1348300'
|
||||
dimensions = Dimensions(24, 80)
|
||||
features = { }
|
||||
keymap = KEYMAP_3278_2
|
||||
|
||||
self.terminal = Terminal(self.interface, terminal_id, extended_id, dimensions, features, keymap)
|
||||
|
||||
self.terminal.display = Mock()
|
||||
|
||||
patcher = patch('oec.terminal.load_control_register')
|
||||
|
||||
self.load_control_register_mock = patcher.start()
|
||||
|
||||
self.addCleanup(patch.stopall)
|
||||
|
||||
def test(self):
|
||||
self.terminal.setup()
|
||||
|
||||
class TerminalPollTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.interface = Mock()
|
||||
|
||||
terminal_id = TerminalId(0b11110100)
|
||||
extended_id = 'c1348300'
|
||||
dimensions = Dimensions(24, 80)
|
||||
features = { }
|
||||
keymap = KEYMAP_3278_2
|
||||
|
||||
self.terminal = Terminal(self.interface, terminal_id, extended_id, dimensions, features, keymap)
|
||||
|
||||
patcher = patch('oec.terminal.poll')
|
||||
|
||||
self.poll_mock = patcher.start()
|
||||
|
||||
self.addCleanup(patch.stopall)
|
||||
|
||||
# The terminal will be initialized in a state where the terminal keyboard clicker
|
||||
# state is unknown, and this cannot be read. Therefore the first POLL will always
|
||||
# attempt to set the keyboard clicker state...
|
||||
self.terminal.poll()
|
||||
|
||||
self.poll_mock.reset_mock()
|
||||
|
||||
def test_with_no_queued_actions(self):
|
||||
# Act
|
||||
self.terminal.poll()
|
||||
|
||||
# Assert
|
||||
self.poll_mock.assert_called_with(self.interface, PollAction.NONE)
|
||||
|
||||
def test_with_sound_alarm_queued(self):
|
||||
# Arrange
|
||||
self.terminal.sound_alarm()
|
||||
|
||||
# Act
|
||||
self.terminal.poll()
|
||||
|
||||
# Assert
|
||||
self.poll_mock.assert_called_with(self.interface, PollAction.ALARM)
|
||||
|
||||
def test_with_enable_keyboard_clicker_queued(self):
|
||||
# Arrange
|
||||
self.assertFalse(self.terminal.keyboard.clicker)
|
||||
|
||||
self.terminal.keyboard.toggle_clicker()
|
||||
|
||||
# Act
|
||||
self.terminal.poll()
|
||||
|
||||
# Assert
|
||||
self.poll_mock.assert_called_with(self.interface, PollAction.ENABLE_KEYBOARD_CLICKER)
|
||||
|
||||
class CreateTerminalTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.interface = Mock()
|
||||
|
||||
self.interface.legacy_firmware_detected = False
|
||||
|
||||
self.get_keymap = lambda terminal_id, extended_id: KEYMAP_3278_2
|
||||
|
||||
patcher = patch('oec.terminal.read_terminal_id')
|
||||
|
||||
self.read_terminal_id_mock = patcher.start()
|
||||
|
||||
patcher = patch('oec.terminal.read_extended_id')
|
||||
|
||||
self.read_extended_id_mock = patcher.start()
|
||||
|
||||
patcher = patch('oec.terminal.get_features')
|
||||
|
||||
self.get_features_mock = patcher.start()
|
||||
|
||||
self.addCleanup(patch.stopall)
|
||||
|
||||
def test_supported_terminal(self):
|
||||
# Arrange
|
||||
self.read_terminal_id_mock.return_value = TerminalId(0b11110100)
|
||||
self.read_extended_id_mock.return_value = bytes.fromhex('c1 34 83 00')
|
||||
self.get_features_mock.return_value = { Feature.EAB: 7 }
|
||||
|
||||
# Act
|
||||
terminal = create_terminal(self.interface, None, self.get_keymap)
|
||||
|
||||
# Assert
|
||||
self.assertEqual(terminal.terminal_id.type, TerminalType.CUT)
|
||||
self.assertEqual(terminal.terminal_id.model, 2)
|
||||
self.assertEqual(terminal.terminal_id.keyboard, 15)
|
||||
self.assertEqual(terminal.extended_id, 'c1348300')
|
||||
self.assertEqual(terminal.display.dimensions, Dimensions(24, 80))
|
||||
self.assertEqual(terminal.features, { Feature.EAB: 7 })
|
||||
self.assertEqual(terminal.keyboard.keymap.name, '3278-2')
|
||||
|
||||
def test_unsupported_terminal_type(self):
|
||||
# Arrange
|
||||
self.read_terminal_id_mock.return_value = TerminalId(0b00000001)
|
||||
|
||||
# Act and assert
|
||||
with self.assertRaises(UnsupportedTerminalError):
|
||||
create_terminal(self.interface, None, self.get_keymap)
|
||||
|
||||
def test_unsupported_terminal_model(self):
|
||||
# Arrange
|
||||
terminal_id = TerminalId(0b11110100)
|
||||
|
||||
terminal_id.model = 1
|
||||
|
||||
self.read_terminal_id_mock.return_value = terminal_id
|
||||
|
||||
# Act and assert
|
||||
with self.assertRaises(UnsupportedTerminalError):
|
||||
create_terminal(self.interface, None, self.get_keymap)
|
||||
|
||||
def test_eab_feature_removed_on_legacy_interface_without_strategy(self):
|
||||
# Arrange
|
||||
self.interface.legacy_firmware_detected = True
|
||||
|
||||
self.read_terminal_id_mock.return_value = TerminalId(0b11110100)
|
||||
self.read_extended_id_mock.return_value = bytes.fromhex('c1 34 83 00')
|
||||
self.get_features_mock.return_value = { Feature.EAB: 7 }
|
||||
|
||||
patcher = patch('oec.terminal._print_no_i1_eab_notice')
|
||||
|
||||
print_no_i1_eab_notice_mock = patcher.start()
|
||||
|
||||
# Act
|
||||
terminal = create_terminal(self.interface, None, self.get_keymap)
|
||||
|
||||
# Assert
|
||||
self.assertEqual(terminal.terminal_id.type, TerminalType.CUT)
|
||||
self.assertEqual(terminal.terminal_id.model, 2)
|
||||
self.assertEqual(terminal.terminal_id.keyboard, 15)
|
||||
self.assertEqual(terminal.extended_id, 'c1348300')
|
||||
self.assertEqual(terminal.display.dimensions, Dimensions(24, 80))
|
||||
self.assertEqual(terminal.features, { })
|
||||
self.assertEqual(terminal.keyboard.keymap.name, '3278-2')
|
||||
|
||||
print_no_i1_eab_notice_mock.assert_called_once()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user