diff --git a/oec/__main__.py b/oec/__main__.py index 3aeba4c..69566a2 100644 --- a/oec/__main__.py +++ b/oec/__main__.py @@ -3,7 +3,7 @@ import signal import logging import argparse from serial import Serial -from coax import Interface1 +from coax import SerialInterface from .controller import Controller from .tn3270 import TN3270Session @@ -81,7 +81,7 @@ def main(): time.sleep(3) # Initialize the interface. - interface = Interface1(serial) + interface = SerialInterface(serial) firmware_version = interface.reset() diff --git a/oec/controller.py b/oec/controller.py index f922bb0..659436c 100644 --- a/oec/controller.py +++ b/oec/controller.py @@ -179,7 +179,7 @@ class Controller: poll_action = self.terminal.get_poll_action() if self.terminal else PollAction.NONE - poll_response = poll(self.interface, poll_action, timeout=1) + poll_response = poll(self.interface, poll_action, receive_timeout=1) if poll_response: try: diff --git a/oec/display.py b/oec/display.py index c66e808..7741541 100644 --- a/oec/display.py +++ b/oec/display.py @@ -6,7 +6,8 @@ oec.display from collections import namedtuple import logging from sortedcontainers import SortedSet -from coax import load_address_counter_hi, load_address_counter_lo +from coax import read_address_counter_hi, read_address_counter_lo, \ + load_address_counter_hi, load_address_counter_lo, write_data _ASCII_CHAR_MAP = { '>': 0x08, @@ -255,6 +256,9 @@ class Display: raise ValueError('Either index or row and column is required') def _calculate_address_after_write(self, address, count): + if address is None: + return None + address += count (rows, columns) = self.dimensions @@ -265,6 +269,12 @@ class Display: return address + def _read_address_counter(self): + hi = read_address_counter_hi(self.interface) + lo = read_address_counter_lo(self.interface) + + return (hi << 8) | lo + def _load_address_counter(self, address, force_load): if address == self.address_counter and not force_load: return False @@ -298,29 +308,37 @@ class Display: address = self._calculate_address(start_index) try: - self._write(data, address=address if address != self.address_counter else None) + self._write(data, address=address) except Exception as error: # TODO: This could leave the address_counter incorrect. self.logger.error(f'Write error: {error}', exc_info=error) - self.address_counter = self._calculate_address_after_write(address, len(data)) - for index in range(start_index, end_index + 1): self.dirty.discard(index) return self.address_counter def _write(self, data, address=None, restore_original_address=False): - if isinstance(data, tuple): - offload_data = data[0] - offload_repeat = max(data[1] - 1, 0) - else: - offload_data = data - offload_repeat = 0 + if restore_original_address: + original_address = self.address_counter - self.interface.offload_write(offload_data, address=address, - restore_original_address=restore_original_address, - repeat=offload_repeat) + if original_address is None: + original_address = self._read_address_counter() + + if address is not None: + self._load_address_counter(address, force_load=False) + + write_data(self.interface, data) + + if isinstance(address, tuple): + length = len(data[0]) * data[1] + else: + length = len(data) + + self.address_counter = self._calculate_address_after_write(address, length) + + if restore_original_address: + self._load_address_counter(original_address, force_load=True) # TODO: add validation of column and data length for write() - must be inside status line class StatusLine: diff --git a/requirements.txt b/requirements.txt index 18b017e..8726c15 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ ptyprocess==0.6.0 -pycoax==0.2.0 +pycoax==0.3.1 pyserial==3.4 pyte==0.8.0 pytn3270==0.5.0 diff --git a/tests/test_controller.py b/tests/test_controller.py index 3473038..4f8c9a5 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -57,6 +57,10 @@ class RunLoopTestCase(unittest.TestCase): patcher.start() + patcher = patch('oec.display.write_data') + + patcher.start() + self.addCleanup(patch.stopall) def test_no_terminal(self): diff --git a/tests/test_display.py b/tests/test_display.py index 4399d56..f5e82d3 100644 --- a/tests/test_display.py +++ b/tests/test_display.py @@ -157,6 +157,10 @@ class DisplayClearTestCase(unittest.TestCase): self.load_address_counter_lo_mock = patcher.start() + patcher = patch('oec.display.write_data') + + self.write_data_mock = patcher.start() + self.addCleanup(patch.stopall) def test_excluding_status_line(self): @@ -211,6 +215,10 @@ class DisplayFlushRangeTestCase(unittest.TestCase): self.load_address_counter_lo_mock = patcher.start() + patcher = patch('oec.display.write_data') + + self.write_data_mock = patcher.start() + self.addCleanup(patch.stopall) def test_when_start_address_is_current_address_counter(self): @@ -225,7 +233,7 @@ class DisplayFlushRangeTestCase(unittest.TestCase): self.display.flush() # Assert - self.display._write.assert_called_with(bytes.fromhex('01 02 03'), address=None) + self.display._write.assert_called_with(bytes.fromhex('01 02 03'), address=80) self.assertEqual(self.display.address_counter, 83) self.assertFalse(self.display.dirty) @@ -363,19 +371,99 @@ class DisplayWriteTestCase(unittest.TestCase): self.display = Display(self.interface, dimensions) + self.display._load_address_counter = Mock(wraps=self.display._load_address_counter) + + patcher = patch('oec.display.load_address_counter_hi') + + self.load_address_counter_hi_mock = patcher.start() + + patcher = patch('oec.display.load_address_counter_lo') + + self.load_address_counter_lo_mock = patcher.start() + + patcher = patch('oec.display.write_data') + + self.write_data_mock = patcher.start() + + self.addCleanup(patch.stopall) + def test(self): # Act self.display._write(bytes.fromhex('01 02 03')) # Assert - self.interface.offload_write.assert_called_with(bytes.fromhex('01 02 03'), address=None, restore_original_address=False, repeat=0) + self.assertIsNone(self.display.address_counter) + + self.write_data_mock.assert_called_with(self.interface, bytes.fromhex('01 02 03')) def test_repeat(self): # Act self.display._write((bytes.fromhex('01 02 03'), 3)) # Assert - self.interface.offload_write.assert_called_with(bytes.fromhex('01 02 03'), address=None, restore_original_address=False, repeat=2) + self.assertIsNone(self.display.address_counter) + + self.write_data_mock.assert_called_with(self.interface, (bytes.fromhex('01 02 03'), 3)) + + def test_address_if_current_address_unknown(self): + # Arrange + self.assertIsNone(self.display.address_counter) + + # Act + self.display._write(bytes.fromhex('01 02 03'), address=80) + + # Assert + self.assertEqual(self.display.address_counter, 83) + + def test_address_if_change(self): + # Arrange + self.display.address_counter = 160 + + # Act + self.display._write(bytes.fromhex('01 02 03'), address=80) + + # Assert + self.assertEqual(self.display.address_counter, 83) + + self.display._load_address_counter.assert_called_with(80, force_load=False) + + def test_address_if_no_change(self): + # Arrange + self.display.address_counter = 80 + + # Act + self.display._write(bytes.fromhex('01 02 03'), address=80) + + # Assert + self.assertEqual(self.display.address_counter, 83) + + self.display._load_address_counter.assert_called_with(80, force_load=False) + + def test_restore_original_address_if_current_address_unknown(self): + # Arrange + self.display._read_address_counter = Mock(return_value=160) + + self.assertIsNone(self.display.address_counter) + + # Act + self.display._write(bytes.fromhex('01 02 03'), restore_original_address=True) + + # Assert + self.assertEqual(self.display.address_counter, 160) + + def test_restore_original_address_if_current_address_known(self): + # Arrange + self.display._read_address_counter = Mock(return_value=160) + + self.display.address_counter = 160 + + # Act + self.display._write(bytes.fromhex('01 02 03'), restore_original_address=True) + + # Assert + self.assertEqual(self.display.address_counter, 160) + + self.display._read_address_counter.assert_not_called() class EncodeAsciiCharacterTestCase(unittest.TestCase): def test_mapped_character(self):