Refactor display and improve unit test coverage

This commit is contained in:
Andrew Kay
2019-07-02 22:16:21 -05:00
parent f2794ef5d6
commit ac3c0f7508
4 changed files with 298 additions and 73 deletions

View File

@@ -88,7 +88,7 @@ class Controller:
self.logger.info(f'Rows = {rows}, Columns = {columns}, Keymap = {keymap_name}')
self.terminal.display.clear_screen(include_status_line=True)
self.terminal.display.clear(include_status_line=True)
# Show the attached indicator on the status line.
self.terminal.display.status_line.write_string(0, 'S')

View File

@@ -6,7 +6,6 @@ oec.display
from collections import namedtuple
import logging
from sortedcontainers import SortedSet
from coax import write_data
_ASCII_CHAR_MAP = {
'>': 0x08,
@@ -171,16 +170,46 @@ class Display:
self.status_line = StatusLine(self.interface, columns)
def load_address_counter(self, address=None, index=None, row=None, column=None):
def move_cursor(self, address=None, index=None, row=None, column=None,
force_load=False):
"""Load the address counter."""
if address is None:
address = self.calculate_address(index=index, row=row, column=column)
address = self._calculate_address(index=index, row=row, column=column)
# TODO: Verify that the address is within range - exclude status line.
if address == self.address_counter and not force_load:
return False
self.interface.offload_load_address_counter(address)
self.address_counter = address
def clear_screen(self, include_status_line=False):
return True
def write_buffer(self, byte, index=None, row=None, column=None):
if index is None:
if row is not None and column is not None:
index = self._get_index(row, column)
else:
raise ValueError('Either index or row and column is required')
# TODO: Verify that index is within range.
if self.buffer[index] == byte:
return False
self.buffer[index] = byte
self.dirty.add(index)
return True
def flush(self):
for (start_index, end_index) in self._get_dirty_ranges():
self._flush_range(start_index, end_index)
def clear(self, include_status_line=False):
"""Clear the screen."""
(rows, columns) = self.dimensions
@@ -199,29 +228,12 @@ class Display:
self.dirty.clear()
self.load_address_counter(index=0)
self.move_cursor(index=0, force_load=True)
def write_buffer(self, byte, index=None, row=None, column=None):
if index is None:
if row is not None and column is not None:
index = self._get_index(row, column)
else:
raise ValueError('Either index or row and column is required')
def _get_index(self, row, column):
return (row * self.dimensions.columns) + column
if self.buffer[index] == byte:
return False
self.buffer[index] = byte
self.dirty.add(index)
return True
def flush(self):
for (start_index, end_index) in self._get_dirty_ranges():
self._flush_range(start_index, end_index)
def calculate_address(self, index=None, row=None, column=None):
def _calculate_address(self, index=None, row=None, column=None):
if index is not None:
return self.dimensions.columns + index
@@ -230,8 +242,16 @@ class Display:
raise ValueError('Either index or row and column is required')
def _get_index(self, row, column):
return (row * self.dimensions.columns) + column
def _calculate_address_after_write(self, address, count):
address += count
(rows, columns) = self.dimensions
# TODO: Determine the correct behavior here...
if self.address_counter >= self._calculate_address((rows * columns) - 1):
return None
return address
def _get_dirty_ranges(self):
if not self.dirty:
@@ -246,32 +266,17 @@ class Display:
data = self.buffer[start_index:end_index+1]
address = self.calculate_address(start_index)
address = self._calculate_address(start_index)
# TODO: Consider using offload for all writing - set address to None if it is the
# same as the current address counter to avoid the additional load command.
if address != self.address_counter:
try:
self.interface.offload_write(data, address=address)
except Exception as error:
self.logger.error(f'Offload write error: {error}', exc_info=error)
try:
self.interface.offload_write(data, address=address if address != self.address_counter else None)
except Exception as error:
# TODO: This could leave the address_counter incorrect.
self.logger.error(f'Offload write error: {error}', exc_info=error)
self.address_counter = address + len(data)
else:
try:
write_data(self.interface, data)
except Exception as error:
self.logger.error(f'WRITE_DATA error: {error}', exc_info=error)
self.address_counter = self._calculate_address_after_write(address, len(data))
self.address_counter += len(data)
# Force the address counter to be updated...
(rows, columns) = self.dimensions
if self.address_counter >= self.calculate_address((rows * columns) - 1):
self.address_counter = None
for index in range(start_index, end_index+1):
for index in range(start_index, end_index + 1):
self.dirty.discard(index)
return self.address_counter

View File

@@ -93,13 +93,13 @@ class VT100Session(Session):
self._start_host_process()
# Clear the screen.
self.terminal.display.clear_screen()
self.terminal.display.clear()
# Update the status line.
self.terminal.display.status_line.write_string(45, 'VT100')
# Load the address counter.
self.terminal.display.load_address_counter(index=0)
# Reset the cursor.
self.terminal.display.move_cursor(row=0, column=0)
def terminate(self):
if self.host_process:
@@ -198,24 +198,10 @@ class VT100Session(Session):
self.terminal.display.write_buffer(byte, row=row, column=column)
def _flush(self):
display = self.terminal.display
self.terminal.display.flush()
display.flush()
# Syncronize the cursor.
# TODO: Investigate different approaches to making cursor syncronization more
# reliable - maybe it needs to be forced sometimes.
cursor = self.vt100_screen.cursor
address = display.calculate_address(row=cursor.y, column=cursor.x)
# TODO: Investigate different approaches to reducing the need to syncronize the cursor
# or make it more reliable.
if address != display.address_counter:
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug((f'Setting address counter: Address = {address}, '
f'Address Counter = {display.address_counter}'))
display.load_address_counter(address)
else:
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug((f'Skipping address counter: Address Counter = '
f'{display.address_counter}'))
self.terminal.display.move_cursor(row=cursor.y, column=cursor.x)

View File

@@ -1,8 +1,242 @@
import unittest
from unittest.mock import Mock, patch
import context
from oec.display import encode_ascii_character, encode_string
from oec.display import Dimensions, Display, encode_ascii_character, encode_string
class DisplayMoveCursorTestCase(unittest.TestCase):
def setUp(self):
self.interface = Mock()
dimensions = Dimensions(24, 80)
self.display = Display(self.interface, dimensions)
def test_with_address(self):
# Act
self.display.move_cursor(address=80)
# Assert
self.assertEqual(self.display.address_counter, 80)
self.interface.offload_load_address_counter.assert_called_with(80)
def test_with_index(self):
# Act
self.display.move_cursor(index=15)
# Assert
self.assertEqual(self.display.address_counter, 95)
self.interface.offload_load_address_counter.assert_called_with(95)
def test_with_coordinates(self):
# Act
self.display.move_cursor(row=10, column=15)
# Assert
self.assertEqual(self.display.address_counter, 895)
self.interface.offload_load_address_counter.assert_called_with(895)
def test_no_change(self):
# Arrange
self.display.move_cursor(address=80)
self.interface.offload_load_address_counter.reset_mock()
# Act
self.display.move_cursor(address=80)
# Assert
self.assertEqual(self.display.address_counter, 80)
self.interface.offload_load_address_counter.assert_not_called()
def test_no_change_force(self):
# Arrange
self.display.move_cursor(address=80)
self.interface.offload_load_address_counter.reset_mock()
# Act
self.display.move_cursor(address=80, force_load=True)
# Assert
self.assertEqual(self.display.address_counter, 80)
self.interface.offload_load_address_counter.assert_called_with(80)
class DisplayWriteBufferTestCase(unittest.TestCase):
def setUp(self):
self.interface = Mock()
dimensions = Dimensions(24, 80)
self.display = Display(self.interface, dimensions)
def test_with_index(self):
# Act
self.display.write_buffer(0x01, index=15)
self.display.write_buffer(0x02, index=97)
# Assert
self.assertEqual(self.display.buffer[15], 0x01)
self.assertEqual(self.display.buffer[97], 0x02)
self.assertSequenceEqual(self.display.dirty, [15, 97])
def test_with_coordinates(self):
# Act
self.display.write_buffer(0x01, row=0, column=15)
self.display.write_buffer(0x02, row=1, column=17)
# Assert
self.assertEqual(self.display.buffer[15], 0x01)
self.assertEqual(self.display.buffer[97], 0x02)
self.assertSequenceEqual(self.display.dirty, [15, 97])
def test_change(self):
self.assertTrue(self.display.write_buffer(0x01, index=0))
self.assertTrue(self.display.write_buffer(0x02, index=0))
self.assertEqual(self.display.buffer[0], 0x02)
self.assertSequenceEqual(self.display.dirty, [0])
def test_no_change(self):
self.assertTrue(self.display.write_buffer(0x01, index=0))
self.assertFalse(self.display.write_buffer(0x01, index=0))
self.assertEqual(self.display.buffer[0], 0x01)
self.assertSequenceEqual(self.display.dirty, [0])
class DisplayFlushTestCase(unittest.TestCase):
def setUp(self):
self.interface = Mock()
dimensions = Dimensions(24, 80)
self.display = Display(self.interface, dimensions)
self.display._flush_range = Mock()
def test_no_changes(self):
# Act
self.display.flush()
# Assert
self.display._flush_range.assert_not_called()
def test_single_range(self):
# Arrange
self.display.write_buffer(0x01, index=0)
self.display.write_buffer(0x02, index=1)
self.display.write_buffer(0x03, index=2)
# Act
self.display.flush()
# Assert
self.display._flush_range.assert_called_with(0, 2)
def test_multiple_ranges(self):
# Arrange
self.display.write_buffer(0x01, index=0)
self.display.write_buffer(0x02, index=1)
self.display.write_buffer(0x03, index=2)
self.display.write_buffer(0x05, index=30)
self.display.write_buffer(0x06, index=31)
self.display.write_buffer(0x04, index=20)
# Act
self.display.flush()
# Assert
self.display._flush_range.assert_called_with(0, 31)
class DisplayClearTestCase(unittest.TestCase):
def setUp(self):
self.interface = Mock()
dimensions = Dimensions(24, 80)
self.display = Display(self.interface, dimensions)
def test_excluding_status_line(self):
# Arrange
self.display.write_buffer(0x01, index=0)
self.assertEqual(self.display.buffer[0], 0x01)
self.assertTrue(self.display.dirty)
# Act
self.display.clear(include_status_line=False)
# Assert
self.interface.offload_write.assert_called_with(b'\x00', address=80, repeat=1919)
self.interface.offload_load_address_counter.assert_called_with(80)
self.assertEqual(self.display.buffer[0], 0x00)
self.assertFalse(self.display.dirty)
def test_including_status_line(self):
# Arrange
self.display.write_buffer(0x01, index=0)
self.assertEqual(self.display.buffer[0], 0x01)
self.assertTrue(self.display.dirty)
# Act
self.display.clear(include_status_line=True)
# Assert
self.interface.offload_write.assert_called_with(b'\x00', address=0, repeat=1999)
self.interface.offload_load_address_counter.assert_called_with(80)
self.assertEqual(self.display.buffer[0], 0x00)
self.assertFalse(self.display.dirty)
class DisplayFlushRangeTestCase(unittest.TestCase):
def setUp(self):
self.interface = Mock()
dimensions = Dimensions(24, 80)
self.display = Display(self.interface, dimensions)
def test_when_start_address_is_current_address_counter(self):
# Arrange
self.display.move_cursor(index=0)
self.display.write_buffer(0x01, index=0)
self.display.write_buffer(0x02, index=1)
self.display.write_buffer(0x03, index=2)
# Act
self.display.flush()
# Assert
self.interface.offload_write.assert_called_with(bytes.fromhex('01 02 03'), address=None)
self.assertEqual(self.display.address_counter, 83)
self.assertFalse(self.display.dirty)
def test_when_start_address_is_not_current_address_counter(self):
# Arrange
self.display.move_cursor(index=70)
self.display.write_buffer(0x01, index=0)
self.display.write_buffer(0x02, index=1)
self.display.write_buffer(0x03, index=2)
# Act
self.display.flush()
# Assert
self.interface.offload_write.assert_called_with(bytes.fromhex('01 02 03'), address=80)
self.assertEqual(self.display.address_counter, 83)
self.assertFalse(self.display.dirty)
class EncodeAsciiCharacterTestCase(unittest.TestCase):
def test_mapped_character(self):