diff --git a/README.md b/README.md
index f87fa48..d763178 100644
--- a/README.md
+++ b/README.md
@@ -22,12 +22,15 @@ emulation.
- [x] Basic TN3270E
- [ ] SSL/TLS
- [x] VT100
- - [ ] Connection Menu
+ - [ ] Connection menu
- [ ] MLT (Multiple Logical Terminals)
+ - [x] Up to 8 terminals connected via IBM 3299 multiplexer1
+
+1 - requires [interface2](https://github.com/lowobservable/coax/tree/master/interface2#readme) with recent firmware
## Supported Terminals
-Only directly attached CUT (Control Unit Terminal) type terminals are supported. I have tested oec with the following terminals:
+Only CUT (Control Unit Terminal) type terminals are supported. I have tested oec with the following terminals:
* IBM 3278-2
* IBM 3472
diff --git a/oec/__main__.py b/oec/__main__.py
index 8c697e3..1c98b94 100644
--- a/oec/__main__.py
+++ b/oec/__main__.py
@@ -23,7 +23,7 @@ from .keymap_3483 import KEYMAP as KEYMAP_3483
logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger('oec')
+logger = logging.getLogger('oec.main')
CONTROLLER = None
@@ -76,7 +76,7 @@ def _create_session(args, device):
def _signal_handler(number, frame):
global CONTROLLER
- print('Stopping controller...')
+ logger.info('Stopping controller...')
if CONTROLLER:
CONTROLLER.stop()
@@ -115,11 +115,11 @@ def main():
create_device = lambda interface, device_address, poll_response: _create_device(args, interface, device_address, poll_response)
create_session = lambda device: _create_session(args, device)
+ logger.info('Starting controller...')
+
with open_serial_interface(args.serial_port) as interface:
CONTROLLER = Controller(InterfaceWrapper(interface), create_device, create_session)
- print('Starting controller...')
-
CONTROLLER.run()
if __name__ == '__main__':
diff --git a/oec/controller.py b/oec/controller.py
index 6b92ed5..00f104f 100644
--- a/oec/controller.py
+++ b/oec/controller.py
@@ -3,15 +3,27 @@ oec.controller
~~~~~~~~~~~~~~
"""
+from enum import Enum
import time
import logging
import selectors
-from coax import Poll, PollAck, KeystrokePollResponse, ReceiveTimeout
+from concurrent import futures
+from itertools import groupby
+from coax import InterfaceFeature, Poll, PollAck, KeystrokePollResponse, \
+ ReceiveTimeout, ReceiveError, ProtocolError
+from coax.multiplexer import PORT_MAP_3299
from .device import address_commands, format_address, UnsupportedDeviceError
from .keyboard import Key
from .session import SessionDisconnectedError
+class SessionState(Enum):
+ """Session state."""
+
+ STARTING = 1
+ ACTIVE = 2
+ TERMINATING = 3
+
class Controller:
"""The controller."""
@@ -24,148 +36,239 @@ class Controller:
self.create_device = create_device
self.create_session = create_session
- self.device = None
+ self.devices = { }
+ self.detatched_device_poll_queue = []
- self.session = None
+ self.sessions = { }
self.session_selector = None
+ self.session_executor = None
# Target time between POLL commands in seconds when a device is attached or
# no device is attached.
- #
- # The attached poll period only applies in cases where the device responded
- # with TT/AR to the last poll - this is an effort to improve the keystroke
- # responsiveness.
- self.attached_poll_period = 1 / 10
- self.detatached_poll_period = 5
+ self.attached_poll_period = 1 / 15
+ self.detatched_poll_period = 1 / 2
- self.last_poll_time = None
- self.last_poll_response = None
+ # Maximum number of POLL commands to execute, per attached device, per run
+ # loop iteration. If all attached devices respond with TT/AR the run loop
+ # iteration will exit without reaching this maximum depth.
+ #
+ # This is an effort to improve the keystroke responsiveness.
+ self.poll_depth = 3
+
+ self.last_attached_poll_time = None
+ self.last_detatched_poll_time = None
def run(self):
"""Run the controller."""
self.running = True
self.session_selector = selectors.DefaultSelector()
+ self.session_executor = futures.ThreadPoolExecutor()
+
+ self.logger.info('Controller started')
while self.running:
self._run_loop()
- self._terminate_session()
+ self.session_executor.shutdown(wait=True)
+
+ self.session_executor = None
+
+ for session in [session for (state, session) in self.sessions.values() if state == SessionState.ACTIVE]:
+ self._terminate_session(session, blocking=True)
self.session_selector.close()
self.session_selector = None
- if self.device:
- self.device = None
+ self.sessions.clear()
+
+ self.devices.clear()
+ self.detatched_device_poll_queue.clear()
+
+ self.logger.info('Controller stopped')
def stop(self):
"""Stop the controller."""
self.running = False
def _run_loop(self):
- poll_delay = self._calculate_poll_delay(time.perf_counter())
+ poll_delay = self._calculate_poll_delay()
# If POLLing is delayed, handle the host output, otherwise just sleep.
+ start_time = time.perf_counter()
+
if poll_delay > 0:
- if self.session:
- self._update_session(poll_delay)
- else:
- time.sleep(poll_delay)
+ self._update_sessions(poll_delay)
+
+ poll_delay -= (time.perf_counter() - start_time)
+
+ if poll_delay > 0:
+ time.sleep(poll_delay)
# POLL devices.
- self._poll_attached_device()
- self._poll_detatched_device()
+ self._poll_attached_devices()
+ self._poll_next_detatched_device()
- def _update_session(self, duration):
- try:
- update_count = 0
+ def _update_sessions(self, duration):
+ start_time = time.perf_counter()
- while duration > 0:
- start_time = time.perf_counter()
+ # Start any missing sessions.
+ for device_address in self.devices.keys() - self.sessions.keys():
+ self._start_session(self.devices[device_address])
- selected = self.session_selector.select(duration)
+ sessions = { state: [(device_address, session) for (device_address, (_, session)) in group] for (state, group) in groupby(self.sessions.items(), lambda item: item[1][0]) }
- if not selected:
- break
+ # Handle started sessions.
+ for (device_address, future) in sessions.get(SessionState.STARTING, []):
+ if future.done():
+ session = future.result()
- for (key, _) in selected:
- session = key.fileobj
+ self.sessions[device_address] = (SessionState.ACTIVE, session)
+ self.session_selector.register(session, selectors.EVENT_READ)
+
+ self.logger.info(f'Session started for device @ {format_address(self.interface, device_address)}')
+
+ # Handle terminated sessions.
+ for (device_address, future) in sessions.get(SessionState.TERMINATING, []):
+ if future.done():
+ del self.sessions[device_address]
+
+ self.logger.info(f'Session terminated for device @ {format_address(self.interface, device_address)}')
+
+ # Update the duration based on the time taken handling futures.
+ duration -= (time.perf_counter() - start_time)
+
+ # Update active sessions.
+ updated_sessions = set()
+
+ while duration > 0:
+ start_time = time.perf_counter()
+
+ selected = self.session_selector.select(duration)
+
+ if not selected:
+ break
+
+ for (key, _) in selected:
+ session = key.fileobj
+
+ try:
if session.handle_host():
- update_count += 1
+ updated_sessions.add(session)
+ except SessionDisconnectedError:
+ updated_sessions.discard(session)
- duration -= (time.perf_counter() - start_time)
+ self._handle_session_disconnected(session)
- if update_count > 0:
- self.session.render()
- except SessionDisconnectedError:
- self._handle_session_disconnected()
+ duration -= (time.perf_counter() - start_time)
- def _start_session(self):
- self.session = self.create_session(self.device)
+ for session in updated_sessions:
+ session.render()
- self.session.start()
+ def _start_session(self, device):
+ device_address = device.device_address
- self.session_selector.register(self.session, selectors.EVENT_READ)
+ self.logger.info(f'Starting session for device @ {format_address(self.interface, device_address)}')
- def _terminate_session(self):
- if not self.session:
- return
+ def start_session():
+ session = self.create_session(device)
- self.session_selector.unregister(self.session)
+ session.start()
- self.session.terminate()
+ return session
- self.session = None
+ future = self.session_executor.submit(start_session)
- def _handle_session_disconnected(self):
+ self.sessions[device_address] = (SessionState.STARTING, future)
+
+ def _terminate_session(self, session, blocking=False):
+ device_address = session.terminal.device_address
+
+ self.logger.info(f'Terminating session for device @ {format_address(self.interface, device_address)}')
+
+ self.session_selector.unregister(session)
+
+ def terminate_session():
+ session.terminate()
+
+ if blocking:
+ terminate_session()
+
+ del self.sessions[device_address]
+ else:
+ future = self.session_executor.submit(terminate_session)
+
+ self.sessions[device_address] = (SessionState.TERMINATING, future)
+
+ def _handle_session_disconnected(self, session):
self.logger.info('Session disconnected')
- self._terminate_session()
+ self._terminate_session(session)
- # Restart the session.
- self._start_session()
+ def _poll_attached_devices(self):
+ self.last_attached_poll_time = time.perf_counter()
- def _poll_attached_device(self):
- if not self.device:
+ for _ in range(self.poll_depth):
+ devices = self.devices.values()
+
+ if not devices:
+ break
+
+ poll_commands = [address_commands(device.device_address, Poll(device.get_poll_action())) for device in devices]
+
+ poll_responses = list(zip(devices, self.interface.execute(poll_commands, receive_timeout_is_error=False)))
+
+ # Handle POLL responses.
+ handleable_poll_responses = [pair for pair in poll_responses if pair[1] is not None and not isinstance(pair[1], ReceiveTimeout)]
+
+ if handleable_poll_responses:
+ poll_ack_commands = [address_commands(device.device_address, PollAck()) for (device, _) in handleable_poll_responses]
+
+ self.interface.execute(poll_ack_commands)
+
+ for (device, poll_response) in handleable_poll_responses:
+ self._handle_poll_response(device, poll_response)
+
+ # Handle lost devices.
+ for (device, poll_response) in poll_responses:
+ if isinstance(poll_response, ReceiveTimeout):
+ self._handle_device_lost(device)
+
+ if not handleable_poll_responses:
+ break
+
+ def _poll_next_detatched_device(self):
+ if self.last_detatched_poll_time is not None and (time.perf_counter() - self.last_detatched_poll_time) < self.detatched_poll_period:
return
- self.last_poll_time = time.perf_counter()
+ self.last_detatched_poll_time = time.perf_counter()
+
+ if not self.detatched_device_poll_queue:
+ self.detatched_device_poll_queue = list(self._get_detatched_device_addresses())
try:
- poll_response = self.device.poll()
+ device_address = self.detatched_device_poll_queue.pop(0)
+ except IndexError:
+ return
+
+ try:
+ poll_response = self.interface.execute(address_commands(device_address, Poll()))
except ReceiveTimeout:
- self._handle_device_lost()
+ return
+ except ReceiveError as error:
+ self.logger.warning(f'POLL detatched device @ {format_address(self.interface, device_address)} receive error: {error}')
+ return
+ except ProtocolError as error:
+ self.logger.warning(f'POLL detatched device @ {format_address(self.interface, device_address)} protocol error: {error}')
return
if poll_response:
- self._poll_ack(self.device.device_address)
-
- self._handle_poll_response(poll_response)
-
- self.last_poll_response = poll_response
-
- def _poll_detatched_device(self):
- if self.device:
- return
-
- self.last_poll_time = time.perf_counter()
-
- device_address = None
-
- try:
- poll_response = self._poll(device_address)
- except ReceiveTimeout:
- return
-
- if poll_response:
- self._poll_ack(device_address)
+ self.interface.execute(address_commands(device_address, PollAck()))
self._handle_device_found(device_address, poll_response)
- self.last_poll_response = poll_response
-
def _handle_device_found(self, device_address, poll_response):
self.logger.info(f'Found device @ {format_address(self.interface, device_address)}')
@@ -177,29 +280,31 @@ class Controller:
device.setup()
- self.device = device
+ self.devices[device_address] = device
self.logger.info(f'Attached device @ {format_address(self.interface, device_address)}')
- self._start_session()
-
- def _handle_device_lost(self):
- device_address = self.device.device_address
+ def _handle_device_lost(self, device):
+ device_address = device.device_address
self.logger.info(f'Lost device @ {format_address(self.interface, device_address)}')
- self._terminate_session()
+ if device_address in self.sessions:
+ (session_state, session) = self.sessions[device_address]
- self.device = None
+ if session_state == SessionState.ACTIVE:
+ self._terminate_session(session)
+
+ del self.devices[device_address]
self.logger.info(f'Detached device @ {format_address(self.interface, device_address)}')
- def _handle_poll_response(self, poll_response):
+ def _handle_poll_response(self, device, poll_response):
if isinstance(poll_response, KeystrokePollResponse):
- self._handle_keystroke_poll_response(poll_response)
+ self._handle_keystroke_poll_response(device, poll_response)
- def _handle_keystroke_poll_response(self, poll_response):
- terminal = self.device
+ def _handle_keystroke_poll_response(self, terminal, poll_response):
+ device_address = terminal.device_address
scan_code = poll_response.scan_code
(key, modifiers, modifiers_changed) = terminal.keyboard.get_key(scan_code)
@@ -221,27 +326,34 @@ class Controller:
terminal.display.toggle_cursor_reverse()
elif key == Key.CLICKER:
terminal.keyboard.toggle_clicker()
- elif self.session:
- self.session.handle_key(key, modifiers, scan_code)
+ elif device_address in self.sessions:
+ (session_state, session) = self.sessions[device_address]
- self.session.render()
+ if session_state == SessionState.ACTIVE:
+ session.handle_key(key, modifiers, scan_code)
- def _poll(self, device_address):
- return self.interface.execute(address_commands(device_address, Poll()))
+ session.render()
- def _poll_ack(self, device_address):
- self.interface.execute(address_commands(device_address, PollAck()))
-
- def _calculate_poll_delay(self, current_time):
- if self.last_poll_response is not None:
+ def _calculate_poll_delay(self):
+ if self.last_attached_poll_time is None:
return 0
- if self.last_poll_time is None:
- return 0
+ return max((self.last_attached_poll_time + self.attached_poll_period) - time.perf_counter(), 0)
- if self.device:
- period = self.attached_poll_period
+ def _get_detatched_device_addresses(self):
+ attached_addresses = set(self.devices.keys())
+
+ # The 3299 is transparent, but if there is at least one device attached to a 3299
+ # port then we can assume there is a 3299 attached and if there is one device
+ # direct attached then we can assume there is not a 3299 attached.
+ is_3299_attached = any(attached_addresses.difference([None]))
+ is_3299_not_attached = (None in attached_addresses)
+
+ if is_3299_not_attached or InterfaceFeature.PROTOCOL_3299 not in self.interface.features:
+ addresses = [None]
+ elif is_3299_attached:
+ addresses = PORT_MAP_3299
else:
- period = self.detatached_poll_period
+ addresses = [None, *PORT_MAP_3299]
- return max((self.last_poll_time + period) - current_time, 0)
+ return filter(lambda address: address not in attached_addresses, addresses)
diff --git a/oec/device.py b/oec/device.py
index cd3a437..79c944c 100644
--- a/oec/device.py
+++ b/oec/device.py
@@ -9,6 +9,7 @@ import logging
from more_itertools import chunked
from coax import read_feature_ids, parse_features, ReadTerminalId, ReadExtendedId, \
Feature, ProtocolError
+from coax.multiplexer import PORT_MAP_3299
logger = logging.getLogger(__name__)
@@ -23,8 +24,8 @@ class Device:
"""Setup the device."""
raise NotImplementedError
- def poll(self):
- """POLL the device."""
+ def get_poll_action(self):
+ """Get the POLL action."""
raise NotImplementedError
def execute(self, commands):
@@ -67,9 +68,12 @@ def address_commands(device_address, commands):
def format_address(interface, device_address):
"""Format a device address."""
if device_address is None:
- return interface.identifier
+ return f'{interface.identifier}#0'
- raise NotImplementedError
+ try:
+ return f'{interface.identifier}#{PORT_MAP_3299.index(device_address)}'
+ except ValueError:
+ return f'{interface.identifier}?{device_address:06b}'
def get_ids(interface, device_address, extended_id_retry_attempts=3):
terminal_id = None
@@ -78,7 +82,7 @@ def get_ids(interface, device_address, extended_id_retry_attempts=3):
try:
terminal_id = interface.execute(address_commands(device_address, ReadTerminalId()))
except ProtocolError as error:
- logger.warning(f'READ_TERMINAL_ID protocol error: {error}', exc_info=error)
+ logger.warning(f'READ_TERMINAL_ID protocol error: {error}')
# Retry the READ_EXTENDED_ID command as it appears to fail frequently on the
# first request - unlike the READ_TERMINAL_ID command,
@@ -90,9 +94,9 @@ def get_ids(interface, device_address, extended_id_retry_attempts=3):
break
except ProtocolError as error:
- logger.warning(f'READ_EXTENDED_ID protocol error: {error}', exc_info=error)
+ logger.warning(f'READ_EXTENDED_ID protocol error: {error}')
- time.sleep(0.25)
+ time.sleep(0.1)
return (terminal_id, extended_id.hex() if extended_id is not None else None)
diff --git a/oec/interface.py b/oec/interface.py
index ffdbe36..b9b9161 100644
--- a/oec/interface.py
+++ b/oec/interface.py
@@ -7,6 +7,8 @@ import os
import logging
from textwrap import dedent
+from coax import ReceiveTimeout
+
logger = logging.getLogger(__name__)
class AggregateExecuteError(Exception):
@@ -20,7 +22,7 @@ class InterfaceWrapper:
def __init__(self, interface):
self.interface = interface
- self.timeout = 0.1
+ self.timeout = 0.001
self.jumbo_write_strategy = _get_jumbo_write_strategy()
self.jumbo_write_max_length = None
@@ -37,13 +39,13 @@ class InterfaceWrapper:
return getattr(self.interface, attr)
- def execute(self, commands):
+ def execute(self, commands, receive_timeout_is_error=True):
if not isinstance(commands, list):
return self.interface.execute(commands, self.timeout)
responses = self.interface.execute(commands, self.timeout)
- errors = [response for response in responses if isinstance(response, BaseException)]
+ errors = [response for response in responses if isinstance(response, BaseException) and (receive_timeout_is_error or not isinstance(response, ReceiveTimeout))]
if any(errors):
raise AggregateExecuteError(errors, responses)
diff --git a/oec/session.py b/oec/session.py
index fe00501..a876f32 100644
--- a/oec/session.py
+++ b/oec/session.py
@@ -1,4 +1,7 @@
class Session:
+ def __init__(self, terminal):
+ self.terminal = terminal
+
def start(self):
raise NotImplementedError
diff --git a/oec/terminal.py b/oec/terminal.py
index 1928944..b9c540a 100644
--- a/oec/terminal.py
+++ b/oec/terminal.py
@@ -3,7 +3,7 @@ oec.terminal
~~~~~~~~~~~~
"""
-from coax import Poll, LoadControlRegister, Feature, PollAction, Control
+from coax import LoadControlRegister, Feature, PollAction, Control
from .device import Device, UnsupportedDeviceError
from .display import Dimensions, BufferedDisplay
@@ -53,30 +53,26 @@ class Terminal(Device):
# Show the attached indicator on the status line.
self.display.status_line.write_string(0, 'OEC')
- def poll(self):
- """Execute a POLL command with queued actions."""
+ self.display.move_cursor(row=0, column=0)
+
+ def get_poll_action(self):
+ """Get the POLL action."""
poll_action = PollAction.NONE
# Convert a queued alarm or keyboard clicker change to POLL action.
if self.alarm:
poll_action = PollAction.ALARM
+
+ self.alarm = False
elif self.keyboard.clicker != self.last_poll_keyboard_clicker:
if self.keyboard.clicker:
poll_action = PollAction.ENABLE_KEYBOARD_CLICKER
else:
poll_action = PollAction.DISABLE_KEYBOARD_CLICKER
- poll_response = self.execute(Poll(poll_action))
-
- # Clear the queued alarm and keyboard clicker change if the POLL was
- # successful.
- if poll_action == PollAction.ALARM:
- self.alarm = False
- elif poll_action in [PollAction.ENABLE_KEYBOARD_CLICKER,
- PollAction.DISABLE_KEYBOARD_CLICKER]:
self.last_poll_keyboard_clicker = self.keyboard.clicker
- return poll_response
+ return poll_action
def sound_alarm(self):
"""Queue an alarm on next POLL command."""
diff --git a/oec/tn3270.py b/oec/tn3270.py
index 4fc8ccc..ee6116f 100644
--- a/oec/tn3270.py
+++ b/oec/tn3270.py
@@ -48,9 +48,10 @@ class TN3270Session(Session):
"""TN3270 session."""
def __init__(self, terminal, host, port):
+ super().__init__(terminal)
+
self.logger = logging.getLogger(__name__)
- self.terminal = terminal
self.host = host
self.port = port
diff --git a/oec/vt100.py b/oec/vt100.py
index 6c93ebc..01c577e 100644
--- a/oec/vt100.py
+++ b/oec/vt100.py
@@ -71,9 +71,10 @@ class VT100Session(Session):
"""VT100 session."""
def __init__(self, terminal, host_command):
+ super().__init__(terminal)
+
self.logger = logging.getLogger(__name__)
- self.terminal = terminal
self.host_command = host_command
self.host_process = None
@@ -94,19 +95,11 @@ class VT100Session(Session):
self.vt100_stream = pyte.ByteStream(self.vt100_screen)
+ self.is_first_render = True
+
def start(self):
- # Start the host process.
self._start_host_process()
- # Clear the screen.
- self.terminal.display.clear()
-
- # Update the status line.
- self.terminal.display.status_line.write_string(45, 'VT100')
-
- # Reset the cursor.
- self.terminal.display.move_cursor(row=0, column=0)
-
def terminate(self):
if self.host_process:
self._terminate_host_process()
@@ -137,6 +130,11 @@ class VT100Session(Session):
self.host_process.write(bytes_)
def render(self):
+ if self.is_first_render:
+ self.terminal.display.status_line.write_string(45, 'VT100')
+
+ self.is_first_render = False
+
self._apply()
self._flush()
diff --git a/requirements.txt b/requirements.txt
index 8f29b43..550f33c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,6 @@
more-itertools==8.10.0
ptyprocess==0.7.0
-pycoax==0.10.0
+pycoax==0.11.0
pyserial==3.5
pyte==0.8.0
pytn3270==0.12.0
diff --git a/tests/mock_interface.py b/tests/mock_interface.py
index 178e9f8..35470dc 100644
--- a/tests/mock_interface.py
+++ b/tests/mock_interface.py
@@ -1,10 +1,12 @@
-from unittest.mock import Mock
+from unittest.mock import Mock, ANY
from coax import ProtocolError, ReceiveError, ReceiveTimeout
from coax.interface import Interface
class MockInterface(Interface):
def __init__(self, responses=[]):
+ super().__init__()
+
self.mock_responses = responses
self.serial = Mock(port='/dev/mock')
@@ -40,7 +42,7 @@ class MockInterface(Interface):
for command in call[0][0]:
(call_device_address, call_command) = command
- if call_device_address == device_address and isinstance(call_command, command_type):
+ if (device_address == ANY or call_device_address == device_address) and isinstance(call_command, command_type):
if predicate is None or predicate(call_command):
commands.append(command)
diff --git a/tests/test_controller.py b/tests/test_controller.py
index dcef935..776b602 100644
--- a/tests/test_controller.py
+++ b/tests/test_controller.py
@@ -1,265 +1,344 @@
import unittest
-from unittest.mock import Mock, create_autospec, patch
+from unittest.mock import Mock, create_autospec, patch, call, ANY
import selectors
from selectors import BaseSelector
-from logging import Logger
-from coax import Poll, PowerOnResetCompletePollResponse, KeystrokePollResponse, PollAck, ReceiveTimeout
-from coax.protocol import TerminalId
+from concurrent.futures import Future
+from coax import Poll, PollAck, PowerOnResetCompletePollResponse, KeystrokePollResponse, ReceiveTimeout
import context
from oec.interface import InterfaceWrapper
-from oec.controller import Controller
-from oec.device import UnsupportedDeviceError
+from oec.controller import Controller, SessionState
from oec.terminal import Terminal
-from oec.keyboard import KeyboardModifiers, Key
-from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2
from oec.session import Session, SessionDisconnectedError
from mock_interface import MockInterface
-class RunLoopTestCase(unittest.TestCase):
+class UpdateSessionsTestCase(unittest.TestCase):
def setUp(self):
self.interface = MockInterface()
- self.terminal = Terminal(self.interface, None, TerminalId(0b11110100), 'c1348300', { }, KEYMAP_3278_2)
-
- self.terminal.setup = Mock(Terminal, instance=True)
-
- self.terminal.display.write = Mock()
- self.terminal.display.toggle_cursor_blink = Mock()
- self.terminal.display.toggle_cursor_reverse = Mock()
-
- self.terminal.keyboard.toggle_clicker = Mock()
-
- self.create_device = Mock(return_value=self.terminal)
-
- self.session = create_autospec(Session, instance=True)
- self.create_session = Mock(return_value=self.session)
-
- self.controller = Controller(InterfaceWrapper(self.interface), self.create_device, self.create_session)
-
- self.controller.logger = create_autospec(Logger, instance=True)
-
- self.controller.attached_poll_period = 1
+ self.controller = Controller(InterfaceWrapper(self.interface), None, None)
self.controller.session_selector = create_autospec(BaseSelector, instance=True)
- self.controller._update_session = Mock(wraps=self.controller._update_session)
-
patcher = patch('oec.controller.time.perf_counter')
self.perf_counter = patcher.start()
- patcher = patch('oec.controller.time.sleep')
-
- self.sleep = patcher.start()
-
self.addCleanup(patch.stopall)
- def test_no_device(self):
- self._assert_run_loop(0, ReceiveTimeout, False, 0, False)
- self._assert_run_loop(1, ReceiveTimeout, False, 4, False)
-
- self.assertIsNone(self.controller.device)
- self.assertIsNone(self.controller.session)
-
- def test_device_attached(self):
- self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
- self._assert_run_loop(0, None, False, 0, False)
- self._assert_run_loop(0.5, None, True, 0.5, False)
-
- self.assertIsNotNone(self.controller.device)
- self.assertIsNotNone(self.controller.session)
-
- self.controller._update_session.assert_called()
-
- def test_unsupported_device_attached(self):
- self.create_device.side_effect = [UnsupportedDeviceError]
-
- self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
-
- self.assertIsNone(self.controller.device)
- self.assertIsNone(self.controller.session)
-
- def test_keystroke(self):
- self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
- self._assert_run_loop(0, KeystrokePollResponse(0b0110000010), False, 0, True)
- self._assert_run_loop(0, None, False, 0, False)
- self._assert_run_loop(0.5, None, True, 0.5, False)
-
- self.assertIsNotNone(self.controller.device)
- self.assertIsNotNone(self.controller.session)
-
- self.controller.session.handle_key.assert_called_with(Key.LOWER_A, KeyboardModifiers.NONE, 96)
-
- def test_device_detatched(self):
- self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
- self._assert_run_loop(0, None, False, 0, False)
- self._assert_run_loop(0.5, ReceiveTimeout, True, 0.5, False)
-
- self.assertIsNone(self.controller.device)
- self.assertIsNone(self.controller.session)
-
- self.session.terminate.assert_called()
-
- def test_session_disconnected(self):
- selector_key = Mock(fileobj=self.session)
-
- self.controller.session_selector.select.return_value = [(selector_key, selectors.EVENT_READ)]
-
- self.session.handle_host = Mock(side_effect=[None, SessionDisconnectedError])
-
- self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
- self._assert_run_loop(0, None, False, 0, False)
- self._assert_run_loop(0.5, None, True, 0.5, False)
- self._assert_run_loop(1.5, None, True, 3, False)
-
- self.assertIsNotNone(self.controller.device)
- self.assertIsNotNone(self.controller.session)
-
- self.assertEqual(self.create_session.call_count, 2)
-
- def test_toggle_cursor_blink(self):
- self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
-
- self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True)
-
- self.terminal.display.toggle_cursor_blink.assert_called_once()
-
- self.terminal.display.toggle_cursor_blink.reset_mock()
-
- self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True)
-
- self.terminal.display.toggle_cursor_blink.assert_called_once()
-
- def test_toggle_cursor_reverse(self):
- self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
-
- self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True)
- self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True)
- self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True)
-
- self.terminal.display.toggle_cursor_reverse.assert_called_once()
-
- self.terminal.display.toggle_cursor_reverse.reset_mock()
-
- self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True)
- self._assert_run_loop(0, KeystrokePollResponse(0b0101010010), False, 0, True)
- self._assert_run_loop(0, KeystrokePollResponse(0b0100111110), False, 0, True)
-
- self.terminal.display.toggle_cursor_reverse.assert_called_once()
-
- def test_toggle_clicker(self):
- self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), False, 0, True)
-
- self._assert_run_loop(0, KeystrokePollResponse(0b0101011110), False, 0, True)
- self._assert_run_loop(0, None, False, 0, False)
-
- self.terminal.keyboard.toggle_clicker.assert_called_once()
-
- self.terminal.keyboard.toggle_clicker.reset_mock()
-
- self._assert_run_loop(0.5, KeystrokePollResponse(0b0101011110), True, 0.5, True)
- self._assert_run_loop(1, None, False, 0, False)
-
- self.terminal.keyboard.toggle_clicker.assert_called_once()
-
- def _assert_run_loop(self, poll_time, poll_response, expected_update_session, expected_poll_delay, expected_poll_ack):
+ def test_no_sessions(self):
# Arrange
- self.controller._update_session.reset_mock()
- self.interface.mock_responses = [(None, Poll, None, poll_response)]
+ self.perf_counter.side_effect = [0, 0.1, 0.2]
- self.interface.reset_mock()
-
- def perf_counter():
- nonlocal poll_time
-
- time = poll_time
-
- poll_time += expected_update_session
-
- return time
-
- self.perf_counter.side_effect = perf_counter
-
- self.sleep.reset_mock()
+ self.controller.session_selector.select.return_value = []
# Act
- self.controller._run_loop()
+ self.assertFalse(self.controller._update_sessions(1.0))
+
+ def test_missing_sessions_are_started(self):
+ # Arrange
+ device = create_autospec(Terminal, instance=True)
+
+ self.controller.devices[None] = device
+
+ self.controller._start_session = Mock()
+
+ self.perf_counter.side_effect = [0, 0.1, 0.2]
+
+ self.controller.session_selector.select.return_value = []
+
+ # Act
+ self.controller._update_sessions(1.0)
# Assert
- if expected_update_session:
- self.controller._update_session.assert_called_once_with(expected_poll_delay)
- self.sleep.assert_not_called()
- else:
- self.controller._update_session.assert_not_called()
+ self.controller._start_session.assert_called_once_with(device)
- if expected_poll_delay > 0:
- self.sleep.assert_called_once_with(expected_poll_delay)
- else:
- self.sleep.assert_not_called()
+ def test_started_sessions_are_activated(self):
+ # Arrange
+ device = create_autospec(Terminal, instance=True)
- if expected_poll_ack:
- self.interface.assert_command_executed(None, PollAck)
- else:
- self.interface.assert_command_not_executed(None, PollAck)
+ session = create_autospec(Session, instance=True)
-class UpdateSessionTestCase(unittest.TestCase):
+ future = create_autospec(Future, instance=True)
+
+ future.done = Mock(return_value=True)
+ future.result = Mock(return_value=session)
+
+ self.controller.devices[None] = device
+ self.controller.sessions[None] = (SessionState.STARTING, future)
+
+ self.controller.session_selector.select.return_value = []
+
+ self.perf_counter.side_effect = [0, 0.1, 0.2]
+
+ # Act
+ self.controller._update_sessions(1.0)
+
+ # Assert
+ self.assertEqual(self.controller.sessions, { None: (SessionState.ACTIVE, session) })
+
+ self.controller.session_selector.register.assert_called_once_with(session, selectors.EVENT_READ)
+
+ def test_terminated_sessions_are_removed(self):
+ # Arrange
+ device = create_autospec(Terminal, instance=True)
+
+ future = create_autospec(Future, instance=True)
+
+ future.done = Mock(return_value=True)
+ future.result = Mock()
+
+ self.controller.devices[None] = device
+ self.controller.sessions[None] = (SessionState.TERMINATING, future)
+
+ self.controller.session_selector.select.return_value = []
+
+ self.perf_counter.side_effect = [0, 0.1, 0.2]
+
+ # Act
+ self.controller._update_sessions(1.0)
+
+ # Assert
+ self.assertEqual(self.controller.sessions, { })
+
+ def test_active_sessions_select_timeout(self):
+ # Arrange
+ device = create_autospec(Terminal, instance=True)
+
+ session = create_autospec(Session, instance=True)
+
+ self.controller.devices[None] = device
+ self.controller.sessions[None] = (SessionState.ACTIVE, session)
+
+ self.controller.session_selector.select.return_value = []
+
+ self.perf_counter.side_effect = [0, 0.1, 0.2]
+
+ # Act
+ self.controller._update_sessions(1.0)
+
+ # Assert
+ self.controller.session_selector.select.assert_called_once_with(0.9)
+
+ session.handle_host.assert_not_called()
+ session.render.assert_not_called()
+
+ def test_active_sessions_select_available(self):
+ # Arrange
+ device = create_autospec(Terminal, instance=True)
+
+ session = create_autospec(Session, instance=True)
+
+ self.controller.devices[None] = device
+ self.controller.sessions[None] = (SessionState.ACTIVE, session)
+
+ selector_key = Mock(fileobj=session)
+
+ self.controller.session_selector.select.side_effect = [[(selector_key, selectors.EVENT_READ)], []]
+
+ self.perf_counter.side_effect = [0, 0.1, 0.2, 0.3, 0.4, 0.4]
+
+ # Act
+ self.controller._update_sessions(1.0)
+
+ # Assert
+ self.controller.session_selector.select.assert_has_calls([call(0.9), call(0.8)])
+
+ session.handle_host.assert_called_once()
+ session.render.assert_called_once()
+
+ def test_active_sessions_disconnected(self):
+ # Arrange
+ device = create_autospec(Terminal, instance=True)
+
+ session = create_autospec(Session, instance=True)
+
+ session.handle_host.side_effect = SessionDisconnectedError
+
+ self.controller.devices[None] = device
+ self.controller.sessions[None] = (SessionState.ACTIVE, session)
+
+ self.controller._terminate_session = Mock()
+
+ selector_key = Mock(fileobj=session)
+
+ self.controller.session_selector.select.side_effect = [[(selector_key, selectors.EVENT_READ)], []]
+
+ self.perf_counter.side_effect = [0, 0.1, 0.2, 0.3, 0.4, 0.4]
+
+ # Act
+ self.controller._update_sessions(1.0)
+
+ # Assert
+ self.controller.session_selector.select.assert_has_calls([call(0.9), call(0.8)])
+
+ self.controller._terminate_session.assert_called_once_with(session)
+
+ session.render.assert_not_called()
+
+class PollAttachedDevicesTestCase(unittest.TestCase):
def setUp(self):
- self.controller = Controller(None, None, None)
+ self.interface = MockInterface()
- self.controller.session = create_autospec(Session, instance=True)
+ self.controller = Controller(InterfaceWrapper(self.interface), None, None)
- self.controller.session_selector = create_autospec(BaseSelector, instance=True)
+ self.controller._handle_poll_response = Mock(wraps=self.controller._handle_poll_response)
+
+ def test_no_attached_devices(self):
+ self.controller._poll_attached_devices()
+
+ # Assert
+ self.interface.assert_command_not_executed(ANY, Poll)
+ self.controller._handle_poll_response.assert_not_called()
+
+ def test_tt_ar(self):
+ # Arrange
+ device = create_autospec(Terminal, instance=True, device_address=None)
+
+ self.controller.devices[None] = device
+
+ # Act
+ self.controller._poll_attached_devices()
+
+ # Assert
+ self.interface.assert_command_executed(None, Poll)
+ self.interface.assert_command_not_executed(None, PollAck)
+ self.controller._handle_poll_response.assert_not_called()
+
+ def test_receive_timeout(self):
+ # Arrange
+ self.interface.mock_responses = [(None, Poll, None, ReceiveTimeout)]
+
+ device = create_autospec(Terminal, instance=True, device_address=None)
+
+ self.controller.devices[None] = device
+
+ self.controller._handle_device_lost = Mock()
+
+ # Act
+ self.controller._poll_attached_devices()
+
+ # Assert
+ self.controller._handle_device_lost.assert_called_once_with(device)
+
+ self.interface.assert_command_executed(None, Poll)
+ self.interface.assert_command_not_executed(None, PollAck)
+ self.controller._handle_poll_response.assert_not_called()
+
+ def test_keystroke(self):
+ # Arrange
+ poll_response = KeystrokePollResponse(0b0110000010)
+
+ poll = Mock(side_effect=[poll_response, None, None])
+
+ self.interface.mock_responses = [(None, Poll, None, poll)]
+
+ device = create_autospec(Terminal, instance=True, device_address=None)
+
+ self.controller.devices[None] = device
+
+ self.controller._handle_keystroke_poll_response = Mock()
+
+ # Act
+ self.controller._poll_attached_devices()
+
+ # Assert
+ self.controller._handle_keystroke_poll_response.assert_called_once_with(device, poll_response)
+
+ self.assertEqual(poll.call_count, 2)
+ self.interface.assert_command_executed(None, PollAck)
+
+class PollNextDetatchedDeviceTestCase(unittest.TestCase):
+ def setUp(self):
+ self.interface = MockInterface()
+
+ self.interface.mock_responses = [(None, Poll, None, ReceiveTimeout)]
+
+ self.controller = Controller(InterfaceWrapper(self.interface), None, None)
patcher = patch('oec.controller.time.perf_counter')
self.perf_counter = patcher.start()
- def test_zero_duration(self):
- # Act
- self.controller._update_session(0)
+ self.addCleanup(patch.stopall)
- # Assert
- self.controller.session.handle_host.assert_not_called()
- self.controller.session.render.assert_not_called()
-
- self.controller.session_selector.select.assert_not_called()
-
- def test_select_timeout(self):
+ def test_poll_period_not_expired(self):
# Arrange
- self.controller.session_selector.select.return_value = []
+ self.controller.detatched_poll_period = 0.5
+ self.controller.last_detatched_poll_time = 1.0
+
+ self.perf_counter.return_value = 1.1
# Act
- self.controller._update_session(1)
+ self.controller._poll_next_detatched_device()
# Assert
- self.controller.session.handle_host.assert_not_called()
- self.controller.session.render.assert_not_called()
+ self.interface.assert_command_not_executed(None, Poll)
- self.controller.session_selector.select.assert_called_once()
+ self.assertEqual(self.controller.last_detatched_poll_time, 1.0)
- def test_select_available(self):
+ def test_empty_queue_that_remains_empty(self):
# Arrange
- self.perf_counter.side_effect = [0, 0.75, 0.75]
-
- selector_key = Mock(fileobj=self.controller.session)
-
- self.controller.session_selector.select.side_effect = [[(selector_key, selectors.EVENT_READ)], []]
+ self.controller._get_detatched_device_addresses = Mock(return_value=[])
# Act
- self.controller._update_session(1)
+ self.controller._poll_next_detatched_device()
# Assert
- self.controller.session.handle_host.assert_called_once()
- self.controller.session.render.assert_called_once()
+ self.interface.assert_command_not_executed(None, Poll)
- self.assertEqual(self.controller.session_selector.select.call_count, 2)
+ self.controller._get_detatched_device_addresses.assert_called_once()
- call_args_list = self.controller.session_selector.select.call_args_list
+ def test_empty_queue_that_is_populated(self):
+ # Arrange
+ self.controller._get_detatched_device_addresses = Mock(return_value=[None])
- self.assertEqual(call_args_list[0][0][0], 1)
- self.assertEqual(call_args_list[1][0][0], 0.25)
+ # Act
+ self.controller._poll_next_detatched_device()
+
+ # Assert
+ self.interface.assert_command_executed(None, Poll)
+
+ self.controller._get_detatched_device_addresses.assert_called_once()
+
+ def test_non_empty_queue(self):
+ # Arrange
+ self.interface.mock_responses = [(0b000000, Poll, None, ReceiveTimeout)]
+
+ self.controller.detatched_device_poll_queue = [0b000000, 0b100000]
+
+ self.controller._get_detatched_device_addresses = Mock()
+
+ # Act
+ self.controller._poll_next_detatched_device()
+
+ # Assert
+ self.interface.assert_command_executed(0b000000, Poll)
+
+ self.assertEqual(self.controller.detatched_device_poll_queue, [0b100000])
+
+ self.controller._get_detatched_device_addresses.assert_not_called()
+
+ def test_device_found(self):
+ # Arrange
+ self.controller.detatched_device_poll_queue = [None]
+
+ poll_response = PowerOnResetCompletePollResponse(0xa)
+
+ poll = Mock(side_effect=[poll_response, None, None])
+
+ self.interface.mock_responses = [(None, Poll, None, poll)]
+
+ self.controller._handle_device_found = Mock()
+
+ # Act
+ self.controller._poll_next_detatched_device()
+
+ # Assert
+ self.controller._handle_device_found.assert_called_once_with(None, poll_response)
+
+ self.interface.assert_command_executed(None, PollAck)
diff --git a/tests/test_device.py b/tests/test_device.py
index 74f4453..7bc7964 100644
--- a/tests/test_device.py
+++ b/tests/test_device.py
@@ -38,11 +38,13 @@ class FormatAddressTestCase(unittest.TestCase):
self.interface = MockInterface()
def test_no_port(self):
- self.assertEqual(format_address(InterfaceWrapper(self.interface), None), '/dev/mock')
+ self.assertEqual(format_address(InterfaceWrapper(self.interface), None), '/dev/mock#0')
- def test_multiplexer(self):
- with self.assertRaises(NotImplementedError):
- format_address(InterfaceWrapper(self.interface), 0b110000)
+ def test_known_multiplexer_port(self):
+ self.assertEqual(format_address(InterfaceWrapper(self.interface), 0b110000), '/dev/mock#3')
+
+ def test_unknown_multiplexer_port(self):
+ self.assertEqual(format_address(InterfaceWrapper(self.interface), 0b111111), '/dev/mock?111111')
class GetIdsTestCase(unittest.TestCase):
def setUp(self):
diff --git a/tests/test_terminal.py b/tests/test_terminal.py
index dce62ca..fe5cbc8 100644
--- a/tests/test_terminal.py
+++ b/tests/test_terminal.py
@@ -1,6 +1,6 @@
import unittest
from unittest.mock import create_autospec
-from coax import Poll, PollAction
+from coax import PollAction
from coax.protocol import TerminalId
import context
@@ -43,7 +43,7 @@ class TerminalSetupTestCase(unittest.TestCase):
def test(self):
self.terminal.setup()
-class TerminalPollTestCase(unittest.TestCase):
+class TerminalGetPollActionTestCase(unittest.TestCase):
def setUp(self):
self.interface = MockInterface()
@@ -54,26 +54,17 @@ class TerminalPollTestCase(unittest.TestCase):
# The terminal will be initialized in a state where the terminal keyboard clicker
# state is unknown, and this cannot be read. Therefore the first POLL will always
# attempt to set the keyboard clicker state...
- self.terminal.poll()
-
- self.interface.reset_mock()
+ self.terminal.get_poll_action()
def test_with_no_queued_actions(self):
- # Act
- self.terminal.poll()
-
- # Assert
- self.assert_poll_with_poll_action(PollAction.NONE)
+ self.assertEqual(self.terminal.get_poll_action(), PollAction.NONE)
def test_with_sound_alarm_queued(self):
# Arrange
self.terminal.sound_alarm()
- # Act
- self.terminal.poll()
-
- # Assert
- self.assert_poll_with_poll_action(PollAction.ALARM)
+ # Act and assert
+ self.assertEqual(self.terminal.get_poll_action(), PollAction.ALARM)
def test_with_enable_keyboard_clicker_queued(self):
# Arrange
@@ -81,14 +72,8 @@ class TerminalPollTestCase(unittest.TestCase):
self.terminal.keyboard.toggle_clicker()
- # Act
- self.terminal.poll()
-
- # Assert
- self.assert_poll_with_poll_action(PollAction.ENABLE_KEYBOARD_CLICKER)
-
- def assert_poll_with_poll_action(self, action):
- self.interface.assert_command_executed(None, Poll, lambda command: command.action == action)
+ # Act and assert
+ self.assertEqual(self.terminal.get_poll_action(), PollAction.ENABLE_KEYBOARD_CLICKER)
def _create_terminal(interface):
terminal_id = TerminalId(0b11110100)
diff --git a/tests/test_vt100.py b/tests/test_vt100.py
index 1a74789..3016b12 100644
--- a/tests/test_vt100.py
+++ b/tests/test_vt100.py
@@ -131,6 +131,8 @@ class SessionRenderTestCase(unittest.TestCase):
def test_with_no_eab_feature(self):
# Arrange
+ self.session.is_first_render = False
+
self.session.host_process.read = Mock(return_value=b'abc')
self.session.handle_host()
@@ -157,6 +159,8 @@ class SessionRenderTestCase(unittest.TestCase):
self.terminal.display.move_cursor = Mock(wraps=self.terminal.display.move_cursor)
self.terminal.display.flush = Mock(wraps=self.terminal.display.flush)
+ self.session.is_first_render = False
+
self.session.host_process.read = Mock(return_value=b'abc')
self.session.handle_host()