diff --git a/oec/controller.py b/oec/controller.py index 84736f7..28cb9ee 100644 --- a/oec/controller.py +++ b/oec/controller.py @@ -88,7 +88,7 @@ class Controller: self.logger.info(f'Rows = {rows}, Columns = {columns}, Keymap = {keymap_name}') - self.terminal.display.clear_screen(include_status_line=True) + self.terminal.display.clear(include_status_line=True) # Show the attached indicator on the status line. self.terminal.display.status_line.write_string(0, 'S') diff --git a/oec/display.py b/oec/display.py index ce9716c..b70223c 100644 --- a/oec/display.py +++ b/oec/display.py @@ -6,7 +6,6 @@ oec.display from collections import namedtuple import logging from sortedcontainers import SortedSet -from coax import write_data _ASCII_CHAR_MAP = { '>': 0x08, @@ -171,16 +170,46 @@ class Display: self.status_line = StatusLine(self.interface, columns) - def load_address_counter(self, address=None, index=None, row=None, column=None): + def move_cursor(self, address=None, index=None, row=None, column=None, + force_load=False): """Load the address counter.""" if address is None: - address = self.calculate_address(index=index, row=row, column=column) + address = self._calculate_address(index=index, row=row, column=column) + + # TODO: Verify that the address is within range - exclude status line. + + if address == self.address_counter and not force_load: + return False self.interface.offload_load_address_counter(address) self.address_counter = address - def clear_screen(self, include_status_line=False): + return True + + def write_buffer(self, byte, index=None, row=None, column=None): + if index is None: + if row is not None and column is not None: + index = self._get_index(row, column) + else: + raise ValueError('Either index or row and column is required') + + # TODO: Verify that index is within range. + + if self.buffer[index] == byte: + return False + + self.buffer[index] = byte + + self.dirty.add(index) + + return True + + def flush(self): + for (start_index, end_index) in self._get_dirty_ranges(): + self._flush_range(start_index, end_index) + + def clear(self, include_status_line=False): """Clear the screen.""" (rows, columns) = self.dimensions @@ -199,29 +228,12 @@ class Display: self.dirty.clear() - self.load_address_counter(index=0) + self.move_cursor(index=0, force_load=True) - def write_buffer(self, byte, index=None, row=None, column=None): - if index is None: - if row is not None and column is not None: - index = self._get_index(row, column) - else: - raise ValueError('Either index or row and column is required') + def _get_index(self, row, column): + return (row * self.dimensions.columns) + column - if self.buffer[index] == byte: - return False - - self.buffer[index] = byte - - self.dirty.add(index) - - return True - - def flush(self): - for (start_index, end_index) in self._get_dirty_ranges(): - self._flush_range(start_index, end_index) - - def calculate_address(self, index=None, row=None, column=None): + def _calculate_address(self, index=None, row=None, column=None): if index is not None: return self.dimensions.columns + index @@ -230,8 +242,16 @@ class Display: raise ValueError('Either index or row and column is required') - def _get_index(self, row, column): - return (row * self.dimensions.columns) + column + def _calculate_address_after_write(self, address, count): + address += count + + (rows, columns) = self.dimensions + + # TODO: Determine the correct behavior here... + if self.address_counter >= self._calculate_address((rows * columns) - 1): + return None + + return address def _get_dirty_ranges(self): if not self.dirty: @@ -246,32 +266,17 @@ class Display: data = self.buffer[start_index:end_index+1] - address = self.calculate_address(start_index) + address = self._calculate_address(start_index) - # TODO: Consider using offload for all writing - set address to None if it is the - # same as the current address counter to avoid the additional load command. - if address != self.address_counter: - try: - self.interface.offload_write(data, address=address) - except Exception as error: - self.logger.error(f'Offload write error: {error}', exc_info=error) + try: + self.interface.offload_write(data, address=address if address != self.address_counter else None) + except Exception as error: + # TODO: This could leave the address_counter incorrect. + self.logger.error(f'Offload write error: {error}', exc_info=error) - self.address_counter = address + len(data) - else: - try: - write_data(self.interface, data) - except Exception as error: - self.logger.error(f'WRITE_DATA error: {error}', exc_info=error) + self.address_counter = self._calculate_address_after_write(address, len(data)) - self.address_counter += len(data) - - # Force the address counter to be updated... - (rows, columns) = self.dimensions - - if self.address_counter >= self.calculate_address((rows * columns) - 1): - self.address_counter = None - - for index in range(start_index, end_index+1): + for index in range(start_index, end_index + 1): self.dirty.discard(index) return self.address_counter diff --git a/oec/vt100.py b/oec/vt100.py index 69b82bf..87e64ca 100644 --- a/oec/vt100.py +++ b/oec/vt100.py @@ -93,13 +93,13 @@ class VT100Session(Session): self._start_host_process() # Clear the screen. - self.terminal.display.clear_screen() + self.terminal.display.clear() # Update the status line. self.terminal.display.status_line.write_string(45, 'VT100') - # Load the address counter. - self.terminal.display.load_address_counter(index=0) + # Reset the cursor. + self.terminal.display.move_cursor(row=0, column=0) def terminate(self): if self.host_process: @@ -198,24 +198,10 @@ class VT100Session(Session): self.terminal.display.write_buffer(byte, row=row, column=column) def _flush(self): - display = self.terminal.display + self.terminal.display.flush() - display.flush() - - # Syncronize the cursor. + # TODO: Investigate different approaches to making cursor syncronization more + # reliable - maybe it needs to be forced sometimes. cursor = self.vt100_screen.cursor - address = display.calculate_address(row=cursor.y, column=cursor.x) - - # TODO: Investigate different approaches to reducing the need to syncronize the cursor - # or make it more reliable. - if address != display.address_counter: - if self.logger.isEnabledFor(logging.DEBUG): - self.logger.debug((f'Setting address counter: Address = {address}, ' - f'Address Counter = {display.address_counter}')) - - display.load_address_counter(address) - else: - if self.logger.isEnabledFor(logging.DEBUG): - self.logger.debug((f'Skipping address counter: Address Counter = ' - f'{display.address_counter}')) + self.terminal.display.move_cursor(row=cursor.y, column=cursor.x) diff --git a/tests/test_display.py b/tests/test_display.py index 38b6564..9fdcd5d 100644 --- a/tests/test_display.py +++ b/tests/test_display.py @@ -1,8 +1,242 @@ import unittest +from unittest.mock import Mock, patch import context -from oec.display import encode_ascii_character, encode_string +from oec.display import Dimensions, Display, encode_ascii_character, encode_string + +class DisplayMoveCursorTestCase(unittest.TestCase): + def setUp(self): + self.interface = Mock() + + dimensions = Dimensions(24, 80) + + self.display = Display(self.interface, dimensions) + + def test_with_address(self): + # Act + self.display.move_cursor(address=80) + + # Assert + self.assertEqual(self.display.address_counter, 80) + + self.interface.offload_load_address_counter.assert_called_with(80) + + def test_with_index(self): + # Act + self.display.move_cursor(index=15) + + # Assert + self.assertEqual(self.display.address_counter, 95) + + self.interface.offload_load_address_counter.assert_called_with(95) + + def test_with_coordinates(self): + # Act + self.display.move_cursor(row=10, column=15) + + # Assert + self.assertEqual(self.display.address_counter, 895) + + self.interface.offload_load_address_counter.assert_called_with(895) + + def test_no_change(self): + # Arrange + self.display.move_cursor(address=80) + + self.interface.offload_load_address_counter.reset_mock() + + # Act + self.display.move_cursor(address=80) + + # Assert + self.assertEqual(self.display.address_counter, 80) + + self.interface.offload_load_address_counter.assert_not_called() + + def test_no_change_force(self): + # Arrange + self.display.move_cursor(address=80) + + self.interface.offload_load_address_counter.reset_mock() + + # Act + self.display.move_cursor(address=80, force_load=True) + + # Assert + self.assertEqual(self.display.address_counter, 80) + + self.interface.offload_load_address_counter.assert_called_with(80) + +class DisplayWriteBufferTestCase(unittest.TestCase): + def setUp(self): + self.interface = Mock() + + dimensions = Dimensions(24, 80) + + self.display = Display(self.interface, dimensions) + + def test_with_index(self): + # Act + self.display.write_buffer(0x01, index=15) + self.display.write_buffer(0x02, index=97) + + # Assert + self.assertEqual(self.display.buffer[15], 0x01) + self.assertEqual(self.display.buffer[97], 0x02) + self.assertSequenceEqual(self.display.dirty, [15, 97]) + + def test_with_coordinates(self): + # Act + self.display.write_buffer(0x01, row=0, column=15) + self.display.write_buffer(0x02, row=1, column=17) + + # Assert + self.assertEqual(self.display.buffer[15], 0x01) + self.assertEqual(self.display.buffer[97], 0x02) + self.assertSequenceEqual(self.display.dirty, [15, 97]) + + def test_change(self): + self.assertTrue(self.display.write_buffer(0x01, index=0)) + self.assertTrue(self.display.write_buffer(0x02, index=0)) + + self.assertEqual(self.display.buffer[0], 0x02) + self.assertSequenceEqual(self.display.dirty, [0]) + + def test_no_change(self): + self.assertTrue(self.display.write_buffer(0x01, index=0)) + self.assertFalse(self.display.write_buffer(0x01, index=0)) + + self.assertEqual(self.display.buffer[0], 0x01) + self.assertSequenceEqual(self.display.dirty, [0]) + +class DisplayFlushTestCase(unittest.TestCase): + def setUp(self): + self.interface = Mock() + + dimensions = Dimensions(24, 80) + + self.display = Display(self.interface, dimensions) + + self.display._flush_range = Mock() + + def test_no_changes(self): + # Act + self.display.flush() + + # Assert + self.display._flush_range.assert_not_called() + + def test_single_range(self): + # Arrange + self.display.write_buffer(0x01, index=0) + self.display.write_buffer(0x02, index=1) + self.display.write_buffer(0x03, index=2) + + # Act + self.display.flush() + + # Assert + self.display._flush_range.assert_called_with(0, 2) + + def test_multiple_ranges(self): + # Arrange + self.display.write_buffer(0x01, index=0) + self.display.write_buffer(0x02, index=1) + self.display.write_buffer(0x03, index=2) + self.display.write_buffer(0x05, index=30) + self.display.write_buffer(0x06, index=31) + self.display.write_buffer(0x04, index=20) + + # Act + self.display.flush() + + # Assert + self.display._flush_range.assert_called_with(0, 31) + +class DisplayClearTestCase(unittest.TestCase): + def setUp(self): + self.interface = Mock() + + dimensions = Dimensions(24, 80) + + self.display = Display(self.interface, dimensions) + + def test_excluding_status_line(self): + # Arrange + self.display.write_buffer(0x01, index=0) + + self.assertEqual(self.display.buffer[0], 0x01) + self.assertTrue(self.display.dirty) + + # Act + self.display.clear(include_status_line=False) + + # Assert + self.interface.offload_write.assert_called_with(b'\x00', address=80, repeat=1919) + self.interface.offload_load_address_counter.assert_called_with(80) + + self.assertEqual(self.display.buffer[0], 0x00) + self.assertFalse(self.display.dirty) + + def test_including_status_line(self): + # Arrange + self.display.write_buffer(0x01, index=0) + + self.assertEqual(self.display.buffer[0], 0x01) + self.assertTrue(self.display.dirty) + + # Act + self.display.clear(include_status_line=True) + + # Assert + self.interface.offload_write.assert_called_with(b'\x00', address=0, repeat=1999) + self.interface.offload_load_address_counter.assert_called_with(80) + + self.assertEqual(self.display.buffer[0], 0x00) + self.assertFalse(self.display.dirty) + +class DisplayFlushRangeTestCase(unittest.TestCase): + def setUp(self): + self.interface = Mock() + + dimensions = Dimensions(24, 80) + + self.display = Display(self.interface, dimensions) + + def test_when_start_address_is_current_address_counter(self): + # Arrange + self.display.move_cursor(index=0) + + self.display.write_buffer(0x01, index=0) + self.display.write_buffer(0x02, index=1) + self.display.write_buffer(0x03, index=2) + + # Act + self.display.flush() + + # Assert + self.interface.offload_write.assert_called_with(bytes.fromhex('01 02 03'), address=None) + + self.assertEqual(self.display.address_counter, 83) + self.assertFalse(self.display.dirty) + + def test_when_start_address_is_not_current_address_counter(self): + # Arrange + self.display.move_cursor(index=70) + + self.display.write_buffer(0x01, index=0) + self.display.write_buffer(0x02, index=1) + self.display.write_buffer(0x03, index=2) + + # Act + self.display.flush() + + # Assert + self.interface.offload_write.assert_called_with(bytes.fromhex('01 02 03'), address=80) + + self.assertEqual(self.display.address_counter, 83) + self.assertFalse(self.display.dirty) class EncodeAsciiCharacterTestCase(unittest.TestCase): def test_mapped_character(self):