Add 3299 multiplexer support

This commit is contained in:
Andrew Kay
2021-11-21 11:34:19 -06:00
parent 16766d7147
commit 6c92d95e6f
15 changed files with 573 additions and 382 deletions

View File

@@ -22,12 +22,15 @@ emulation.
- [x] Basic TN3270E
- [ ] SSL/TLS
- [x] VT100
- [ ] Connection Menu
- [ ] Connection menu
- [ ] MLT (Multiple Logical Terminals)
- [x] Up to 8 terminals connected via IBM 3299 multiplexer<sup>1</sup>
<sup>1</sup> - requires [interface2](https://github.com/lowobservable/coax/tree/master/interface2#readme) with recent firmware
## Supported Terminals
Only directly attached CUT (Control Unit Terminal) type terminals are supported. I have tested oec with the following terminals:
Only CUT (Control Unit Terminal) type terminals are supported. I have tested oec with the following terminals:
* IBM 3278-2
* IBM 3472

View File

@@ -23,7 +23,7 @@ from .keymap_3483 import KEYMAP as KEYMAP_3483
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('oec')
logger = logging.getLogger('oec.main')
CONTROLLER = None
@@ -76,7 +76,7 @@ def _create_session(args, device):
def _signal_handler(number, frame):
global CONTROLLER
print('Stopping controller...')
logger.info('Stopping controller...')
if CONTROLLER:
CONTROLLER.stop()
@@ -115,11 +115,11 @@ def main():
create_device = lambda interface, device_address, poll_response: _create_device(args, interface, device_address, poll_response)
create_session = lambda device: _create_session(args, device)
logger.info('Starting controller...')
with open_serial_interface(args.serial_port) as interface:
CONTROLLER = Controller(InterfaceWrapper(interface), create_device, create_session)
print('Starting controller...')
CONTROLLER.run()
if __name__ == '__main__':

View File

@@ -3,15 +3,27 @@ oec.controller
~~~~~~~~~~~~~~
"""
from enum import Enum
import time
import logging
import selectors
from coax import Poll, PollAck, KeystrokePollResponse, ReceiveTimeout
from concurrent import futures
from itertools import groupby
from coax import InterfaceFeature, Poll, PollAck, KeystrokePollResponse, \
ReceiveTimeout, ReceiveError, ProtocolError
from coax.multiplexer import PORT_MAP_3299
from .device import address_commands, format_address, UnsupportedDeviceError
from .keyboard import Key
from .session import SessionDisconnectedError
class SessionState(Enum):
"""Session state."""
STARTING = 1
ACTIVE = 2
TERMINATING = 3
class Controller:
"""The controller."""
@@ -24,148 +36,239 @@ class Controller:
self.create_device = create_device
self.create_session = create_session
self.device = None
self.devices = { }
self.detatched_device_poll_queue = []
self.session = None
self.sessions = { }
self.session_selector = None
self.session_executor = None
# Target time between POLL commands in seconds when a device is attached or
# no device is attached.
#
# The attached poll period only applies in cases where the device responded
# with TT/AR to the last poll - this is an effort to improve the keystroke
# responsiveness.
self.attached_poll_period = 1 / 10
self.detatached_poll_period = 5
self.attached_poll_period = 1 / 15
self.detatched_poll_period = 1 / 2
self.last_poll_time = None
self.last_poll_response = None
# Maximum number of POLL commands to execute, per attached device, per run
# loop iteration. If all attached devices respond with TT/AR the run loop
# iteration will exit without reaching this maximum depth.
#
# This is an effort to improve the keystroke responsiveness.
self.poll_depth = 3
self.last_attached_poll_time = None
self.last_detatched_poll_time = None
def run(self):
"""Run the controller."""
self.running = True
self.session_selector = selectors.DefaultSelector()
self.session_executor = futures.ThreadPoolExecutor()
self.logger.info('Controller started')
while self.running:
self._run_loop()
self._terminate_session()
self.session_executor.shutdown(wait=True)
self.session_executor = None
for session in [session for (state, session) in self.sessions.values() if state == SessionState.ACTIVE]:
self._terminate_session(session, blocking=True)
self.session_selector.close()
self.session_selector = None
if self.device:
self.device = None
self.sessions.clear()
self.devices.clear()
self.detatched_device_poll_queue.clear()
self.logger.info('Controller stopped')
def stop(self):
"""Stop the controller."""
self.running = False
def _run_loop(self):
poll_delay = self._calculate_poll_delay(time.perf_counter())
poll_delay = self._calculate_poll_delay()
# If POLLing is delayed, handle the host output, otherwise just sleep.
start_time = time.perf_counter()
if poll_delay > 0:
if self.session:
self._update_session(poll_delay)
else:
time.sleep(poll_delay)
self._update_sessions(poll_delay)
poll_delay -= (time.perf_counter() - start_time)
if poll_delay > 0:
time.sleep(poll_delay)
# POLL devices.
self._poll_attached_device()
self._poll_detatched_device()
self._poll_attached_devices()
self._poll_next_detatched_device()
def _update_session(self, duration):
try:
update_count = 0
def _update_sessions(self, duration):
start_time = time.perf_counter()
while duration > 0:
start_time = time.perf_counter()
# Start any missing sessions.
for device_address in self.devices.keys() - self.sessions.keys():
self._start_session(self.devices[device_address])
selected = self.session_selector.select(duration)
sessions = { state: [(device_address, session) for (device_address, (_, session)) in group] for (state, group) in groupby(self.sessions.items(), lambda item: item[1][0]) }
if not selected:
break
# Handle started sessions.
for (device_address, future) in sessions.get(SessionState.STARTING, []):
if future.done():
session = future.result()
for (key, _) in selected:
session = key.fileobj
self.sessions[device_address] = (SessionState.ACTIVE, session)
self.session_selector.register(session, selectors.EVENT_READ)
self.logger.info(f'Session started for device @ {format_address(self.interface, device_address)}')
# Handle terminated sessions.
for (device_address, future) in sessions.get(SessionState.TERMINATING, []):
if future.done():
del self.sessions[device_address]
self.logger.info(f'Session terminated for device @ {format_address(self.interface, device_address)}')
# Update the duration based on the time taken handling futures.
duration -= (time.perf_counter() - start_time)
# Update active sessions.
updated_sessions = set()
while duration > 0:
start_time = time.perf_counter()
selected = self.session_selector.select(duration)
if not selected:
break
for (key, _) in selected:
session = key.fileobj
try:
if session.handle_host():
update_count += 1
updated_sessions.add(session)
except SessionDisconnectedError:
updated_sessions.discard(session)
duration -= (time.perf_counter() - start_time)
self._handle_session_disconnected(session)
if update_count > 0:
self.session.render()
except SessionDisconnectedError:
self._handle_session_disconnected()
duration -= (time.perf_counter() - start_time)
def _start_session(self):
self.session = self.create_session(self.device)
for session in updated_sessions:
session.render()
self.session.start()
def _start_session(self, device):
device_address = device.device_address
self.session_selector.register(self.session, selectors.EVENT_READ)
self.logger.info(f'Starting session for device @ {format_address(self.interface, device_address)}')
def _terminate_session(self):
if not self.session:
return
def start_session():
session = self.create_session(device)
self.session_selector.unregister(self.session)
session.start()
self.session.terminate()
return session
self.session = None
future = self.session_executor.submit(start_session)
def _handle_session_disconnected(self):
self.sessions[device_address] = (SessionState.STARTING, future)
def _terminate_session(self, session, blocking=False):
device_address = session.terminal.device_address
self.logger.info(f'Terminating session for device @ {format_address(self.interface, device_address)}')
self.session_selector.unregister(session)
def terminate_session():
session.terminate()
if blocking:
terminate_session()
del self.sessions[device_address]
else:
future = self.session_executor.submit(terminate_session)
self.sessions[device_address] = (SessionState.TERMINATING, future)
def _handle_session_disconnected(self, session):
self.logger.info('Session disconnected')
self._terminate_session()
self._terminate_session(session)
# Restart the session.
self._start_session()
def _poll_attached_devices(self):
self.last_attached_poll_time = time.perf_counter()
def _poll_attached_device(self):
if not self.device:
for _ in range(self.poll_depth):
devices = self.devices.values()
if not devices:
break
poll_commands = [address_commands(device.device_address, Poll(device.get_poll_action())) for device in devices]
poll_responses = list(zip(devices, self.interface.execute(poll_commands, receive_timeout_is_error=False)))
# Handle POLL responses.
handleable_poll_responses = [pair for pair in poll_responses if pair[1] is not None and not isinstance(pair[1], ReceiveTimeout)]
if handleable_poll_responses:
poll_ack_commands = [address_commands(device.device_address, PollAck()) for (device, _) in handleable_poll_responses]
self.interface.execute(poll_ack_commands)
for (device, poll_response) in handleable_poll_responses:
self._handle_poll_response(device, poll_response)
# Handle lost devices.
for (device, poll_response) in poll_responses:
if isinstance(poll_response, ReceiveTimeout):
self._handle_device_lost(device)
if not handleable_poll_responses:
break
def _poll_next_detatched_device(self):
if self.last_detatched_poll_time is not None and (time.perf_counter() - self.last_detatched_poll_time) < self.detatched_poll_period:
return
self.last_poll_time = time.perf_counter()
self.last_detatched_poll_time = time.perf_counter()
if not self.detatched_device_poll_queue:
self.detatched_device_poll_queue = list(self._get_detatched_device_addresses())
try:
poll_response = self.device.poll()
device_address = self.detatched_device_poll_queue.pop(0)
except IndexError:
return
try:
poll_response = self.interface.execute(address_commands(device_address, Poll()))
except ReceiveTimeout:
self._handle_device_lost()
return
except ReceiveError as error:
self.logger.warning(f'POLL detatched device @ {format_address(self.interface, device_address)} receive error: {error}')
return
except ProtocolError as error:
self.logger.warning(f'POLL detatched device @ {format_address(self.interface, device_address)} protocol error: {error}')
return
if poll_response:
self._poll_ack(self.device.device_address)
self._handle_poll_response(poll_response)
self.last_poll_response = poll_response
def _poll_detatched_device(self):
if self.device:
return
self.last_poll_time = time.perf_counter()
device_address = None
try:
poll_response = self._poll(device_address)
except ReceiveTimeout:
return
if poll_response:
self._poll_ack(device_address)
self.interface.execute(address_commands(device_address, PollAck()))
self._handle_device_found(device_address, poll_response)
self.last_poll_response = poll_response
def _handle_device_found(self, device_address, poll_response):
self.logger.info(f'Found device @ {format_address(self.interface, device_address)}')
@@ -177,29 +280,31 @@ class Controller:
device.setup()
self.device = device
self.devices[device_address] = device
self.logger.info(f'Attached device @ {format_address(self.interface, device_address)}')
self._start_session()
def _handle_device_lost(self):
device_address = self.device.device_address
def _handle_device_lost(self, device):
device_address = device.device_address
self.logger.info(f'Lost device @ {format_address(self.interface, device_address)}')
self._terminate_session()
if device_address in self.sessions:
(session_state, session) = self.sessions[device_address]
self.device = None
if session_state == SessionState.ACTIVE:
self._terminate_session(session)
del self.devices[device_address]
self.logger.info(f'Detached device @ {format_address(self.interface, device_address)}')
def _handle_poll_response(self, poll_response):
def _handle_poll_response(self, device, poll_response):
if isinstance(poll_response, KeystrokePollResponse):
self._handle_keystroke_poll_response(poll_response)
self._handle_keystroke_poll_response(device, poll_response)
def _handle_keystroke_poll_response(self, poll_response):
terminal = self.device
def _handle_keystroke_poll_response(self, terminal, poll_response):
device_address = terminal.device_address
scan_code = poll_response.scan_code
(key, modifiers, modifiers_changed) = terminal.keyboard.get_key(scan_code)
@@ -221,27 +326,34 @@ class Controller:
terminal.display.toggle_cursor_reverse()
elif key == Key.CLICKER:
terminal.keyboard.toggle_clicker()
elif self.session:
self.session.handle_key(key, modifiers, scan_code)
elif device_address in self.sessions:
(session_state, session) = self.sessions[device_address]
self.session.render()
if session_state == SessionState.ACTIVE:
session.handle_key(key, modifiers, scan_code)
def _poll(self, device_address):
return self.interface.execute(address_commands(device_address, Poll()))
session.render()
def _poll_ack(self, device_address):
self.interface.execute(address_commands(device_address, PollAck()))
def _calculate_poll_delay(self, current_time):
if self.last_poll_response is not None:
def _calculate_poll_delay(self):
if self.last_attached_poll_time is None:
return 0
if self.last_poll_time is None:
return 0
return max((self.last_attached_poll_time + self.attached_poll_period) - time.perf_counter(), 0)
if self.device:
period = self.attached_poll_period
def _get_detatched_device_addresses(self):
attached_addresses = set(self.devices.keys())
# The 3299 is transparent, but if there is at least one device attached to a 3299
# port then we can assume there is a 3299 attached and if there is one device
# direct attached then we can assume there is not a 3299 attached.
is_3299_attached = any(attached_addresses.difference([None]))
is_3299_not_attached = (None in attached_addresses)
if is_3299_not_attached or InterfaceFeature.PROTOCOL_3299 not in self.interface.features:
addresses = [None]
elif is_3299_attached:
addresses = PORT_MAP_3299
else:
period = self.detatached_poll_period
addresses = [None, *PORT_MAP_3299]
return max((self.last_poll_time + period) - current_time, 0)
return filter(lambda address: address not in attached_addresses, addresses)

View File

@@ -9,6 +9,7 @@ import logging
from more_itertools import chunked
from coax import read_feature_ids, parse_features, ReadTerminalId, ReadExtendedId, \
Feature, ProtocolError
from coax.multiplexer import PORT_MAP_3299
logger = logging.getLogger(__name__)
@@ -23,8 +24,8 @@ class Device:
"""Setup the device."""
raise NotImplementedError
def poll(self):
"""POLL the device."""
def get_poll_action(self):
"""Get the POLL action."""
raise NotImplementedError
def execute(self, commands):
@@ -67,9 +68,12 @@ def address_commands(device_address, commands):
def format_address(interface, device_address):
"""Format a device address."""
if device_address is None:
return interface.identifier
return f'{interface.identifier}#0'
raise NotImplementedError
try:
return f'{interface.identifier}#{PORT_MAP_3299.index(device_address)}'
except ValueError:
return f'{interface.identifier}?{device_address:06b}'
def get_ids(interface, device_address, extended_id_retry_attempts=3):
terminal_id = None
@@ -78,7 +82,7 @@ def get_ids(interface, device_address, extended_id_retry_attempts=3):
try:
terminal_id = interface.execute(address_commands(device_address, ReadTerminalId()))
except ProtocolError as error:
logger.warning(f'READ_TERMINAL_ID protocol error: {error}', exc_info=error)
logger.warning(f'READ_TERMINAL_ID protocol error: {error}')
# Retry the READ_EXTENDED_ID command as it appears to fail frequently on the
# first request - unlike the READ_TERMINAL_ID command,
@@ -90,9 +94,9 @@ def get_ids(interface, device_address, extended_id_retry_attempts=3):
break
except ProtocolError as error:
logger.warning(f'READ_EXTENDED_ID protocol error: {error}', exc_info=error)
logger.warning(f'READ_EXTENDED_ID protocol error: {error}')
time.sleep(0.25)
time.sleep(0.1)
return (terminal_id, extended_id.hex() if extended_id is not None else None)

View File

@@ -7,6 +7,8 @@ import os
import logging
from textwrap import dedent
from coax import ReceiveTimeout
logger = logging.getLogger(__name__)
class AggregateExecuteError(Exception):
@@ -20,7 +22,7 @@ class InterfaceWrapper:
def __init__(self, interface):
self.interface = interface
self.timeout = 0.1
self.timeout = 0.001
self.jumbo_write_strategy = _get_jumbo_write_strategy()
self.jumbo_write_max_length = None
@@ -37,13 +39,13 @@ class InterfaceWrapper:
return getattr(self.interface, attr)
def execute(self, commands):
def execute(self, commands, receive_timeout_is_error=True):
if not isinstance(commands, list):
return self.interface.execute(commands, self.timeout)
responses = self.interface.execute(commands, self.timeout)
errors = [response for response in responses if isinstance(response, BaseException)]
errors = [response for response in responses if isinstance(response, BaseException) and (receive_timeout_is_error or not isinstance(response, ReceiveTimeout))]
if any(errors):
raise AggregateExecuteError(errors, responses)

View File

@@ -1,4 +1,7 @@
class Session:
def __init__(self, terminal):
self.terminal = terminal
def start(self):
raise NotImplementedError

View File

@@ -3,7 +3,7 @@ oec.terminal
~~~~~~~~~~~~
"""
from coax import Poll, LoadControlRegister, Feature, PollAction, Control
from coax import LoadControlRegister, Feature, PollAction, Control
from .device import Device, UnsupportedDeviceError
from .display import Dimensions, BufferedDisplay
@@ -53,30 +53,26 @@ class Terminal(Device):
# Show the attached indicator on the status line.
self.display.status_line.write_string(0, 'OEC')
def poll(self):
"""Execute a POLL command with queued actions."""
self.display.move_cursor(row=0, column=0)
def get_poll_action(self):
"""Get the POLL action."""
poll_action = PollAction.NONE
# Convert a queued alarm or keyboard clicker change to POLL action.
if self.alarm:
poll_action = PollAction.ALARM
self.alarm = False
elif self.keyboard.clicker != self.last_poll_keyboard_clicker:
if self.keyboard.clicker:
poll_action = PollAction.ENABLE_KEYBOARD_CLICKER
else:
poll_action = PollAction.DISABLE_KEYBOARD_CLICKER
poll_response = self.execute(Poll(poll_action))
# Clear the queued alarm and keyboard clicker change if the POLL was
# successful.
if poll_action == PollAction.ALARM:
self.alarm = False
elif poll_action in [PollAction.ENABLE_KEYBOARD_CLICKER,
PollAction.DISABLE_KEYBOARD_CLICKER]:
self.last_poll_keyboard_clicker = self.keyboard.clicker
return poll_response
return poll_action
def sound_alarm(self):
"""Queue an alarm on next POLL command."""

View File

@@ -48,9 +48,10 @@ class TN3270Session(Session):
"""TN3270 session."""
def __init__(self, terminal, host, port):
super().__init__(terminal)
self.logger = logging.getLogger(__name__)
self.terminal = terminal
self.host = host
self.port = port

View File

@@ -71,9 +71,10 @@ class VT100Session(Session):
"""VT100 session."""
def __init__(self, terminal, host_command):
super().__init__(terminal)
self.logger = logging.getLogger(__name__)
self.terminal = terminal
self.host_command = host_command
self.host_process = None
@@ -94,19 +95,11 @@ class VT100Session(Session):
self.vt100_stream = pyte.ByteStream(self.vt100_screen)
self.is_first_render = True
def start(self):
# Start the host process.
self._start_host_process()
# Clear the screen.
self.terminal.display.clear()
# Update the status line.
self.terminal.display.status_line.write_string(45, 'VT100')
# Reset the cursor.
self.terminal.display.move_cursor(row=0, column=0)
def terminate(self):
if self.host_process:
self._terminate_host_process()
@@ -137,6 +130,11 @@ class VT100Session(Session):
self.host_process.write(bytes_)
def render(self):
if self.is_first_render:
self.terminal.display.status_line.write_string(45, 'VT100')
self.is_first_render = False
self._apply()
self._flush()

View File

@@ -1,6 +1,6 @@
more-itertools==8.10.0
ptyprocess==0.7.0
pycoax==0.10.0
pycoax==0.11.0
pyserial==3.5
pyte==0.8.0
pytn3270==0.12.0

View File

@@ -1,10 +1,12 @@
from unittest.mock import Mock
from unittest.mock import Mock, ANY
from coax import ProtocolError, ReceiveError, ReceiveTimeout
from coax.interface import Interface
class MockInterface(Interface):
def __init__(self, responses=[]):
super().__init__()
self.mock_responses = responses
self.serial = Mock(port='/dev/mock')
@@ -40,7 +42,7 @@ class MockInterface(Interface):
for command in call[0][0]:
(call_device_address, call_command) = command
if call_device_address == device_address and isinstance(call_command, command_type):
if (device_address == ANY or call_device_address == device_address) and isinstance(call_command, command_type):
if predicate is None or predicate(call_command):
commands.append(command)

View File

@@ -1,265 +1,344 @@
import unittest
from unittest.mock import Mock, create_autospec, patch
from unittest.mock import Mock, create_autospec, patch, call, ANY
import selectors
from selectors import BaseSelector
from logging import Logger
from coax import Poll, PowerOnResetCompletePollResponse, KeystrokePollResponse, PollAck, ReceiveTimeout
from coax.protocol import TerminalId
from concurrent.futures import Future
from coax import Poll, PollAck, PowerOnResetCompletePollResponse, KeystrokePollResponse, ReceiveTimeout
import context
from oec.interface import InterfaceWrapper
from oec.controller import Controller
from oec.device import UnsupportedDeviceError
from oec.controller import Controller, SessionState
from oec.terminal import Terminal
from oec.keyboard import KeyboardModifiers, Key
from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2
from oec.session import Session, SessionDisconnectedError
from mock_interface import MockInterface
class RunLoopTestCase(unittest.TestCase):
class UpdateSessionsTestCase(unittest.TestCase):
def setUp(self):
self.interface = MockInterface()
self.terminal = Terminal(self.interface, None, TerminalId(0b11110100), 'c1348300', { }, KEYMAP_3278_2)
self.terminal.setup = Mock(Terminal, instance=True)
self.terminal.display.write = Mock()
self.terminal.display.toggle_cursor_blink = Mock()
self.terminal.display.toggle_cursor_reverse = Mock()
self.terminal.keyboard.toggle_clicker = Mock()
self.create_device = Mock(return_value=self.terminal)
self.session = create_autospec(Session, instance=True)
self.create_session = Mock(return_value=self.session)
self.controller = Controller(InterfaceWrapper(self.interface), self.create_device, self.create_session)
self.controller.logger = create_autospec(Logger, instance=True)
self.controller.attached_poll_period = 1
self.controller = Controller(InterfaceWrapper(self.interface), None, None)
self.controller.session_selector = create_autospec(BaseSelector, instance=True)
self.controller._update_session = Mock(wraps=self.controller._update_session)
patcher = patch('oec.controller.time.perf_counter')
self.perf_counter = patcher.start()
patcher = patch('oec.controller.time.sleep')
self.sleep = patcher.start()
self.addCleanup(patch.stopall)
def test_no_device(self):
self._assert_run_loop(0, ReceiveTimeout, False, 0, False)
self._assert_run_loop(1, ReceiveTimeout, False, 4, False)
self.assertIsNone(self.controller.device)
self.assertIsNone(self.controller.session)
def test_device_attached(self):
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
self._assert_run_loop(0, None, False, 0, False)
self._assert_run_loop(0.5, None, True, 0.5, False)
self.assertIsNotNone(self.controller.device)
self.assertIsNotNone(self.controller.session)
self.controller._update_session.assert_called()
def test_unsupported_device_attached(self):
self.create_device.side_effect = [UnsupportedDeviceError]
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
self.assertIsNone(self.controller.device)
self.assertIsNone(self.controller.session)
def test_keystroke(self):
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
self._assert_run_loop(0, KeystrokePollResponse(0b0110000010), False, 0, True)
self._assert_run_loop(0, None, False, 0, False)
self._assert_run_loop(0.5, None, True, 0.5, False)
self.assertIsNotNone(self.controller.device)
self.assertIsNotNone(self.controller.session)
self.controller.session.handle_key.assert_called_with(Key.LOWER_A, KeyboardModifiers.NONE, 96)
def test_device_detatched(self):
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
self._assert_run_loop(0, None, False, 0, False)
self._assert_run_loop(0.5, ReceiveTimeout, True, 0.5, False)
self.assertIsNone(self.controller.device)
self.assertIsNone(self.controller.session)
self.session.terminate.assert_called()
def test_session_disconnected(self):
selector_key = Mock(fileobj=self.session)
self.controller.session_selector.select.return_value = [(selector_key, selectors.EVENT_READ)]
self.session.handle_host = Mock(side_effect=[None, SessionDisconnectedError])
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
self._assert_run_loop(0, None, False, 0, False)
self._assert_run_loop(0.5, None, True, 0.5, False)
self._assert_run_loop(1.5, None, True, 3, False)
self.assertIsNotNone(self.controller.device)
self.assertIsNotNone(self.controller.session)
self.assertEqual(self.create_session.call_count, 2)
def test_toggle_cursor_blink(self):
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True)
self.terminal.display.toggle_cursor_blink.assert_called_once()
self.terminal.display.toggle_cursor_blink.reset_mock()
self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True)
self.terminal.display.toggle_cursor_blink.assert_called_once()
def test_toggle_cursor_reverse(self):
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True)
self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True)
self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True)
self.terminal.display.toggle_cursor_reverse.assert_called_once()
self.terminal.display.toggle_cursor_reverse.reset_mock()
self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True)
self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True)
self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True)
self.terminal.display.toggle_cursor_reverse.assert_called_once()
def test_toggle_clicker(self):
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
self._assert_run_loop(0, KeystrokePollResponse(0b0101011110), False, 0, True)
self._assert_run_loop(0, None, False, 0, False)
self.terminal.keyboard.toggle_clicker.assert_called_once()
self.terminal.keyboard.toggle_clicker.reset_mock()
self._assert_run_loop(0.5, KeystrokePollResponse(0b0101011110), True, 0.5, True)
self._assert_run_loop(1, None, False, 0, False)
self.terminal.keyboard.toggle_clicker.assert_called_once()
def _assert_run_loop(self, poll_time, poll_response, expected_update_session, expected_poll_delay, expected_poll_ack):
def test_no_sessions(self):
# Arrange
self.controller._update_session.reset_mock()
self.interface.mock_responses = [(None, Poll, None, poll_response)]
self.perf_counter.side_effect = [0, 0.1, 0.2]
self.interface.reset_mock()
def perf_counter():
nonlocal poll_time
time = poll_time
poll_time += expected_update_session
return time
self.perf_counter.side_effect = perf_counter
self.sleep.reset_mock()
self.controller.session_selector.select.return_value = []
# Act
self.controller._run_loop()
self.assertFalse(self.controller._update_sessions(1.0))
def test_missing_sessions_are_started(self):
# Arrange
device = create_autospec(Terminal, instance=True)
self.controller.devices[None] = device
self.controller._start_session = Mock()
self.perf_counter.side_effect = [0, 0.1, 0.2]
self.controller.session_selector.select.return_value = []
# Act
self.controller._update_sessions(1.0)
# Assert
if expected_update_session:
self.controller._update_session.assert_called_once_with(expected_poll_delay)
self.sleep.assert_not_called()
else:
self.controller._update_session.assert_not_called()
self.controller._start_session.assert_called_once_with(device)
if expected_poll_delay > 0:
self.sleep.assert_called_once_with(expected_poll_delay)
else:
self.sleep.assert_not_called()
def test_started_sessions_are_activated(self):
# Arrange
device = create_autospec(Terminal, instance=True)
if expected_poll_ack:
self.interface.assert_command_executed(None, PollAck)
else:
self.interface.assert_command_not_executed(None, PollAck)
session = create_autospec(Session, instance=True)
class UpdateSessionTestCase(unittest.TestCase):
future = create_autospec(Future, instance=True)
future.done = Mock(return_value=True)
future.result = Mock(return_value=session)
self.controller.devices[None] = device
self.controller.sessions[None] = (SessionState.STARTING, future)
self.controller.session_selector.select.return_value = []
self.perf_counter.side_effect = [0, 0.1, 0.2]
# Act
self.controller._update_sessions(1.0)
# Assert
self.assertEqual(self.controller.sessions, { None: (SessionState.ACTIVE, session) })
self.controller.session_selector.register.assert_called_once_with(session, selectors.EVENT_READ)
def test_terminated_sessions_are_removed(self):
# Arrange
device = create_autospec(Terminal, instance=True)
future = create_autospec(Future, instance=True)
future.done = Mock(return_value=True)
future.result = Mock()
self.controller.devices[None] = device
self.controller.sessions[None] = (SessionState.TERMINATING, future)
self.controller.session_selector.select.return_value = []
self.perf_counter.side_effect = [0, 0.1, 0.2]
# Act
self.controller._update_sessions(1.0)
# Assert
self.assertEqual(self.controller.sessions, { })
def test_active_sessions_select_timeout(self):
# Arrange
device = create_autospec(Terminal, instance=True)
session = create_autospec(Session, instance=True)
self.controller.devices[None] = device
self.controller.sessions[None] = (SessionState.ACTIVE, session)
self.controller.session_selector.select.return_value = []
self.perf_counter.side_effect = [0, 0.1, 0.2]
# Act
self.controller._update_sessions(1.0)
# Assert
self.controller.session_selector.select.assert_called_once_with(0.9)
session.handle_host.assert_not_called()
session.render.assert_not_called()
def test_active_sessions_select_available(self):
# Arrange
device = create_autospec(Terminal, instance=True)
session = create_autospec(Session, instance=True)
self.controller.devices[None] = device
self.controller.sessions[None] = (SessionState.ACTIVE, session)
selector_key = Mock(fileobj=session)
self.controller.session_selector.select.side_effect = [[(selector_key, selectors.EVENT_READ)], []]
self.perf_counter.side_effect = [0, 0.1, 0.2, 0.3, 0.4, 0.4]
# Act
self.controller._update_sessions(1.0)
# Assert
self.controller.session_selector.select.assert_has_calls([call(0.9), call(0.8)])
session.handle_host.assert_called_once()
session.render.assert_called_once()
def test_active_sessions_disconnected(self):
# Arrange
device = create_autospec(Terminal, instance=True)
session = create_autospec(Session, instance=True)
session.handle_host.side_effect = SessionDisconnectedError
self.controller.devices[None] = device
self.controller.sessions[None] = (SessionState.ACTIVE, session)
self.controller._terminate_session = Mock()
selector_key = Mock(fileobj=session)
self.controller.session_selector.select.side_effect = [[(selector_key, selectors.EVENT_READ)], []]
self.perf_counter.side_effect = [0, 0.1, 0.2, 0.3, 0.4, 0.4]
# Act
self.controller._update_sessions(1.0)
# Assert
self.controller.session_selector.select.assert_has_calls([call(0.9), call(0.8)])
self.controller._terminate_session.assert_called_once_with(session)
session.render.assert_not_called()
class PollAttachedDevicesTestCase(unittest.TestCase):
def setUp(self):
self.controller = Controller(None, None, None)
self.interface = MockInterface()
self.controller.session = create_autospec(Session, instance=True)
self.controller = Controller(InterfaceWrapper(self.interface), None, None)
self.controller.session_selector = create_autospec(BaseSelector, instance=True)
self.controller._handle_poll_response = Mock(wraps=self.controller._handle_poll_response)
def test_no_attached_devices(self):
self.controller._poll_attached_devices()
# Assert
self.interface.assert_command_not_executed(ANY, Poll)
self.controller._handle_poll_response.assert_not_called()
def test_tt_ar(self):
# Arrange
device = create_autospec(Terminal, instance=True, device_address=None)
self.controller.devices[None] = device
# Act
self.controller._poll_attached_devices()
# Assert
self.interface.assert_command_executed(None, Poll)
self.interface.assert_command_not_executed(None, PollAck)
self.controller._handle_poll_response.assert_not_called()
def test_receive_timeout(self):
# Arrange
self.interface.mock_responses = [(None, Poll, None, ReceiveTimeout)]
device = create_autospec(Terminal, instance=True, device_address=None)
self.controller.devices[None] = device
self.controller._handle_device_lost = Mock()
# Act
self.controller._poll_attached_devices()
# Assert
self.controller._handle_device_lost.assert_called_once_with(device)
self.interface.assert_command_executed(None, Poll)
self.interface.assert_command_not_executed(None, PollAck)
self.controller._handle_poll_response.assert_not_called()
def test_keystroke(self):
# Arrange
poll_response = KeystrokePollResponse(0b0110000010)
poll = Mock(side_effect=[poll_response, None, None])
self.interface.mock_responses = [(None, Poll, None, poll)]
device = create_autospec(Terminal, instance=True, device_address=None)
self.controller.devices[None] = device
self.controller._handle_keystroke_poll_response = Mock()
# Act
self.controller._poll_attached_devices()
# Assert
self.controller._handle_keystroke_poll_response.assert_called_once_with(device, poll_response)
self.assertEqual(poll.call_count, 2)
self.interface.assert_command_executed(None, PollAck)
class PollNextDetatchedDeviceTestCase(unittest.TestCase):
def setUp(self):
self.interface = MockInterface()
self.interface.mock_responses = [(None, Poll, None, ReceiveTimeout)]
self.controller = Controller(InterfaceWrapper(self.interface), None, None)
patcher = patch('oec.controller.time.perf_counter')
self.perf_counter = patcher.start()
def test_zero_duration(self):
# Act
self.controller._update_session(0)
self.addCleanup(patch.stopall)
# Assert
self.controller.session.handle_host.assert_not_called()
self.controller.session.render.assert_not_called()
self.controller.session_selector.select.assert_not_called()
def test_select_timeout(self):
def test_poll_period_not_expired(self):
# Arrange
self.controller.session_selector.select.return_value = []
self.controller.detatched_poll_period = 0.5
self.controller.last_detatched_poll_time = 1.0
self.perf_counter.return_value = 1.1
# Act
self.controller._update_session(1)
self.controller._poll_next_detatched_device()
# Assert
self.controller.session.handle_host.assert_not_called()
self.controller.session.render.assert_not_called()
self.interface.assert_command_not_executed(None, Poll)
self.controller.session_selector.select.assert_called_once()
self.assertEqual(self.controller.last_detatched_poll_time, 1.0)
def test_select_available(self):
def test_empty_queue_that_remains_empty(self):
# Arrange
self.perf_counter.side_effect = [0, 0.75, 0.75]
selector_key = Mock(fileobj=self.controller.session)
self.controller.session_selector.select.side_effect = [[(selector_key, selectors.EVENT_READ)], []]
self.controller._get_detatched_device_addresses = Mock(return_value=[])
# Act
self.controller._update_session(1)
self.controller._poll_next_detatched_device()
# Assert
self.controller.session.handle_host.assert_called_once()
self.controller.session.render.assert_called_once()
self.interface.assert_command_not_executed(None, Poll)
self.assertEqual(self.controller.session_selector.select.call_count, 2)
self.controller._get_detatched_device_addresses.assert_called_once()
call_args_list = self.controller.session_selector.select.call_args_list
def test_empty_queue_that_is_populated(self):
# Arrange
self.controller._get_detatched_device_addresses = Mock(return_value=[None])
self.assertEqual(call_args_list[0][0][0], 1)
self.assertEqual(call_args_list[1][0][0], 0.25)
# Act
self.controller._poll_next_detatched_device()
# Assert
self.interface.assert_command_executed(None, Poll)
self.controller._get_detatched_device_addresses.assert_called_once()
def test_non_empty_queue(self):
# Arrange
self.interface.mock_responses = [(0b000000, Poll, None, ReceiveTimeout)]
self.controller.detatched_device_poll_queue = [0b000000, 0b100000]
self.controller._get_detatched_device_addresses = Mock()
# Act
self.controller._poll_next_detatched_device()
# Assert
self.interface.assert_command_executed(0b000000, Poll)
self.assertEqual(self.controller.detatched_device_poll_queue, [0b100000])
self.controller._get_detatched_device_addresses.assert_not_called()
def test_device_found(self):
# Arrange
self.controller.detatched_device_poll_queue = [None]
poll_response = PowerOnResetCompletePollResponse(0xa)
poll = Mock(side_effect=[poll_response, None, None])
self.interface.mock_responses = [(None, Poll, None, poll)]
self.controller._handle_device_found = Mock()
# Act
self.controller._poll_next_detatched_device()
# Assert
self.controller._handle_device_found.assert_called_once_with(None, poll_response)
self.interface.assert_command_executed(None, PollAck)

