Refactor display

This commit is contained in:
Andrew Kay
2021-05-28 22:54:44 -05:00
parent e10cf494d8
commit bdb0866db1
8 changed files with 1027 additions and 630 deletions

View File

@@ -4,6 +4,7 @@ oec.display
"""
from collections import namedtuple
from itertools import zip_longest
import logging
from more_itertools import interleave
from sortedcontainers import SortedSet
@@ -11,6 +12,308 @@ from coax import read_address_counter_hi, read_address_counter_lo, \
load_address_counter_hi, load_address_counter_lo, write_data, \
eab_load_mask, eab_write_alternate
# Does not include the status line row.
Dimensions = namedtuple('Dimensions', ['rows', 'columns'])
class Display:
def __init__(self, terminal, dimensions, eab_address, jumbo_write_strategy=None):
self.logger = logging.getLogger(__name__)
self.terminal = terminal
self.dimensions = dimensions
self.eab_address = eab_address
self.jumbo_write_strategy = jumbo_write_strategy
self.address_counter = None
self.last_address = ((dimensions.rows + 1) * dimensions.columns) - 1
self.status_line = StatusLine(self)
def clear(self, clear_status_line=False):
"""Clear the screen."""
(rows, columns) = self.dimensions
if clear_status_line:
address = 0
count = (rows + 1) * columns
else:
address = columns
count = rows * columns
regen_data = (b'\x00', count)
eab_data = (b'\x00', count) if self.has_eab else None
self.write(regen_data, eab_data, address=address)
self.move_cursor(row=0, column=0, force_load=True)
def move_cursor(self, address=None, index=None, row=None, column=None, force_load=False):
"""Load the address counter."""
address = self._calculate_address(address, index, row, column)
if address is None:
raise ValueError('Either address, index or row and column is required')
return self._load_address_counter(address, force_load)
def write(self, regen_data, eab_data, address=None, index=None, row=None, column=None, restore_original_address=False):
if eab_data is not None:
if not self.has_eab:
raise RuntimeError('No EAB feature')
if isinstance(regen_data, tuple) and isinstance(eab_data, tuple):
if len(regen_data[0]) != len(eab_data[0]):
raise ValueError('Regen and EAB pattern length must be equal')
if regen_data[1] != eab_data[1]:
raise ValueError('Regen and EAB pattern count must be equal')
elif not isinstance(regen_data, tuple) and not isinstance(eab_data, tuple):
if len(regen_data) != len(eab_data):
raise ValueError('Regen and EAB data length must be equal')
else:
raise ValueError('Regen and EAB data must be provided in same form')
if restore_original_address:
original_address = self.address_counter if self.address_counter is not None else self._read_address_counter()
address = self._calculate_address(address, index, row, column)
if address is not None:
self._load_address_counter(address, force_load=False)
if eab_data is not None:
if isinstance(regen_data, tuple):
data = (bytes(interleave(regen_data[0], eab_data[0])), regen_data[1])
else:
data = bytes(interleave(regen_data, eab_data))
self._eab_write_alternate(data)
else:
self._write_data(regen_data)
if isinstance(regen_data, tuple):
count = len(regen_data[0]) * regen_data[1]
else:
count = len(regen_data)
self.address_counter = self._calculate_address_after_write(self.address_counter, count)
if restore_original_address:
self._load_address_counter(original_address, force_load=True)
@property
def has_eab(self):
return self.eab_address is not None
def load_eab_mask(self, mask):
if not self.has_eab:
raise RuntimeError('No EAB feature')
eab_load_mask(self.terminal.interface, self.eab_address, mask)
def toggle_cursor_blink(self):
self.terminal.control.cursor_blink = not self.terminal.control.cursor_blink
self.terminal.load_control_register()
def toggle_cursor_reverse(self):
self.terminal.control.cursor_reverse = not self.terminal.control.cursor_reverse
self.terminal.load_control_register()
def _calculate_address(self, address=None, index=None, row=None, column=None):
if index is not None:
address = self.dimensions.columns + index
if row is not None and column is not None:
address = self.dimensions.columns + (row * self.dimensions.columns) + column
if address is None:
return None
if address > self.last_address:
raise ValueError('Address is out of range')
return address
def _calculate_address_after_write(self, address, count):
if address is None:
return None
address += count
# TODO: Determine the correct behavior here...
if address > self.last_address:
return None
return address
def _read_address_counter(self):
hi = read_address_counter_hi(self.terminal.interface)
lo = read_address_counter_lo(self.terminal.interface)
self.address_counter = (hi << 8) | lo
return self.address_counter
def _load_address_counter(self, address, force_load):
if address == self.address_counter and not force_load:
return False
(hi, lo) = _split_address(address)
(current_hi, current_lo) = _split_address(self.address_counter)
if hi != current_hi or force_load:
load_address_counter_hi(self.terminal.interface, hi)
if lo != current_lo or force_load:
load_address_counter_lo(self.terminal.interface, lo)
self.address_counter = address
return True
def _write_data(self, data):
write_data(self.terminal.interface, data, jumbo_write_strategy=self.jumbo_write_strategy)
def _eab_write_alternate(self, data):
eab_write_alternate(self.terminal.interface, self.eab_address, data, jumbo_write_strategy=self.jumbo_write_strategy)
def _split_address(address):
if address is None:
return (None, None)
return ((address >> 8) & 0xff, address & 0xff)
class StatusLine:
def __init__(self, display):
self.display = display
self.columns = display.dimensions.columns
def write(self, column, data):
if column >= self.columns:
raise ValueError('Column is out of range')
if column + len(data) > self.columns:
raise ValueError('Length is out of range')
self.display.write(data, None, address=column, restore_original_address=True)
def write_string(self, column, string):
self.write(column, encode_string(string))
def write_keyboard_modifiers(self, modifiers):
indicators = bytearray(1)
if modifiers.is_shift():
indicators[0] = 0xda
self.write(35, indicators)
def write_keyboard_insert(self, insert):
indicators = bytearray(1)
if insert:
indicators[0] = 0xd3
self.write(45, indicators)
class BufferedDisplay(Display):
def __init__(self, terminal, dimensions, eab_address, jumbo_write_strategy=None):
super().__init__(terminal, dimensions, eab_address, jumbo_write_strategy)
length = (self.dimensions.rows + 1) * self.dimensions.columns
self.regen_buffer = bytearray(length)
self.eab_buffer = bytearray(length) if self.has_eab else None
self.dirty = SortedSet()
def buffered_write_byte(self, regen_byte, eab_byte, address=None, index=None, row=None, column=None):
if eab_byte is not None:
if not self.has_eab:
raise RuntimeError('No EAB feature')
address = self._calculate_address(address, index, row, column)
if address is None:
raise ValueError('Either address, index or row and column is required')
if self.regen_buffer[address] == regen_byte and (not self.has_eab or self.eab_buffer[address] == eab_byte):
return False
self.regen_buffer[address] = regen_byte
if self.has_eab:
self.eab_buffer[address] = eab_byte
self.dirty.add(address)
return True
def flush(self):
dirty_ranges = self._get_dirty_ranges()
if not dirty_ranges:
return False
for (start_address, end_address) in dirty_ranges:
self._write_range(start_address, end_address)
return True
def write(self, regen_data, eab_data, address=None, index=None, row=None, column=None, restore_original_address=False):
start_address = self._calculate_address(address, index, row, column)
# Unlike a unbuffered write, the current address is required in order to commit the write.
if start_address is None:
start_address = self.address_counter if self.address_counter is not None else self._read_address_counter()
super().write(regen_data, eab_data, address=address, index=index, row=row, column=column,
restore_original_address=restore_original_address)
self._commit(start_address, regen_data, eab_data)
def _commit(self, start_address, regen_data, eab_data):
if isinstance(regen_data, tuple):
expanded_regen_data = regen_data[0] * regen_data[1]
expanded_eab_data = eab_data[0] * eab_data[1] if eab_data is not None else []
else:
expanded_regen_data = regen_data
expanded_eab_data = eab_data if eab_data is not None else []
address = start_address
for (regen_byte, eab_byte) in zip_longest(expanded_regen_data, expanded_eab_data):
self.regen_buffer[address] = regen_byte
if eab_byte is not None:
self.eab_buffer[address] = eab_byte
self.dirty.discard(address)
address += 1
def _write_range(self, start_address, end_address):
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f'Writing range {start_address}-{end_address}')
regen_data = self.regen_buffer[start_address:end_address+1]
eab_data = self.eab_buffer[start_address:end_address+1] if self.has_eab else None
try:
self.write(regen_data, eab_data, address=start_address)
except Exception as error:
# TODO: This could leave the address_counter incorrect.
self.logger.error(f'Write error: {error}', exc_info=error)
def _get_dirty_ranges(self):
if not self.dirty:
return []
# TODO: Implement multiple ranges with optimization.
return [(self.dirty[0], self.dirty[-1])]
_ASCII_CHAR_MAP = {
'>': 0x08,
'<': 0x09,
@@ -165,260 +468,3 @@ def encode_string(string, errors='replace'):
"""Map a string to terminal display characters."""
return bytes([encode_ascii_character(character) for character
in string.encode('ascii', errors)])
# Does not include the status line row.
Dimensions = namedtuple('Dimensions', ['rows', 'columns'])
class Display:
def __init__(self, terminal, dimensions, eab_address, jumbo_write_strategy=None):
self.logger = logging.getLogger(__name__)
self.terminal = terminal
self.dimensions = dimensions
self.eab_address = eab_address
(rows, columns) = self.dimensions
self.regen_buffer = bytearray(rows * columns)
self.eab_buffer = bytearray(rows * columns)
self.dirty = SortedSet()
self.address_counter = None
self.status_line = StatusLine(self)
self.jumbo_write_strategy = jumbo_write_strategy
@property
def has_eab(self):
return self.eab_address is not None
def move_cursor(self, index=None, row=None, column=None, force_load=False):
"""Load the address counter."""
address = self._calculate_address(index=index, row=row, column=column)
# TODO: Verify that the address is within range - exclude status line.
return self._load_address_counter(address, force_load)
def load_eab_mask(self, mask):
"""Load the EAB mask."""
if not self.has_eab:
raise RuntimeError('No EAB feature')
eab_load_mask(self.terminal.interface, self.eab_address, mask)
def buffered_write(self, regen_byte, eab_byte, index=None, row=None, column=None):
if index is None:
if row is None or column is None:
raise ValueError('Either index or row and column is required')
index = self._get_index(row, column)
# TODO: Verify that index is within range.
if self.regen_buffer[index] == regen_byte and (self.eab_buffer[index] == eab_byte or not self.has_eab):
return False
self.regen_buffer[index] = regen_byte
self.eab_buffer[index] = eab_byte if self.has_eab else 0x00
self.dirty.add(index)
return True
def flush(self):
for (start_index, end_index) in self._get_dirty_ranges():
self._flush_range(start_index, end_index)
def clear(self, clear_status_line=False):
"""Clear the screen."""
(rows, columns) = self.dimensions
if clear_status_line:
address = 0
count = (rows + 1) * columns
else:
address = columns
count = rows * columns
self._write((b'\x00', count), (b'\x00', count) if self.has_eab else None, address=address)
# Update the buffers and dirty indicators to reflect the cleared screen.
for index in range(rows * columns):
self.regen_buffer[index] = 0x00
self.eab_buffer[index] = 0x00
self.dirty.clear()
self.move_cursor(row=0, column=0, force_load=True)
def toggle_cursor_blink(self):
self.terminal.control.cursor_blink = not self.terminal.control.cursor_blink
self.terminal.load_control_register()
def toggle_cursor_reverse(self):
self.terminal.control.cursor_reverse = not self.terminal.control.cursor_reverse
self.terminal.load_control_register()
def _get_index(self, row, column):
return (row * self.dimensions.columns) + column
def _calculate_address(self, index=None, row=None, column=None):
if index is not None:
return self.dimensions.columns + index
if row is not None and column is not None:
return self.dimensions.columns + self._get_index(row, column)
raise ValueError('Either index or row and column is required')
def _calculate_address_after_write(self, address, count):
if address is None:
return None
address += count
(rows, columns) = self.dimensions
# TODO: Determine the correct behavior here...
if self.address_counter >= self._calculate_address((rows * columns) - 1):
return None
return address
def _read_address_counter(self):
hi = read_address_counter_hi(self.terminal.interface)
lo = read_address_counter_lo(self.terminal.interface)
return (hi << 8) | lo
def _load_address_counter(self, address, force_load):
if address == self.address_counter and not force_load:
return False
(hi, lo) = _split_address(address)
(current_hi, current_lo) = _split_address(self.address_counter)
if hi != current_hi or force_load:
load_address_counter_hi(self.terminal.interface, hi)
if lo != current_lo or force_load:
load_address_counter_lo(self.terminal.interface, lo)
self.address_counter = address
return True
def _get_dirty_ranges(self):
if not self.dirty:
return []
# TODO: Implement multiple ranges with optimization.
return [(self.dirty[0], self.dirty[-1])]
def _flush_range(self, start_index, end_index):
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f'Flushing changes for range {start_index}-{end_index}')
regen_data = self.regen_buffer[start_index:end_index+1]
eab_data = self.eab_buffer[start_index:end_index+1] if self.has_eab else None
address = self._calculate_address(start_index)
try:
self._write(regen_data, eab_data, address=address)
except Exception as error:
# TODO: This could leave the address_counter incorrect.
self.logger.error(f'Write error: {error}', exc_info=error)
for index in range(start_index, end_index + 1):
self.dirty.discard(index)
return self.address_counter
def _write(self, regen_data, eab_data, address=None, restore_original_address=False):
if eab_data is not None:
if not self.has_eab:
raise RuntimeError('No EAB feature')
if isinstance(regen_data, tuple) and isinstance(eab_data, tuple):
if len(regen_data[0]) != len(eab_data[0]):
raise ValueError('Regen and EAB pattern length must be equal')
if regen_data[1] != eab_data[1]:
raise ValueError('Regen and EAB pattern count must be equal')
elif not isinstance(regen_data, tuple) and not isinstance(eab_data, tuple):
if len(regen_data) != len(eab_data):
raise ValueError('Regen and EAB data length must be equal')
else:
raise ValueError('Regen and EAB data must be provided in same form')
if restore_original_address:
original_address = self.address_counter
if original_address is None:
original_address = self._read_address_counter()
if address is not None:
self._load_address_counter(address, force_load=False)
if eab_data is not None:
# Validation of regen and EAB data form has been performed.
if isinstance(regen_data, tuple):
data = (bytes(interleave(regen_data[0], eab_data[0])), regen_data[1])
else:
data = bytes(interleave(regen_data, eab_data))
eab_write_alternate(self.terminal.interface, self.eab_address, data,
jumbo_write_strategy=self.jumbo_write_strategy)
else:
write_data(self.terminal.interface, regen_data,
jumbo_write_strategy=self.jumbo_write_strategy)
if isinstance(regen_data, tuple):
length = len(regen_data[0]) * regen_data[1]
else:
length = len(regen_data)
self.address_counter = self._calculate_address_after_write(address, length)
if restore_original_address:
self._load_address_counter(original_address, force_load=True)
# TODO: add validation of column and data length for write() - must be inside status line
class StatusLine:
def __init__(self, display):
self.display = display
self.columns = display.dimensions.columns
def write(self, column, data):
self.display._write(data, None, address=column, restore_original_address=True)
def write_string(self, column, string):
self.write(column, encode_string(string))
def write_keyboard_modifiers(self, modifiers):
indicators = bytearray(1)
if modifiers.is_shift():
indicators[0] = 0xda
self.write(35, indicators)
def write_keyboard_insert(self, insert):
indicators = bytearray(1)
if insert:
indicators[0] = 0xd3
self.write(45, indicators)
def _split_address(address):
if address is None:
return (None, None)
return ((address >> 8) & 0xff, address & 0xff)