Controller loop unit tests

This commit is contained in:
Andrew Kay 2019-12-10 21:52:52 -06:00
parent d510f077e3
commit 7197a6ffc4
3 changed files with 158 additions and 33 deletions

View File

@ -43,31 +43,7 @@ class Controller:
self.running = True
while self.running:
if self.session:
try:
self.session.handle_host()
except SessionDisconnectedError:
self._handle_session_disconnected()
try:
poll_response = self._poll()
except ReceiveTimeout:
if self.terminal:
self._handle_terminal_detached()
continue
except ReceiveError as error:
self.logger.warning(f'POLL receive error: {error}', exc_info=error)
continue
except ProtocolError as error:
self.logger.warning(f'POLL protocol error: {error}', exc_info=error)
continue
if not self.terminal:
self._handle_terminal_attached(poll_response)
if poll_response:
self._handle_poll_response(poll_response)
self._run_loop()
self._terminate_session()
@ -77,6 +53,33 @@ class Controller:
def stop(self):
self.running = False
def _run_loop(self):
if self.session:
try:
self.session.handle_host()
except SessionDisconnectedError:
self._handle_session_disconnected()
try:
poll_response = self._poll()
except ReceiveTimeout:
if self.terminal:
self._handle_terminal_detached()
return
except ReceiveError as error:
self.logger.warning(f'POLL receive error: {error}', exc_info=error)
return
except ProtocolError as error:
self.logger.warning(f'POLL protocol error: {error}', exc_info=error)
return
if not self.terminal:
self._handle_terminal_attached(poll_response)
if poll_response:
self._handle_poll_response(poll_response)
def _handle_terminal_attached(self, poll_response):
self.logger.info('Terminal attached')
@ -156,7 +159,7 @@ class Controller:
self.session.handle_key(key, modifiers, scan_code)
def _poll(self):
delay = self._calculate_poll_delay()
delay = self._calculate_poll_delay(time.perf_counter())
if delay > 0:
time.sleep(delay)
@ -177,7 +180,7 @@ class Controller:
return poll_response
def _calculate_poll_delay(self):
def _calculate_poll_delay(self, current_time):
if self.last_poll_response is not None:
return 0
@ -189,4 +192,4 @@ class Controller:
else:
period = self.disconnected_poll_period
return (self.last_poll_time + period) - time.perf_counter()
return (self.last_poll_time + period) - current_time

122
tests/test_controller.py Normal file
View File

@ -0,0 +1,122 @@
import unittest
from unittest.mock import Mock, PropertyMock, patch
from coax import PowerOnResetCompletePollResponse, KeystrokePollResponse, ReceiveTimeout
from coax.protocol import TerminalId
import context
from oec.controller import Controller
from oec.session import SessionDisconnectedError
from oec.keyboard import KeyboardModifiers, Key
from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2
TERMINAL_IDS = (TerminalId(0b11110100), 'c1348300')
class RunLoopTestCase(unittest.TestCase):
def setUp(self):
self.interface = Mock()
self.session_mock = Mock()
self.create_session_mock = Mock(return_value=self.session_mock)
self.controller = Controller(self.interface, lambda terminal_id, extended_id: KEYMAP_3278_2, self.create_session_mock)
self.controller.connected_poll_period = 1
patcher = patch('oec.controller.poll')
self.poll_mock = patcher.start()
patcher = patch('oec.controller.poll_ack')
self.poll_ack_mock = patcher.start()
patcher = patch('oec.controller.read_terminal_ids')
self.read_terminal_ids_mock = patcher.start()
self.read_terminal_ids_mock.return_value = TERMINAL_IDS
patcher = patch('oec.controller.time.perf_counter')
self.perf_counter_mock = patcher.start()
patcher = patch('oec.controller.time.sleep')
self.sleep_mock = patcher.start()
self.addCleanup(patch.stopall)
def test_no_terminal(self):
self._assert_run_loop(0, ReceiveTimeout, 0, False)
self._assert_run_loop(1, ReceiveTimeout, 4, False)
self.assertIsNone(self.controller.terminal)
self.assertIsNone(self.controller.session)
def test_terminal_attached(self):
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), 0, True)
self._assert_run_loop(0, None, 0, False)
self._assert_run_loop(0.5, None, 0.5, False)
self.assertIsNotNone(self.controller.terminal)
self.assertIsNotNone(self.controller.session)
self.controller.session.handle_host.assert_called()
def test_keystroke(self):
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), 0, True)
self._assert_run_loop(0, KeystrokePollResponse(0b0110000010), 0, True)
self._assert_run_loop(0, None, 0, False)
self.assertIsNotNone(self.controller.terminal)
self.assertIsNotNone(self.controller.session)
self.controller.session.handle_key.assert_called_with(Key.LOWER_A, KeyboardModifiers.NONE, 96)
def test_terminal_detached(self):
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), 0, True)
self._assert_run_loop(0, None, 0, False)
self._assert_run_loop(0.5, ReceiveTimeout, 0.5, False)
self.assertIsNone(self.controller.terminal)
self.assertIsNone(self.controller.session)
self.session_mock.terminate.assert_called()
def test_session_disconnected(self):
self.session_mock.handle_host.side_effect = [None, SessionDisconnectedError, None]
self._assert_run_loop(0, PowerOnResetCompletePollResponse(0xa), 0, True)
self._assert_run_loop(0, None, 0, False)
self._assert_run_loop(0.5, None, 0.5, False)
self._assert_run_loop(1.5, None, 0.5, False)
self.assertIsNotNone(self.controller.terminal)
self.assertIsNotNone(self.controller.session)
self.assertEqual(self.create_session_mock.call_count, 2)
def _assert_run_loop(self, poll_time, poll_response, expected_delay, expected_poll_ack):
# Arrange
self.poll_mock.side_effect = [poll_response]
self.poll_ack_mock.reset_mock()
self.perf_counter_mock.side_effect = [poll_time, poll_time + expected_delay]
self.sleep_mock.reset_mock()
# Act
self.controller._run_loop()
# Assert
if expected_delay > 0:
self.sleep_mock.assert_called_once_with(expected_delay)
else:
self.sleep_mock.assert_not_called()
if expected_poll_ack:
self.poll_ack_mock.assert_called_once()
else:
self.poll_ack_mock.assert_not_called()

View File

@ -8,7 +8,8 @@ from oec.keyboard import Key, KeyboardModifiers
from oec.vt100 import VT100Session, select
class SessionHandleHostTestCase(unittest.TestCase):
def test(self):
@patch('oec.vt100.select')
def test(self, select_mock):
# Arrange
terminal = Mock()
@ -20,11 +21,10 @@ class SessionHandleHostTestCase(unittest.TestCase):
session.host_process.read = Mock(return_value=b'abc')
# Act
with patch('oec.vt100.select') as select_patch:
select_patch.return_value = [[session.host_process]]
select_mock.return_value = [[session.host_process]]
session.handle_host()
# Act
session.handle_host()
# Assert
terminal.display.buffered_write.assert_any_call(0x80, row=0, column=0)