mirror of
https://github.com/lowobservable/oec.git
synced 2026-04-25 19:51:42 +00:00
Do not use offload_load_address_counter
This commit is contained in:
@@ -6,6 +6,7 @@ oec.display
|
|||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
import logging
|
import logging
|
||||||
from sortedcontainers import SortedSet
|
from sortedcontainers import SortedSet
|
||||||
|
from coax import load_address_counter_hi, load_address_counter_lo
|
||||||
|
|
||||||
_ASCII_CHAR_MAP = {
|
_ASCII_CHAR_MAP = {
|
||||||
'>': 0x08,
|
'>': 0x08,
|
||||||
@@ -179,7 +180,7 @@ class Display:
|
|||||||
|
|
||||||
self.address_counter = None
|
self.address_counter = None
|
||||||
|
|
||||||
self.status_line = StatusLine(self.interface, columns)
|
self.status_line = StatusLine(self)
|
||||||
|
|
||||||
self.cursor_reverse = False
|
self.cursor_reverse = False
|
||||||
self.cursor_blink = False
|
self.cursor_blink = False
|
||||||
@@ -190,14 +191,7 @@ class Display:
|
|||||||
|
|
||||||
# TODO: Verify that the address is within range - exclude status line.
|
# TODO: Verify that the address is within range - exclude status line.
|
||||||
|
|
||||||
if address == self.address_counter and not force_load:
|
return self._load_address_counter(address, force_load)
|
||||||
return False
|
|
||||||
|
|
||||||
self.interface.offload_load_address_counter(address)
|
|
||||||
|
|
||||||
self.address_counter = address
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
def buffered_write(self, byte, index=None, row=None, column=None):
|
def buffered_write(self, byte, index=None, row=None, column=None):
|
||||||
if index is None:
|
if index is None:
|
||||||
@@ -227,12 +221,12 @@ class Display:
|
|||||||
|
|
||||||
if clear_status_line:
|
if clear_status_line:
|
||||||
address = 0
|
address = 0
|
||||||
repeat = ((rows + 1) * columns) - 1
|
count = (rows + 1) * columns
|
||||||
else:
|
else:
|
||||||
address = columns
|
address = columns
|
||||||
repeat = (rows * columns) - 1
|
count = rows * columns
|
||||||
|
|
||||||
self.interface.offload_write(b'\x00', address=address, repeat=repeat)
|
self._write((b'\x00', count), address=address)
|
||||||
|
|
||||||
# Update the buffer and dirty indicators to reflect the cleared screen.
|
# Update the buffer and dirty indicators to reflect the cleared screen.
|
||||||
for index in range(rows * columns):
|
for index in range(rows * columns):
|
||||||
@@ -271,6 +265,23 @@ class Display:
|
|||||||
|
|
||||||
return address
|
return address
|
||||||
|
|
||||||
|
def _load_address_counter(self, address, force_load):
|
||||||
|
if address == self.address_counter and not force_load:
|
||||||
|
return False
|
||||||
|
|
||||||
|
(hi, lo) = _split_address(address)
|
||||||
|
(current_hi, current_lo) = _split_address(self.address_counter)
|
||||||
|
|
||||||
|
if hi != current_hi or force_load:
|
||||||
|
load_address_counter_hi(self.interface, hi)
|
||||||
|
|
||||||
|
if lo != current_lo or force_load:
|
||||||
|
load_address_counter_lo(self.interface, lo)
|
||||||
|
|
||||||
|
self.address_counter = address
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
def _get_dirty_ranges(self):
|
def _get_dirty_ranges(self):
|
||||||
if not self.dirty:
|
if not self.dirty:
|
||||||
return []
|
return []
|
||||||
@@ -287,10 +298,10 @@ class Display:
|
|||||||
address = self._calculate_address(start_index)
|
address = self._calculate_address(start_index)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.interface.offload_write(data, address=address if address != self.address_counter else None)
|
self._write(data, address=address if address != self.address_counter else None)
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
# TODO: This could leave the address_counter incorrect.
|
# TODO: This could leave the address_counter incorrect.
|
||||||
self.logger.error(f'Offload write error: {error}', exc_info=error)
|
self.logger.error(f'Write error: {error}', exc_info=error)
|
||||||
|
|
||||||
self.address_counter = self._calculate_address_after_write(address, len(data))
|
self.address_counter = self._calculate_address_after_write(address, len(data))
|
||||||
|
|
||||||
@@ -299,14 +310,27 @@ class Display:
|
|||||||
|
|
||||||
return self.address_counter
|
return self.address_counter
|
||||||
|
|
||||||
|
def _write(self, data, address=None, restore_original_address=False):
|
||||||
|
if isinstance(data, tuple):
|
||||||
|
offload_data = data[0]
|
||||||
|
offload_repeat = max(data[1] - 1, 0)
|
||||||
|
else:
|
||||||
|
offload_data = data
|
||||||
|
offload_repeat = 0
|
||||||
|
|
||||||
|
self.interface.offload_write(offload_data, address=address,
|
||||||
|
restore_original_address=restore_original_address,
|
||||||
|
repeat=offload_repeat)
|
||||||
|
|
||||||
# TODO: add validation of column and data length for write() - must be inside status line
|
# TODO: add validation of column and data length for write() - must be inside status line
|
||||||
class StatusLine:
|
class StatusLine:
|
||||||
def __init__(self, interface, columns):
|
def __init__(self, display):
|
||||||
self.interface = interface
|
self.display = display
|
||||||
self.columns = columns
|
|
||||||
|
self.columns = display.dimensions.columns
|
||||||
|
|
||||||
def write(self, column, data):
|
def write(self, column, data):
|
||||||
self.interface.offload_write(data, address=column, restore_original_address=True)
|
self.display._write(data, address=column, restore_original_address=True)
|
||||||
|
|
||||||
def write_string(self, column, string):
|
def write_string(self, column, string):
|
||||||
self.write(column, encode_string(string))
|
self.write(column, encode_string(string))
|
||||||
@@ -326,3 +350,9 @@ class StatusLine:
|
|||||||
indicators[0] = 0xd3
|
indicators[0] = 0xd3
|
||||||
|
|
||||||
self.write(45, indicators)
|
self.write(45, indicators)
|
||||||
|
|
||||||
|
def _split_address(address):
|
||||||
|
if address is None:
|
||||||
|
return (None, None)
|
||||||
|
|
||||||
|
return ((address >> 8) & 0xff, address & 0xff)
|
||||||
|
|||||||
@@ -49,6 +49,14 @@ class RunLoopTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
self.sleep_mock = patcher.start()
|
self.sleep_mock = patcher.start()
|
||||||
|
|
||||||
|
patcher = patch('oec.display.load_address_counter_hi')
|
||||||
|
|
||||||
|
patcher.start()
|
||||||
|
|
||||||
|
patcher = patch('oec.display.load_address_counter_lo')
|
||||||
|
|
||||||
|
patcher.start()
|
||||||
|
|
||||||
self.addCleanup(patch.stopall)
|
self.addCleanup(patch.stopall)
|
||||||
|
|
||||||
def test_no_terminal(self):
|
def test_no_terminal(self):
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
import unittest
|
import unittest
|
||||||
from unittest.mock import Mock
|
from unittest.mock import Mock, patch
|
||||||
|
|
||||||
import context
|
import context
|
||||||
|
|
||||||
@@ -13,6 +13,18 @@ class DisplayMoveCursorTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
self.display = Display(self.interface, dimensions)
|
self.display = Display(self.interface, dimensions)
|
||||||
|
|
||||||
|
self.display._load_address_counter = Mock(wraps=self.display._load_address_counter)
|
||||||
|
|
||||||
|
patcher = patch('oec.display.load_address_counter_hi')
|
||||||
|
|
||||||
|
self.load_address_counter_hi_mock = patcher.start()
|
||||||
|
|
||||||
|
patcher = patch('oec.display.load_address_counter_lo')
|
||||||
|
|
||||||
|
self.load_address_counter_lo_mock = patcher.start()
|
||||||
|
|
||||||
|
self.addCleanup(patch.stopall)
|
||||||
|
|
||||||
def test(self):
|
def test(self):
|
||||||
# Act
|
# Act
|
||||||
self.display.move_cursor(index=815)
|
self.display.move_cursor(index=815)
|
||||||
@@ -20,7 +32,7 @@ class DisplayMoveCursorTestCase(unittest.TestCase):
|
|||||||
# Assert
|
# Assert
|
||||||
self.assertEqual(self.display.address_counter, 895)
|
self.assertEqual(self.display.address_counter, 895)
|
||||||
|
|
||||||
self.interface.offload_load_address_counter.assert_called_with(895)
|
self.display._load_address_counter.assert_called_with(895, False)
|
||||||
|
|
||||||
def test_with_row_and_column(self):
|
def test_with_row_and_column(self):
|
||||||
# Act
|
# Act
|
||||||
@@ -29,35 +41,16 @@ class DisplayMoveCursorTestCase(unittest.TestCase):
|
|||||||
# Assert
|
# Assert
|
||||||
self.assertEqual(self.display.address_counter, 895)
|
self.assertEqual(self.display.address_counter, 895)
|
||||||
|
|
||||||
self.interface.offload_load_address_counter.assert_called_with(895)
|
self.display._load_address_counter.assert_called_with(895, False)
|
||||||
|
|
||||||
def test_no_change(self):
|
|
||||||
# Arrange
|
|
||||||
self.display.move_cursor(index=0)
|
|
||||||
|
|
||||||
self.interface.offload_load_address_counter.reset_mock()
|
|
||||||
|
|
||||||
|
def test_force(self):
|
||||||
# Act
|
# Act
|
||||||
self.display.move_cursor(index=0)
|
self.display.move_cursor(index=815, force_load=True)
|
||||||
|
|
||||||
# Assert
|
# Assert
|
||||||
self.assertEqual(self.display.address_counter, 80)
|
self.assertEqual(self.display.address_counter, 895)
|
||||||
|
|
||||||
self.interface.offload_load_address_counter.assert_not_called()
|
self.display._load_address_counter.assert_called_with(895, True)
|
||||||
|
|
||||||
def test_no_change_force(self):
|
|
||||||
# Arrange
|
|
||||||
self.display.move_cursor(index=0)
|
|
||||||
|
|
||||||
self.interface.offload_load_address_counter.reset_mock()
|
|
||||||
|
|
||||||
# Act
|
|
||||||
self.display.move_cursor(index=0, force_load=True)
|
|
||||||
|
|
||||||
# Assert
|
|
||||||
self.assertEqual(self.display.address_counter, 80)
|
|
||||||
|
|
||||||
self.interface.offload_load_address_counter.assert_called_with(80)
|
|
||||||
|
|
||||||
class DisplayBufferedWriteTestCase(unittest.TestCase):
|
class DisplayBufferedWriteTestCase(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@@ -153,6 +146,19 @@ class DisplayClearTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
self.display = Display(self.interface, dimensions)
|
self.display = Display(self.interface, dimensions)
|
||||||
|
|
||||||
|
self.display._load_address_counter = Mock(wraps=self.display._load_address_counter)
|
||||||
|
self.display._write = Mock(wraps=self.display._write)
|
||||||
|
|
||||||
|
patcher = patch('oec.display.load_address_counter_hi')
|
||||||
|
|
||||||
|
self.load_address_counter_hi_mock = patcher.start()
|
||||||
|
|
||||||
|
patcher = patch('oec.display.load_address_counter_lo')
|
||||||
|
|
||||||
|
self.load_address_counter_lo_mock = patcher.start()
|
||||||
|
|
||||||
|
self.addCleanup(patch.stopall)
|
||||||
|
|
||||||
def test_excluding_status_line(self):
|
def test_excluding_status_line(self):
|
||||||
# Arrange
|
# Arrange
|
||||||
self.display.buffered_write(0x01, index=0)
|
self.display.buffered_write(0x01, index=0)
|
||||||
@@ -164,8 +170,8 @@ class DisplayClearTestCase(unittest.TestCase):
|
|||||||
self.display.clear(clear_status_line=False)
|
self.display.clear(clear_status_line=False)
|
||||||
|
|
||||||
# Assert
|
# Assert
|
||||||
self.interface.offload_write.assert_called_with(b'\x00', address=80, repeat=1919)
|
self.display._write.assert_called_with((b'\x00', 1920), address=80)
|
||||||
self.interface.offload_load_address_counter.assert_called_with(80)
|
self.display._load_address_counter.assert_called_with(80, True)
|
||||||
|
|
||||||
self.assertEqual(self.display.buffer[0], 0x00)
|
self.assertEqual(self.display.buffer[0], 0x00)
|
||||||
self.assertFalse(self.display.dirty)
|
self.assertFalse(self.display.dirty)
|
||||||
@@ -181,8 +187,8 @@ class DisplayClearTestCase(unittest.TestCase):
|
|||||||
self.display.clear(clear_status_line=True)
|
self.display.clear(clear_status_line=True)
|
||||||
|
|
||||||
# Assert
|
# Assert
|
||||||
self.interface.offload_write.assert_called_with(b'\x00', address=0, repeat=1999)
|
self.display._write.assert_called_with((b'\x00', 2000), address=0)
|
||||||
self.interface.offload_load_address_counter.assert_called_with(80)
|
self.display._load_address_counter.assert_called_with(80, True)
|
||||||
|
|
||||||
self.assertEqual(self.display.buffer[0], 0x00)
|
self.assertEqual(self.display.buffer[0], 0x00)
|
||||||
self.assertFalse(self.display.dirty)
|
self.assertFalse(self.display.dirty)
|
||||||
@@ -195,6 +201,18 @@ class DisplayFlushRangeTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
self.display = Display(self.interface, dimensions)
|
self.display = Display(self.interface, dimensions)
|
||||||
|
|
||||||
|
self.display._write = Mock(wraps=self.display._write)
|
||||||
|
|
||||||
|
patcher = patch('oec.display.load_address_counter_hi')
|
||||||
|
|
||||||
|
self.load_address_counter_hi_mock = patcher.start()
|
||||||
|
|
||||||
|
patcher = patch('oec.display.load_address_counter_lo')
|
||||||
|
|
||||||
|
self.load_address_counter_lo_mock = patcher.start()
|
||||||
|
|
||||||
|
self.addCleanup(patch.stopall)
|
||||||
|
|
||||||
def test_when_start_address_is_current_address_counter(self):
|
def test_when_start_address_is_current_address_counter(self):
|
||||||
# Arrange
|
# Arrange
|
||||||
self.display.move_cursor(index=0)
|
self.display.move_cursor(index=0)
|
||||||
@@ -207,7 +225,7 @@ class DisplayFlushRangeTestCase(unittest.TestCase):
|
|||||||
self.display.flush()
|
self.display.flush()
|
||||||
|
|
||||||
# Assert
|
# Assert
|
||||||
self.interface.offload_write.assert_called_with(bytes.fromhex('01 02 03'), address=None)
|
self.display._write.assert_called_with(bytes.fromhex('01 02 03'), address=None)
|
||||||
|
|
||||||
self.assertEqual(self.display.address_counter, 83)
|
self.assertEqual(self.display.address_counter, 83)
|
||||||
self.assertFalse(self.display.dirty)
|
self.assertFalse(self.display.dirty)
|
||||||
@@ -224,11 +242,141 @@ class DisplayFlushRangeTestCase(unittest.TestCase):
|
|||||||
self.display.flush()
|
self.display.flush()
|
||||||
|
|
||||||
# Assert
|
# Assert
|
||||||
self.interface.offload_write.assert_called_with(bytes.fromhex('01 02 03'), address=80)
|
self.display._write.assert_called_with(bytes.fromhex('01 02 03'), address=80)
|
||||||
|
|
||||||
self.assertEqual(self.display.address_counter, 83)
|
self.assertEqual(self.display.address_counter, 83)
|
||||||
self.assertFalse(self.display.dirty)
|
self.assertFalse(self.display.dirty)
|
||||||
|
|
||||||
|
class DisplayLoadAddressCounterTestCase(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.interface = Mock()
|
||||||
|
|
||||||
|
dimensions = Dimensions(24, 80)
|
||||||
|
|
||||||
|
self.display = Display(self.interface, dimensions)
|
||||||
|
|
||||||
|
patcher = patch('oec.display.load_address_counter_hi')
|
||||||
|
|
||||||
|
self.load_address_counter_hi_mock = patcher.start()
|
||||||
|
|
||||||
|
patcher = patch('oec.display.load_address_counter_lo')
|
||||||
|
|
||||||
|
self.load_address_counter_lo_mock = patcher.start()
|
||||||
|
|
||||||
|
self.addCleanup(patch.stopall)
|
||||||
|
|
||||||
|
def test(self):
|
||||||
|
# Act
|
||||||
|
self.display._load_address_counter(895, force_load=False)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
self.assertEqual(self.display.address_counter, 895)
|
||||||
|
|
||||||
|
self.load_address_counter_hi_mock.assert_called_with(self.interface, 3)
|
||||||
|
self.load_address_counter_lo_mock.assert_called_with(self.interface, 127)
|
||||||
|
|
||||||
|
def test_hi_change(self):
|
||||||
|
# Arrange
|
||||||
|
self.display._load_address_counter(895, force_load=False)
|
||||||
|
|
||||||
|
self.load_address_counter_hi_mock.reset_mock()
|
||||||
|
self.load_address_counter_lo_mock.reset_mock()
|
||||||
|
|
||||||
|
# Act
|
||||||
|
self.display._load_address_counter(1151, force_load=False)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
self.assertEqual(self.display.address_counter, 1151)
|
||||||
|
|
||||||
|
self.load_address_counter_hi_mock.assert_called_with(self.interface, 4)
|
||||||
|
self.load_address_counter_lo_mock.assert_not_called()
|
||||||
|
|
||||||
|
def test_lo_change(self):
|
||||||
|
# Arrange
|
||||||
|
self.display._load_address_counter(895, force_load=False)
|
||||||
|
|
||||||
|
self.load_address_counter_hi_mock.reset_mock()
|
||||||
|
self.load_address_counter_lo_mock.reset_mock()
|
||||||
|
|
||||||
|
# Act
|
||||||
|
self.display._load_address_counter(896, force_load=False)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
self.assertEqual(self.display.address_counter, 896)
|
||||||
|
|
||||||
|
self.load_address_counter_hi_mock.assert_not_called()
|
||||||
|
self.load_address_counter_lo_mock.assert_called_with(self.interface, 128)
|
||||||
|
|
||||||
|
def test_hi_lo_change(self):
|
||||||
|
# Arrange
|
||||||
|
self.display._load_address_counter(895, force_load=False)
|
||||||
|
|
||||||
|
self.load_address_counter_hi_mock.reset_mock()
|
||||||
|
self.load_address_counter_lo_mock.reset_mock()
|
||||||
|
|
||||||
|
# Act
|
||||||
|
self.display._load_address_counter(1152, force_load=False)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
self.assertEqual(self.display.address_counter, 1152)
|
||||||
|
|
||||||
|
self.load_address_counter_hi_mock.assert_called_with(self.interface, 4)
|
||||||
|
self.load_address_counter_lo_mock.assert_called_with(self.interface, 128)
|
||||||
|
|
||||||
|
def test_no_change(self):
|
||||||
|
# Arrange
|
||||||
|
self.display._load_address_counter(80, force_load=False)
|
||||||
|
|
||||||
|
self.load_address_counter_hi_mock.reset_mock()
|
||||||
|
self.load_address_counter_lo_mock.reset_mock()
|
||||||
|
|
||||||
|
# Act
|
||||||
|
self.display._load_address_counter(80, force_load=False)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
self.assertEqual(self.display.address_counter, 80)
|
||||||
|
|
||||||
|
self.load_address_counter_hi_mock.assert_not_called()
|
||||||
|
self.load_address_counter_lo_mock.assert_not_called()
|
||||||
|
|
||||||
|
def test_no_change_force(self):
|
||||||
|
# Arrange
|
||||||
|
self.display._load_address_counter(80, force_load=False)
|
||||||
|
|
||||||
|
self.load_address_counter_hi_mock.reset_mock()
|
||||||
|
self.load_address_counter_lo_mock.reset_mock()
|
||||||
|
|
||||||
|
# Act
|
||||||
|
self.display._load_address_counter(80, force_load=True)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
self.assertEqual(self.display.address_counter, 80)
|
||||||
|
|
||||||
|
self.load_address_counter_hi_mock.assert_called_with(self.interface, 0)
|
||||||
|
self.load_address_counter_lo_mock.assert_called_with(self.interface, 80)
|
||||||
|
|
||||||
|
class DisplayWriteTestCase(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.interface = Mock()
|
||||||
|
|
||||||
|
dimensions = Dimensions(24, 80)
|
||||||
|
|
||||||
|
self.display = Display(self.interface, dimensions)
|
||||||
|
|
||||||
|
def test(self):
|
||||||
|
# Act
|
||||||
|
self.display._write(bytes.fromhex('01 02 03'))
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
self.interface.offload_write.assert_called_with(bytes.fromhex('01 02 03'), address=None, restore_original_address=False, repeat=0)
|
||||||
|
|
||||||
|
def test_repeat(self):
|
||||||
|
# Act
|
||||||
|
self.display._write((bytes.fromhex('01 02 03'), 3))
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
self.interface.offload_write.assert_called_with(bytes.fromhex('01 02 03'), address=None, restore_original_address=False, repeat=2)
|
||||||
|
|
||||||
class EncodeAsciiCharacterTestCase(unittest.TestCase):
|
class EncodeAsciiCharacterTestCase(unittest.TestCase):
|
||||||
def test_mapped_character(self):
|
def test_mapped_character(self):
|
||||||
self.assertEqual(encode_ascii_character(ord('a')), 0x80)
|
self.assertEqual(encode_ascii_character(ord('a')), 0x80)
|
||||||
|
|||||||
Reference in New Issue
Block a user