View File

@@ -38,11 +38,13 @@ class FormatAddressTestCase(unittest.TestCase):
self.interface = MockInterface()
def test_no_port(self):
self.assertEqual(format_address(InterfaceWrapper(self.interface), None), '/dev/mock')
self.assertEqual(format_address(InterfaceWrapper(self.interface), None), '/dev/mock#0')
def test_multiplexer(self):
with self.assertRaises(NotImplementedError):
format_address(InterfaceWrapper(self.interface), 0b110000)
def test_known_multiplexer_port(self):
self.assertEqual(format_address(InterfaceWrapper(self.interface), 0b110000), '/dev/mock#3')
def test_unknown_multiplexer_port(self):
self.assertEqual(format_address(InterfaceWrapper(self.interface), 0b111111), '/dev/mock?111111')
class GetIdsTestCase(unittest.TestCase):
def setUp(self):

View File

@@ -1,6 +1,6 @@
import unittest
from unittest.mock import create_autospec
from coax import Poll, PollAction
from coax import PollAction
from coax.protocol import TerminalId
import context
@@ -43,7 +43,7 @@ class TerminalSetupTestCase(unittest.TestCase):
def test(self):
self.terminal.setup()
class TerminalPollTestCase(unittest.TestCase):
class TerminalGetPollActionTestCase(unittest.TestCase):
def setUp(self):
self.interface = MockInterface()
@@ -54,26 +54,17 @@ class TerminalPollTestCase(unittest.TestCase):
# The terminal will be initialized in a state where the terminal keyboard clicker
# state is unknown, and this cannot be read. Therefore the first POLL will always
# attempt to set the keyboard clicker state...
self.terminal.poll()
self.interface.reset_mock()
self.terminal.get_poll_action()
def test_with_no_queued_actions(self):
# Act
self.terminal.poll()
# Assert
self.assert_poll_with_poll_action(PollAction.NONE)
self.assertEqual(self.terminal.get_poll_action(), PollAction.NONE)
def test_with_sound_alarm_queued(self):
# Arrange
self.terminal.sound_alarm()
# Act
self.terminal.poll()
# Assert
self.assert_poll_with_poll_action(PollAction.ALARM)
# Act and assert
self.assertEqual(self.terminal.get_poll_action(), PollAction.ALARM)
def test_with_enable_keyboard_clicker_queued(self):
# Arrange
@@ -81,14 +72,8 @@ class TerminalPollTestCase(unittest.TestCase):
self.terminal.keyboard.toggle_clicker()
# Act
self.terminal.poll()
# Assert
self.assert_poll_with_poll_action(PollAction.ENABLE_KEYBOARD_CLICKER)
def assert_poll_with_poll_action(self, action):
self.interface.assert_command_executed(None, Poll, lambda command: command.action == action)
# Act and assert
self.assertEqual(self.terminal.get_poll_action(), PollAction.ENABLE_KEYBOARD_CLICKER)
def _create_terminal(interface):
terminal_id = TerminalId(0b11110100)

View File

@@ -131,6 +131,8 @@ class SessionRenderTestCase(unittest.TestCase):
def test_with_no_eab_feature(self):
# Arrange
self.session.is_first_render = False
self.session.host_process.read = Mock(return_value=b'abc')
self.session.handle_host()
@@ -157,6 +159,8 @@ class SessionRenderTestCase(unittest.TestCase):
self.terminal.display.move_cursor = Mock(wraps=self.terminal.display.move_cursor)
self.terminal.display.flush = Mock(wraps=self.terminal.display.flush)
self.session.is_first_render = False
self.session.host_process.read = Mock(return_value=b'abc')
self.session.handle_host()