From 6c92d95e6f77f1bd011f328f4380a454f7c72067 Mon Sep 17 00:00:00 2001 From: Andrew Kay Date: Sun, 21 Nov 2021 11:34:19 -0600 Subject: [PATCH] Add 3299 multiplexer support --- README.md | 7 +- oec/__main__.py | 8 +- oec/controller.py | 324 +++++++++++++++++--------- oec/device.py | 18 +- oec/interface.py | 8 +- oec/session.py | 3 + oec/terminal.py | 20 +- oec/tn3270.py | 3 +- oec/vt100.py | 20 +- requirements.txt | 2 +- tests/mock_interface.py | 6 +- tests/test_controller.py | 491 +++++++++++++++++++++++---------------- tests/test_device.py | 10 +- tests/test_terminal.py | 31 +-- tests/test_vt100.py | 4 + 15 files changed, 573 insertions(+), 382 deletions(-) diff --git a/README.md b/README.md index f87fa48..d763178 100644 --- a/README.md +++ b/README.md @@ -22,12 +22,15 @@ emulation. - [x] Basic TN3270E - [ ] SSL/TLS - [x] VT100 - - [ ] Connection Menu + - [ ] Connection menu - [ ] MLT (Multiple Logical Terminals) + - [x] Up to 8 terminals connected via IBM 3299 multiplexer1 + +1 - requires [interface2](https://github.com/lowobservable/coax/tree/master/interface2#readme) with recent firmware ## Supported Terminals -Only directly attached CUT (Control Unit Terminal) type terminals are supported. I have tested oec with the following terminals: +Only CUT (Control Unit Terminal) type terminals are supported. I have tested oec with the following terminals: * IBM 3278-2 * IBM 3472 diff --git a/oec/__main__.py b/oec/__main__.py index 8c697e3..1c98b94 100644 --- a/oec/__main__.py +++ b/oec/__main__.py @@ -23,7 +23,7 @@ from .keymap_3483 import KEYMAP as KEYMAP_3483 logging.basicConfig(level=logging.INFO) -logger = logging.getLogger('oec') +logger = logging.getLogger('oec.main') CONTROLLER = None @@ -76,7 +76,7 @@ def _create_session(args, device): def _signal_handler(number, frame): global CONTROLLER - print('Stopping controller...') + logger.info('Stopping controller...') if CONTROLLER: CONTROLLER.stop() @@ -115,11 +115,11 @@ def main(): create_device = lambda interface, device_address, poll_response: _create_device(args, interface, device_address, poll_response) create_session = lambda device: _create_session(args, device) + logger.info('Starting controller...') + with open_serial_interface(args.serial_port) as interface: CONTROLLER = Controller(InterfaceWrapper(interface), create_device, create_session) - print('Starting controller...') - CONTROLLER.run() if __name__ == '__main__': diff --git a/oec/controller.py b/oec/controller.py index 6b92ed5..00f104f 100644 --- a/oec/controller.py +++ b/oec/controller.py @@ -3,15 +3,27 @@ oec.controller ~~~~~~~~~~~~~~ """ +from enum import Enum import time import logging import selectors -from coax import Poll, PollAck, KeystrokePollResponse, ReceiveTimeout +from concurrent import futures +from itertools import groupby +from coax import InterfaceFeature, Poll, PollAck, KeystrokePollResponse, \ + ReceiveTimeout, ReceiveError, ProtocolError +from coax.multiplexer import PORT_MAP_3299 from .device import address_commands, format_address, UnsupportedDeviceError from .keyboard import Key from .session import SessionDisconnectedError +class SessionState(Enum): + """Session state.""" + + STARTING = 1 + ACTIVE = 2 + TERMINATING = 3 + class Controller: """The controller.""" @@ -24,148 +36,239 @@ class Controller: self.create_device = create_device self.create_session = create_session - self.device = None + self.devices = { } + self.detatched_device_poll_queue = [] - self.session = None + self.sessions = { } self.session_selector = None + self.session_executor = None # Target time between POLL commands in seconds when a device is attached or # no device is attached. - # - # The attached poll period only applies in cases where the device responded - # with TT/AR to the last poll - this is an effort to improve the keystroke - # responsiveness. - self.attached_poll_period = 1 / 10 - self.detatached_poll_period = 5 + self.attached_poll_period = 1 / 15 + self.detatched_poll_period = 1 / 2 - self.last_poll_time = None - self.last_poll_response = None + # Maximum number of POLL commands to execute, per attached device, per run + # loop iteration. If all attached devices respond with TT/AR the run loop + # iteration will exit without reaching this maximum depth. + # + # This is an effort to improve the keystroke responsiveness. + self.poll_depth = 3 + + self.last_attached_poll_time = None + self.last_detatched_poll_time = None def run(self): """Run the controller.""" self.running = True self.session_selector = selectors.DefaultSelector() + self.session_executor = futures.ThreadPoolExecutor() + + self.logger.info('Controller started') while self.running: self._run_loop() - self._terminate_session() + self.session_executor.shutdown(wait=True) + + self.session_executor = None + + for session in [session for (state, session) in self.sessions.values() if state == SessionState.ACTIVE]: + self._terminate_session(session, blocking=True) self.session_selector.close() self.session_selector = None - if self.device: - self.device = None + self.sessions.clear() + + self.devices.clear() + self.detatched_device_poll_queue.clear() + + self.logger.info('Controller stopped') def stop(self): """Stop the controller.""" self.running = False def _run_loop(self): - poll_delay = self._calculate_poll_delay(time.perf_counter()) + poll_delay = self._calculate_poll_delay() # If POLLing is delayed, handle the host output, otherwise just sleep. + start_time = time.perf_counter() + if poll_delay > 0: - if self.session: - self._update_session(poll_delay) - else: - time.sleep(poll_delay) + self._update_sessions(poll_delay) + + poll_delay -= (time.perf_counter() - start_time) + + if poll_delay > 0: + time.sleep(poll_delay) # POLL devices. - self._poll_attached_device() - self._poll_detatched_device() + self._poll_attached_devices() + self._poll_next_detatched_device() - def _update_session(self, duration): - try: - update_count = 0 + def _update_sessions(self, duration): + start_time = time.perf_counter() - while duration > 0: - start_time = time.perf_counter() + # Start any missing sessions. + for device_address in self.devices.keys() - self.sessions.keys(): + self._start_session(self.devices[device_address]) - selected = self.session_selector.select(duration) + sessions = { state: [(device_address, session) for (device_address, (_, session)) in group] for (state, group) in groupby(self.sessions.items(), lambda item: item[1][0]) } - if not selected: - break + # Handle started sessions. + for (device_address, future) in sessions.get(SessionState.STARTING, []): + if future.done(): + session = future.result() - for (key, _) in selected: - session = key.fileobj + self.sessions[device_address] = (SessionState.ACTIVE, session) + self.session_selector.register(session, selectors.EVENT_READ) + + self.logger.info(f'Session started for device @ {format_address(self.interface, device_address)}') + + # Handle terminated sessions. + for (device_address, future) in sessions.get(SessionState.TERMINATING, []): + if future.done(): + del self.sessions[device_address] + + self.logger.info(f'Session terminated for device @ {format_address(self.interface, device_address)}') + + # Update the duration based on the time taken handling futures. + duration -= (time.perf_counter() - start_time) + + # Update active sessions. + updated_sessions = set() + + while duration > 0: + start_time = time.perf_counter() + + selected = self.session_selector.select(duration) + + if not selected: + break + + for (key, _) in selected: + session = key.fileobj + + try: if session.handle_host(): - update_count += 1 + updated_sessions.add(session) + except SessionDisconnectedError: + updated_sessions.discard(session) - duration -= (time.perf_counter() - start_time) + self._handle_session_disconnected(session) - if update_count > 0: - self.session.render() - except SessionDisconnectedError: - self._handle_session_disconnected() + duration -= (time.perf_counter() - start_time) - def _start_session(self): - self.session = self.create_session(self.device) + for session in updated_sessions: + session.render() - self.session.start() + def _start_session(self, device): + device_address = device.device_address - self.session_selector.register(self.session, selectors.EVENT_READ) + self.logger.info(f'Starting session for device @ {format_address(self.interface, device_address)}') - def _terminate_session(self): - if not self.session: - return + def start_session(): + session = self.create_session(device) - self.session_selector.unregister(self.session) + session.start() - self.session.terminate() + return session - self.session = None + future = self.session_executor.submit(start_session) - def _handle_session_disconnected(self): + self.sessions[device_address] = (SessionState.STARTING, future) + + def _terminate_session(self, session, blocking=False): + device_address = session.terminal.device_address + + self.logger.info(f'Terminating session for device @ {format_address(self.interface, device_address)}') + + self.session_selector.unregister(session) + + def terminate_session(): + session.terminate() + + if blocking: + terminate_session() + + del self.sessions[device_address] + else: + future = self.session_executor.submit(terminate_session) + + self.sessions[device_address] = (SessionState.TERMINATING, future) + + def _handle_session_disconnected(self, session): self.logger.info('Session disconnected') - self._terminate_session() + self._terminate_session(session) - # Restart the session. - self._start_session() + def _poll_attached_devices(self): + self.last_attached_poll_time = time.perf_counter() - def _poll_attached_device(self): - if not self.device: + for _ in range(self.poll_depth): + devices = self.devices.values() + + if not devices: + break + + poll_commands = [address_commands(device.device_address, Poll(device.get_poll_action())) for device in devices] + + poll_responses = list(zip(devices, self.interface.execute(poll_commands, receive_timeout_is_error=False))) + + # Handle POLL responses. + handleable_poll_responses = [pair for pair in poll_responses if pair[1] is not None and not isinstance(pair[1], ReceiveTimeout)] + + if handleable_poll_responses: + poll_ack_commands = [address_commands(device.device_address, PollAck()) for (device, _) in handleable_poll_responses] + + self.interface.execute(poll_ack_commands) + + for (device, poll_response) in handleable_poll_responses: + self._handle_poll_response(device, poll_response) + + # Handle lost devices. + for (device, poll_response) in poll_responses: + if isinstance(poll_response, ReceiveTimeout): + self._handle_device_lost(device) + + if not handleable_poll_responses: + break + + def _poll_next_detatched_device(self): + if self.last_detatched_poll_time is not None and (time.perf_counter() - self.last_detatched_poll_time) < self.detatched_poll_period: return - self.last_poll_time = time.perf_counter() + self.last_detatched_poll_time = time.perf_counter() + + if not self.detatched_device_poll_queue: + self.detatched_device_poll_queue = list(self._get_detatched_device_addresses()) try: - poll_response = self.device.poll() + device_address = self.detatched_device_poll_queue.pop(0) + except IndexError: + return + + try: + poll_response = self.interface.execute(address_commands(device_address, Poll())) except ReceiveTimeout: - self._handle_device_lost() + return + except ReceiveError as error: + self.logger.warning(f'POLL detatched device @ {format_address(self.interface, device_address)} receive error: {error}') + return + except ProtocolError as error: + self.logger.warning(f'POLL detatched device @ {format_address(self.interface, device_address)} protocol error: {error}') return if poll_response: - self._poll_ack(self.device.device_address) - - self._handle_poll_response(poll_response) - - self.last_poll_response = poll_response - - def _poll_detatched_device(self): - if self.device: - return - - self.last_poll_time = time.perf_counter() - - device_address = None - - try: - poll_response = self._poll(device_address) - except ReceiveTimeout: - return - - if poll_response: - self._poll_ack(device_address) + self.interface.execute(address_commands(device_address, PollAck())) self._handle_device_found(device_address, poll_response) - self.last_poll_response = poll_response - def _handle_device_found(self, device_address, poll_response): self.logger.info(f'Found device @ {format_address(self.interface, device_address)}') @@ -177,29 +280,31 @@ class Controller: device.setup() - self.device = device + self.devices[device_address] = device self.logger.info(f'Attached device @ {format_address(self.interface, device_address)}') - self._start_session() - - def _handle_device_lost(self): - device_address = self.device.device_address + def _handle_device_lost(self, device): + device_address = device.device_address self.logger.info(f'Lost device @ {format_address(self.interface, device_address)}') - self._terminate_session() + if device_address in self.sessions: + (session_state, session) = self.sessions[device_address] - self.device = None + if session_state == SessionState.ACTIVE: + self._terminate_session(session) + + del self.devices[device_address] self.logger.info(f'Detached device @ {format_address(self.interface, device_address)}') - def _handle_poll_response(self, poll_response): + def _handle_poll_response(self, device, poll_response): if isinstance(poll_response, KeystrokePollResponse): - self._handle_keystroke_poll_response(poll_response) + self._handle_keystroke_poll_response(device, poll_response) - def _handle_keystroke_poll_response(self, poll_response): - terminal = self.device + def _handle_keystroke_poll_response(self, terminal, poll_response): + device_address = terminal.device_address scan_code = poll_response.scan_code (key, modifiers, modifiers_changed) = terminal.keyboard.get_key(scan_code) @@ -221,27 +326,34 @@ class Controller: terminal.display.toggle_cursor_reverse() elif key == Key.CLICKER: terminal.keyboard.toggle_clicker() - elif self.session: - self.session.handle_key(key, modifiers, scan_code) + elif device_address in self.sessions: + (session_state, session) = self.sessions[device_address] - self.session.render() + if session_state == SessionState.ACTIVE: + session.handle_key(key, modifiers, scan_code) - def _poll(self, device_address): - return self.interface.execute(address_commands(device_address, Poll())) + session.render() - def _poll_ack(self, device_address): - self.interface.execute(address_commands(device_address, PollAck())) - - def _calculate_poll_delay(self, current_time): - if self.last_poll_response is not None: + def _calculate_poll_delay(self): + if self.last_attached_poll_time is None: return 0 - if self.last_poll_time is None: - return 0 + return max((self.last_attached_poll_time + self.attached_poll_period) - time.perf_counter(), 0) - if self.device: - period = self.attached_poll_period + def _get_detatched_device_addresses(self): + attached_addresses = set(self.devices.keys()) + + # The 3299 is transparent, but if there is at least one device attached to a 3299 + # port then we can assume there is a 3299 attached and if there is one device + # direct attached then we can assume there is not a 3299 attached. + is_3299_attached = any(attached_addresses.difference([None])) + is_3299_not_attached = (None in attached_addresses) + + if is_3299_not_attached or InterfaceFeature.PROTOCOL_3299 not in self.interface.features: + addresses = [None] + elif is_3299_attached: + addresses = PORT_MAP_3299 else: - period = self.detatached_poll_period + addresses = [None, *PORT_MAP_3299] - return max((self.last_poll_time + period) - current_time, 0) + return filter(lambda address: address not in attached_addresses, addresses) diff --git a/oec/device.py b/oec/device.py index cd3a437..79c944c 100644 --- a/oec/device.py +++ b/oec/device.py @@ -9,6 +9,7 @@ import logging from more_itertools import chunked from coax import read_feature_ids, parse_features, ReadTerminalId, ReadExtendedId, \ Feature, ProtocolError +from coax.multiplexer import PORT_MAP_3299 logger = logging.getLogger(__name__) @@ -23,8 +24,8 @@ class Device: """Setup the device.""" raise NotImplementedError - def poll(self): - """POLL the device.""" + def get_poll_action(self): + """Get the POLL action.""" raise NotImplementedError def execute(self, commands): @@ -67,9 +68,12 @@ def address_commands(device_address, commands): def format_address(interface, device_address): """Format a device address.""" if device_address is None: - return interface.identifier + return f'{interface.identifier}#0' - raise NotImplementedError + try: + return f'{interface.identifier}#{PORT_MAP_3299.index(device_address)}' + except ValueError: + return f'{interface.identifier}?{device_address:06b}' def get_ids(interface, device_address, extended_id_retry_attempts=3): terminal_id = None @@ -78,7 +82,7 @@ def get_ids(interface, device_address, extended_id_retry_attempts=3): try: terminal_id = interface.execute(address_commands(device_address, ReadTerminalId())) except ProtocolError as error: - logger.warning(f'READ_TERMINAL_ID protocol error: {error}', exc_info=error) + logger.warning(f'READ_TERMINAL_ID protocol error: {error}') # Retry the READ_EXTENDED_ID command as it appears to fail frequently on the # first request - unlike the READ_TERMINAL_ID command, @@ -90,9 +94,9 @@ def get_ids(interface, device_address, extended_id_retry_attempts=3): break except ProtocolError as error: - logger.warning(f'READ_EXTENDED_ID protocol error: {error}', exc_info=error) + logger.warning(f'READ_EXTENDED_ID protocol error: {error}') - time.sleep(0.25) + time.sleep(0.1) return (terminal_id, extended_id.hex() if extended_id is not None else None) diff --git a/oec/interface.py b/oec/interface.py index ffdbe36..b9b9161 100644 --- a/oec/interface.py +++ b/oec/interface.py @@ -7,6 +7,8 @@ import os import logging from textwrap import dedent +from coax import ReceiveTimeout + logger = logging.getLogger(__name__) class AggregateExecuteError(Exception): @@ -20,7 +22,7 @@ class InterfaceWrapper: def __init__(self, interface): self.interface = interface - self.timeout = 0.1 + self.timeout = 0.001 self.jumbo_write_strategy = _get_jumbo_write_strategy() self.jumbo_write_max_length = None @@ -37,13 +39,13 @@ class InterfaceWrapper: return getattr(self.interface, attr) - def execute(self, commands): + def execute(self, commands, receive_timeout_is_error=True): if not isinstance(commands, list): return self.interface.execute(commands, self.timeout) responses = self.interface.execute(commands, self.timeout) - errors = [response for response in responses if isinstance(response, BaseException)] + errors = [response for response in responses if isinstance(response, BaseException) and (receive_timeout_is_error or not isinstance(response, ReceiveTimeout))] if any(errors): raise AggregateExecuteError(errors, responses) diff --git a/oec/session.py b/oec/session.py index fe00501..a876f32 100644 --- a/oec/session.py +++ b/oec/session.py @@ -1,4 +1,7 @@ class Session: + def __init__(self, terminal): + self.terminal = terminal + def start(self): raise NotImplementedError diff --git a/oec/terminal.py b/oec/terminal.py index 1928944..b9c540a 100644 --- a/oec/terminal.py +++ b/oec/terminal.py @@ -3,7 +3,7 @@ oec.terminal ~~~~~~~~~~~~ """ -from coax import Poll, LoadControlRegister, Feature, PollAction, Control +from coax import LoadControlRegister, Feature, PollAction, Control from .device import Device, UnsupportedDeviceError from .display import Dimensions, BufferedDisplay @@ -53,30 +53,26 @@ class Terminal(Device): # Show the attached indicator on the status line. self.display.status_line.write_string(0, 'OEC') - def poll(self): - """Execute a POLL command with queued actions.""" + self.display.move_cursor(row=0, column=0) + + def get_poll_action(self): + """Get the POLL action.""" poll_action = PollAction.NONE # Convert a queued alarm or keyboard clicker change to POLL action. if self.alarm: poll_action = PollAction.ALARM + + self.alarm = False elif self.keyboard.clicker != self.last_poll_keyboard_clicker: if self.keyboard.clicker: poll_action = PollAction.ENABLE_KEYBOARD_CLICKER else: poll_action = PollAction.DISABLE_KEYBOARD_CLICKER - poll_response = self.execute(Poll(poll_action)) - - # Clear the queued alarm and keyboard clicker change if the POLL was - # successful. - if poll_action == PollAction.ALARM: - self.alarm = False - elif poll_action in [PollAction.ENABLE_KEYBOARD_CLICKER, - PollAction.DISABLE_KEYBOARD_CLICKER]: self.last_poll_keyboard_clicker = self.keyboard.clicker - return poll_response + return poll_action def sound_alarm(self): """Queue an alarm on next POLL command.""" diff --git a/oec/tn3270.py b/oec/tn3270.py index 4fc8ccc..ee6116f 100644 --- a/oec/tn3270.py +++ b/oec/tn3270.py @@ -48,9 +48,10 @@ class TN3270Session(Session): """TN3270 session.""" def __init__(self, terminal, host, port): + super().__init__(terminal) + self.logger = logging.getLogger(__name__) - self.terminal = terminal self.host = host self.port = port diff --git a/oec/vt100.py b/oec/vt100.py index 6c93ebc..01c577e 100644 --- a/oec/vt100.py +++ b/oec/vt100.py @@ -71,9 +71,10 @@ class VT100Session(Session): """VT100 session.""" def __init__(self, terminal, host_command): + super().__init__(terminal) + self.logger = logging.getLogger(__name__) - self.terminal = terminal self.host_command = host_command self.host_process = None @@ -94,19 +95,11 @@ class VT100Session(Session): self.vt100_stream = pyte.ByteStream(self.vt100_screen) + self.is_first_render = True + def start(self): - # Start the host process. self._start_host_process() - # Clear the screen. - self.terminal.display.clear() - - # Update the status line. - self.terminal.display.status_line.write_string(45, 'VT100') - - # Reset the cursor. - self.terminal.display.move_cursor(row=0, column=0) - def terminate(self): if self.host_process: self._terminate_host_process() @@ -137,6 +130,11 @@ class VT100Session(Session): self.host_process.write(bytes_) def render(self): + if self.is_first_render: + self.terminal.display.status_line.write_string(45, 'VT100') + + self.is_first_render = False + self._apply() self._flush() diff --git a/requirements.txt b/requirements.txt index 8f29b43..550f33c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ more-itertools==8.10.0 ptyprocess==0.7.0 -pycoax==0.10.0 +pycoax==0.11.0 pyserial==3.5 pyte==0.8.0 pytn3270==0.12.0 diff --git a/tests/mock_interface.py b/tests/mock_interface.py index 178e9f8..35470dc 100644 --- a/tests/mock_interface.py +++ b/tests/mock_interface.py @@ -1,10 +1,12 @@ -from unittest.mock import Mock +from unittest.mock import Mock, ANY from coax import ProtocolError, ReceiveError, ReceiveTimeout from coax.interface import Interface class MockInterface(Interface): def __init__(self, responses=[]): + super().__init__() + self.mock_responses = responses self.serial = Mock(port='/dev/mock') @@ -40,7 +42,7 @@ class MockInterface(Interface): for command in call[0][0]: (call_device_address, call_command) = command - if call_device_address == device_address and isinstance(call_command, command_type): + if (device_address == ANY or call_device_address == device_address) and isinstance(call_command, command_type): if predicate is None or predicate(call_command): commands.append(command) diff --git a/tests/test_controller.py b/tests/test_controller.py index dcef935..776b602 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -1,265 +1,344 @@ import unittest -from unittest.mock import Mock, create_autospec, patch +from unittest.mock import Mock, create_autospec, patch, call, ANY import selectors from selectors import BaseSelector -from logging import Logger -from coax import Poll, PowerOnResetCompletePollResponse, KeystrokePollResponse, PollAck, ReceiveTimeout -from coax.protocol import TerminalId +from concurrent.futures import Future +from coax import Poll, PollAck, PowerOnResetCompletePollResponse, KeystrokePollResponse, ReceiveTimeout import context from oec.interface import InterfaceWrapper -from oec.controller import Controller -from oec.device import UnsupportedDeviceError +from oec.controller import Controller, SessionState from oec.terminal import Terminal -from oec.keyboard import KeyboardModifiers, Key -from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2 from oec.session import Session, SessionDisconnectedError from mock_interface import MockInterface -class RunLoopTestCase(unittest.TestCase): +class UpdateSessionsTestCase(unittest.TestCase): def setUp(self): self.interface = MockInterface() - self.terminal = Terminal(self.interface, None, TerminalId(0b11110100), 'c1348300', { }, KEYMAP_3278_2) - - self.terminal.setup = Mock(Terminal, instance=True) - - self.terminal.display.write = Mock() - self.terminal.display.toggle_cursor_blink = Mock() - self.terminal.display.toggle_cursor_reverse = Mock() - - self.terminal.keyboard.toggle_clicker = Mock() - - self.create_device = Mock(return_value=self.terminal) - - self.session = create_autospec(Session, instance=True) - self.create_session = Mock(return_value=self.session) - - self.controller = Controller(InterfaceWrapper(self.interface), self.create_device, self.create_session) - - self.controller.logger = create_autospec(Logger, instance=True) - - self.controller.attached_poll_period = 1 + self.controller = Controller(InterfaceWrapper(self.interface), None, None) self.controller.session_selector = create_autospec(BaseSelector, instance=True) - self.controller._update_session = Mock(wraps=self.controller._update_session) - patcher = patch('oec.controller.time.perf_counter') self.perf_counter = patcher.start() - patcher = patch('oec.controller.time.sleep') - - self.sleep = patcher.start() - self.addCleanup(patch.stopall) - def test_no_device(self): - self._assert_run_loop(0, ReceiveTimeout, False, 0, False) - self._assert_run_loop(1, ReceiveTimeout, False, 4, False) - - self.assertIsNone(self.controller.device) - self.assertIsNone(self.controller.session) - - def test_device_attached(self): - self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) - self._assert_run_loop(0, None, False, 0, False) - self._assert_run_loop(0.5, None, True, 0.5, False) - - self.assertIsNotNone(self.controller.device) - self.assertIsNotNone(self.controller.session) - - self.controller._update_session.assert_called() - - def test_unsupported_device_attached(self): - self.create_device.side_effect = [UnsupportedDeviceError] - - self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) - - self.assertIsNone(self.controller.device) - self.assertIsNone(self.controller.session) - - def test_keystroke(self): - self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) - self._assert_run_loop(0, KeystrokePollResponse(0b0110000010), False, 0, True) - self._assert_run_loop(0, None, False, 0, False) - self._assert_run_loop(0.5, None, True, 0.5, False) - - self.assertIsNotNone(self.controller.device) - self.assertIsNotNone(self.controller.session) - - self.controller.session.handle_key.assert_called_with(Key.LOWER_A, KeyboardModifiers.NONE, 96) - - def test_device_detatched(self): - self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) - self._assert_run_loop(0, None, False, 0, False) - self._assert_run_loop(0.5, ReceiveTimeout, True, 0.5, False) - - self.assertIsNone(self.controller.device) - self.assertIsNone(self.controller.session) - - self.session.terminate.assert_called() - - def test_session_disconnected(self): - selector_key = Mock(fileobj=self.session) - - self.controller.session_selector.select.return_value = [(selector_key, selectors.EVENT_READ)] - - self.session.handle_host = Mock(side_effect=[None, SessionDisconnectedError]) - - self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) - self._assert_run_loop(0, None, False, 0, False) - self._assert_run_loop(0.5, None, True, 0.5, False) - self._assert_run_loop(1.5, None, True, 3, False) - - self.assertIsNotNone(self.controller.device) - self.assertIsNotNone(self.controller.session) - - self.assertEqual(self.create_session.call_count, 2) - - def test_toggle_cursor_blink(self): - self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) - - self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True) - - self.terminal.display.toggle_cursor_blink.assert_called_once() - - self.terminal.display.toggle_cursor_blink.reset_mock() - - self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True) - - self.terminal.display.toggle_cursor_blink.assert_called_once() - - def test_toggle_cursor_reverse(self): - self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) - - self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True) - self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True) - self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True) - - self.terminal.display.toggle_cursor_reverse.assert_called_once() - - self.terminal.display.toggle_cursor_reverse.reset_mock() - - self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True) - self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True) - self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True) - - self.terminal.display.toggle_cursor_reverse.assert_called_once() - - def test_toggle_clicker(self): - self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True) - - self._assert_run_loop(0, KeystrokePollResponse(0b0101011110), False, 0, True) - self._assert_run_loop(0, None, False, 0, False) - - self.terminal.keyboard.toggle_clicker.assert_called_once() - - self.terminal.keyboard.toggle_clicker.reset_mock() - - self._assert_run_loop(0.5, KeystrokePollResponse(0b0101011110), True, 0.5, True) - self._assert_run_loop(1, None, False, 0, False) - - self.terminal.keyboard.toggle_clicker.assert_called_once() - - def _assert_run_loop(self, poll_time, poll_response, expected_update_session, expected_poll_delay, expected_poll_ack): + def test_no_sessions(self): # Arrange - self.controller._update_session.reset_mock() - self.interface.mock_responses = [(None, Poll, None, poll_response)] + self.perf_counter.side_effect = [0, 0.1, 0.2] - self.interface.reset_mock() - - def perf_counter(): - nonlocal poll_time - - time = poll_time - - poll_time += expected_update_session - - return time - - self.perf_counter.side_effect = perf_counter - - self.sleep.reset_mock() + self.controller.session_selector.select.return_value = [] # Act - self.controller._run_loop() + self.assertFalse(self.controller._update_sessions(1.0)) + + def test_missing_sessions_are_started(self): + # Arrange + device = create_autospec(Terminal, instance=True) + + self.controller.devices[None] = device + + self.controller._start_session = Mock() + + self.perf_counter.side_effect = [0, 0.1, 0.2] + + self.controller.session_selector.select.return_value = [] + + # Act + self.controller._update_sessions(1.0) # Assert - if expected_update_session: - self.controller._update_session.assert_called_once_with(expected_poll_delay) - self.sleep.assert_not_called() - else: - self.controller._update_session.assert_not_called() + self.controller._start_session.assert_called_once_with(device) - if expected_poll_delay > 0: - self.sleep.assert_called_once_with(expected_poll_delay) - else: - self.sleep.assert_not_called() + def test_started_sessions_are_activated(self): + # Arrange + device = create_autospec(Terminal, instance=True) - if expected_poll_ack: - self.interface.assert_command_executed(None, PollAck) - else: - self.interface.assert_command_not_executed(None, PollAck) + session = create_autospec(Session, instance=True) -class UpdateSessionTestCase(unittest.TestCase): + future = create_autospec(Future, instance=True) + + future.done = Mock(return_value=True) + future.result = Mock(return_value=session) + + self.controller.devices[None] = device + self.controller.sessions[None] = (SessionState.STARTING, future) + + self.controller.session_selector.select.return_value = [] + + self.perf_counter.side_effect = [0, 0.1, 0.2] + + # Act + self.controller._update_sessions(1.0) + + # Assert + self.assertEqual(self.controller.sessions, { None: (SessionState.ACTIVE, session) }) + + self.controller.session_selector.register.assert_called_once_with(session, selectors.EVENT_READ) + + def test_terminated_sessions_are_removed(self): + # Arrange + device = create_autospec(Terminal, instance=True) + + future = create_autospec(Future, instance=True) + + future.done = Mock(return_value=True) + future.result = Mock() + + self.controller.devices[None] = device + self.controller.sessions[None] = (SessionState.TERMINATING, future) + + self.controller.session_selector.select.return_value = [] + + self.perf_counter.side_effect = [0, 0.1, 0.2] + + # Act + self.controller._update_sessions(1.0) + + # Assert + self.assertEqual(self.controller.sessions, { }) + + def test_active_sessions_select_timeout(self): + # Arrange + device = create_autospec(Terminal, instance=True) + + session = create_autospec(Session, instance=True) + + self.controller.devices[None] = device + self.controller.sessions[None] = (SessionState.ACTIVE, session) + + self.controller.session_selector.select.return_value = [] + + self.perf_counter.side_effect = [0, 0.1, 0.2] + + # Act + self.controller._update_sessions(1.0) + + # Assert + self.controller.session_selector.select.assert_called_once_with(0.9) + + session.handle_host.assert_not_called() + session.render.assert_not_called() + + def test_active_sessions_select_available(self): + # Arrange + device = create_autospec(Terminal, instance=True) + + session = create_autospec(Session, instance=True) + + self.controller.devices[None] = device + self.controller.sessions[None] = (SessionState.ACTIVE, session) + + selector_key = Mock(fileobj=session) + + self.controller.session_selector.select.side_effect = [[(selector_key, selectors.EVENT_READ)], []] + + self.perf_counter.side_effect = [0, 0.1, 0.2, 0.3, 0.4, 0.4] + + # Act + self.controller._update_sessions(1.0) + + # Assert + self.controller.session_selector.select.assert_has_calls([call(0.9), call(0.8)]) + + session.handle_host.assert_called_once() + session.render.assert_called_once() + + def test_active_sessions_disconnected(self): + # Arrange + device = create_autospec(Terminal, instance=True) + + session = create_autospec(Session, instance=True) + + session.handle_host.side_effect = SessionDisconnectedError + + self.controller.devices[None] = device + self.controller.sessions[None] = (SessionState.ACTIVE, session) + + self.controller._terminate_session = Mock() + + selector_key = Mock(fileobj=session) + + self.controller.session_selector.select.side_effect = [[(selector_key, selectors.EVENT_READ)], []] + + self.perf_counter.side_effect = [0, 0.1, 0.2, 0.3, 0.4, 0.4] + + # Act + self.controller._update_sessions(1.0) + + # Assert + self.controller.session_selector.select.assert_has_calls([call(0.9), call(0.8)]) + + self.controller._terminate_session.assert_called_once_with(session) + + session.render.assert_not_called() + +class PollAttachedDevicesTestCase(unittest.TestCase): def setUp(self): - self.controller = Controller(None, None, None) + self.interface = MockInterface() - self.controller.session = create_autospec(Session, instance=True) + self.controller = Controller(InterfaceWrapper(self.interface), None, None) - self.controller.session_selector = create_autospec(BaseSelector, instance=True) + self.controller._handle_poll_response = Mock(wraps=self.controller._handle_poll_response) + + def test_no_attached_devices(self): + self.controller._poll_attached_devices() + + # Assert + self.interface.assert_command_not_executed(ANY, Poll) + self.controller._handle_poll_response.assert_not_called() + + def test_tt_ar(self): + # Arrange + device = create_autospec(Terminal, instance=True, device_address=None) + + self.controller.devices[None] = device + + # Act + self.controller._poll_attached_devices() + + # Assert + self.interface.assert_command_executed(None, Poll) + self.interface.assert_command_not_executed(None, PollAck) + self.controller._handle_poll_response.assert_not_called() + + def test_receive_timeout(self): + # Arrange + self.interface.mock_responses = [(None, Poll, None, ReceiveTimeout)] + + device = create_autospec(Terminal, instance=True, device_address=None) + + self.controller.devices[None] = device + + self.controller._handle_device_lost = Mock() + + # Act + self.controller._poll_attached_devices() + + # Assert + self.controller._handle_device_lost.assert_called_once_with(device) + + self.interface.assert_command_executed(None, Poll) + self.interface.assert_command_not_executed(None, PollAck) + self.controller._handle_poll_response.assert_not_called() + + def test_keystroke(self): + # Arrange + poll_response = KeystrokePollResponse(0b0110000010) + + poll = Mock(side_effect=[poll_response, None, None]) + + self.interface.mock_responses = [(None, Poll, None, poll)] + + device = create_autospec(Terminal, instance=True, device_address=None) + + self.controller.devices[None] = device + + self.controller._handle_keystroke_poll_response = Mock() + + # Act + self.controller._poll_attached_devices() + + # Assert + self.controller._handle_keystroke_poll_response.assert_called_once_with(device, poll_response) + + self.assertEqual(poll.call_count, 2) + self.interface.assert_command_executed(None, PollAck) + +class PollNextDetatchedDeviceTestCase(unittest.TestCase): + def setUp(self): + self.interface = MockInterface() + + self.interface.mock_responses = [(None, Poll, None, ReceiveTimeout)] + + self.controller = Controller(InterfaceWrapper(self.interface), None, None) patcher = patch('oec.controller.time.perf_counter') self.perf_counter = patcher.start() - def test_zero_duration(self): - # Act - self.controller._update_session(0) + self.addCleanup(patch.stopall) - # Assert - self.controller.session.handle_host.assert_not_called() - self.controller.session.render.assert_not_called() - - self.controller.session_selector.select.assert_not_called() - - def test_select_timeout(self): + def test_poll_period_not_expired(self): # Arrange - self.controller.session_selector.select.return_value = [] + self.controller.detatched_poll_period = 0.5 + self.controller.last_detatched_poll_time = 1.0 + + self.perf_counter.return_value = 1.1 # Act - self.controller._update_session(1) + self.controller._poll_next_detatched_device() # Assert - self.controller.session.handle_host.assert_not_called() - self.controller.session.render.assert_not_called() + self.interface.assert_command_not_executed(None, Poll) - self.controller.session_selector.select.assert_called_once() + self.assertEqual(self.controller.last_detatched_poll_time, 1.0) - def test_select_available(self): + def test_empty_queue_that_remains_empty(self): # Arrange - self.perf_counter.side_effect = [0, 0.75, 0.75] - - selector_key = Mock(fileobj=self.controller.session) - - self.controller.session_selector.select.side_effect = [[(selector_key, selectors.EVENT_READ)], []] + self.controller._get_detatched_device_addresses = Mock(return_value=[]) # Act - self.controller._update_session(1) + self.controller._poll_next_detatched_device() # Assert - self.controller.session.handle_host.assert_called_once() - self.controller.session.render.assert_called_once() + self.interface.assert_command_not_executed(None, Poll) - self.assertEqual(self.controller.session_selector.select.call_count, 2) + self.controller._get_detatched_device_addresses.assert_called_once() - call_args_list = self.controller.session_selector.select.call_args_list + def test_empty_queue_that_is_populated(self): + # Arrange + self.controller._get_detatched_device_addresses = Mock(return_value=[None]) - self.assertEqual(call_args_list[0][0][0], 1) - self.assertEqual(call_args_list[1][0][0], 0.25) + # Act + self.controller._poll_next_detatched_device() + + # Assert + self.interface.assert_command_executed(None, Poll) + + self.controller._get_detatched_device_addresses.assert_called_once() + + def test_non_empty_queue(self): + # Arrange + self.interface.mock_responses = [(0b000000, Poll, None, ReceiveTimeout)] + + self.controller.detatched_device_poll_queue = [0b000000, 0b100000] + + self.controller._get_detatched_device_addresses = Mock() + + # Act + self.controller._poll_next_detatched_device() + + # Assert + self.interface.assert_command_executed(0b000000, Poll) + + self.assertEqual(self.controller.detatched_device_poll_queue, [0b100000]) + + self.controller._get_detatched_device_addresses.assert_not_called() + + def test_device_found(self): + # Arrange + self.controller.detatched_device_poll_queue = [None] + + poll_response = PowerOnResetCompletePollResponse(0xa) + + poll = Mock(side_effect=[poll_response, None, None]) + + self.interface.mock_responses = [(None, Poll, None, poll)] + + self.controller._handle_device_found = Mock() + + # Act + self.controller._poll_next_detatched_device() + + # Assert + self.controller._handle_device_found.assert_called_once_with(None, poll_response) + + self.interface.assert_command_executed(None, PollAck) diff --git a/tests/test_device.py b/tests/test_device.py index 74f4453..7bc7964 100644 --- a/tests/test_device.py +++ b/tests/test_device.py @@ -38,11 +38,13 @@ class FormatAddressTestCase(unittest.TestCase): self.interface = MockInterface() def test_no_port(self): - self.assertEqual(format_address(InterfaceWrapper(self.interface), None), '/dev/mock') + self.assertEqual(format_address(InterfaceWrapper(self.interface), None), '/dev/mock#0') - def test_multiplexer(self): - with self.assertRaises(NotImplementedError): - format_address(InterfaceWrapper(self.interface), 0b110000) + def test_known_multiplexer_port(self): + self.assertEqual(format_address(InterfaceWrapper(self.interface), 0b110000), '/dev/mock#3') + + def test_unknown_multiplexer_port(self): + self.assertEqual(format_address(InterfaceWrapper(self.interface), 0b111111), '/dev/mock?111111') class GetIdsTestCase(unittest.TestCase): def setUp(self): diff --git a/tests/test_terminal.py b/tests/test_terminal.py index dce62ca..fe5cbc8 100644 --- a/tests/test_terminal.py +++ b/tests/test_terminal.py @@ -1,6 +1,6 @@ import unittest from unittest.mock import create_autospec -from coax import Poll, PollAction +from coax import PollAction from coax.protocol import TerminalId import context @@ -43,7 +43,7 @@ class TerminalSetupTestCase(unittest.TestCase): def test(self): self.terminal.setup() -class TerminalPollTestCase(unittest.TestCase): +class TerminalGetPollActionTestCase(unittest.TestCase): def setUp(self): self.interface = MockInterface() @@ -54,26 +54,17 @@ class TerminalPollTestCase(unittest.TestCase): # The terminal will be initialized in a state where the terminal keyboard clicker # state is unknown, and this cannot be read. Therefore the first POLL will always # attempt to set the keyboard clicker state... - self.terminal.poll() - - self.interface.reset_mock() + self.terminal.get_poll_action() def test_with_no_queued_actions(self): - # Act - self.terminal.poll() - - # Assert - self.assert_poll_with_poll_action(PollAction.NONE) + self.assertEqual(self.terminal.get_poll_action(), PollAction.NONE) def test_with_sound_alarm_queued(self): # Arrange self.terminal.sound_alarm() - # Act - self.terminal.poll() - - # Assert - self.assert_poll_with_poll_action(PollAction.ALARM) + # Act and assert + self.assertEqual(self.terminal.get_poll_action(), PollAction.ALARM) def test_with_enable_keyboard_clicker_queued(self): # Arrange @@ -81,14 +72,8 @@ class TerminalPollTestCase(unittest.TestCase): self.terminal.keyboard.toggle_clicker() - # Act - self.terminal.poll() - - # Assert - self.assert_poll_with_poll_action(PollAction.ENABLE_KEYBOARD_CLICKER) - - def assert_poll_with_poll_action(self, action): - self.interface.assert_command_executed(None, Poll, lambda command: command.action == action) + # Act and assert + self.assertEqual(self.terminal.get_poll_action(), PollAction.ENABLE_KEYBOARD_CLICKER) def _create_terminal(interface): terminal_id = TerminalId(0b11110100) diff --git a/tests/test_vt100.py b/tests/test_vt100.py index 1a74789..3016b12 100644 --- a/tests/test_vt100.py +++ b/tests/test_vt100.py @@ -131,6 +131,8 @@ class SessionRenderTestCase(unittest.TestCase): def test_with_no_eab_feature(self): # Arrange + self.session.is_first_render = False + self.session.host_process.read = Mock(return_value=b'abc') self.session.handle_host() @@ -157,6 +159,8 @@ class SessionRenderTestCase(unittest.TestCase): self.terminal.display.move_cursor = Mock(wraps=self.terminal.display.move_cursor) self.terminal.display.flush = Mock(wraps=self.terminal.display.flush) + self.session.is_first_render = False + self.session.host_process.read = Mock(return_value=b'abc') self.session.handle_host()