mirror of
https://github.com/lowobservable/oec.git
synced 2026-01-13 15:27:20 +00:00
314 lines
7.6 KiB
Python
314 lines
7.6 KiB
Python
"""
|
|
oec.display
|
|
~~~~~~~~~~~
|
|
"""
|
|
|
|
from collections import namedtuple
|
|
import logging
|
|
from sortedcontainers import SortedSet
|
|
|
|
_ASCII_CHAR_MAP = {
|
|
'>': 0x08,
|
|
'<': 0x09,
|
|
'[': 0x0a,
|
|
']': 0x0b,
|
|
')': 0x0c,
|
|
'(': 0x0d,
|
|
'}': 0x0e,
|
|
'{': 0x0f,
|
|
|
|
# 0x10 - A real space?
|
|
'=': 0x11,
|
|
'\'': 0x12,
|
|
'"': 0x13,
|
|
'/': 0x14,
|
|
'\\': 0x15,
|
|
'|': 0x16,
|
|
'¦': 0x17,
|
|
'?': 0x18,
|
|
'!': 0x19,
|
|
'$': 0x1a,
|
|
'¢': 0x1b,
|
|
'£': 0x1c,
|
|
'¥': 0x1d,
|
|
# 0x1e - A P/T looking symbol
|
|
# 0x1f - A intertwined parens symbol
|
|
|
|
'0': 0x20,
|
|
'1': 0x21,
|
|
'2': 0x22,
|
|
'3': 0x23,
|
|
'4': 0x24,
|
|
'5': 0x25,
|
|
'6': 0x26,
|
|
'7': 0x27,
|
|
'8': 0x28,
|
|
'9': 0x29,
|
|
'ß': 0x2a,
|
|
'§': 0x2b,
|
|
'#': 0x2c,
|
|
'@': 0x2d,
|
|
'%': 0x2e,
|
|
'_': 0x2f,
|
|
|
|
'&': 0x30,
|
|
'-': 0x31,
|
|
'.': 0x32,
|
|
',': 0x33,
|
|
':': 0x34,
|
|
'+': 0x35,
|
|
'¬': 0x36,
|
|
'¯': 0x37, # ???
|
|
'°': 0x38,
|
|
# 0x39 - Accent?
|
|
'^': 0x3a, # More like an accent
|
|
'~': 0x3b, # More like an accent
|
|
'¨': 0x3c,
|
|
# 0x3d - Accute accent?
|
|
# 0x3e - Opposite of accute accent?
|
|
# 0x3f - A more extreme comma?
|
|
|
|
'a': 0x80,
|
|
'b': 0x81,
|
|
'c': 0x82,
|
|
'd': 0x83,
|
|
'e': 0x84,
|
|
'f': 0x85,
|
|
'g': 0x86,
|
|
'h': 0x87,
|
|
'i': 0x88,
|
|
'j': 0x89,
|
|
'k': 0x8a,
|
|
'l': 0x8b,
|
|
'm': 0x8c,
|
|
'n': 0x8d,
|
|
'o': 0x8e,
|
|
'p': 0x8f,
|
|
|
|
'q': 0x90,
|
|
'r': 0x91,
|
|
's': 0x92,
|
|
't': 0x93,
|
|
'u': 0x94,
|
|
'v': 0x95,
|
|
'w': 0x96,
|
|
'x': 0x97,
|
|
'y': 0x98,
|
|
'z': 0x99,
|
|
'æ': 0x9a,
|
|
'ø': 0x9b,
|
|
'å': 0x9c,
|
|
'ç': 0x9d,
|
|
# 0x9e - Semi colon with top line
|
|
# 0x9f - Asterisk with top line
|
|
|
|
'A': 0xa0,
|
|
'B': 0xa1,
|
|
'C': 0xa2,
|
|
'D': 0xa3,
|
|
'E': 0xa4,
|
|
'F': 0xa5,
|
|
'G': 0xa6,
|
|
'H': 0xa7,
|
|
'I': 0xa8,
|
|
'J': 0xa9,
|
|
'K': 0xaa,
|
|
'L': 0xab,
|
|
'M': 0xac,
|
|
'N': 0xad,
|
|
'O': 0xae,
|
|
'P': 0xaf,
|
|
|
|
'Q': 0xb0,
|
|
'R': 0xb1,
|
|
'S': 0xb2,
|
|
'T': 0xb3,
|
|
'U': 0xb4,
|
|
'V': 0xb5,
|
|
'W': 0xb6,
|
|
'X': 0xb7,
|
|
'Y': 0xb8,
|
|
'Z': 0xb9,
|
|
'Æ': 0xba,
|
|
'Ø': 0xbb,
|
|
'Å': 0xbc,
|
|
'Ç': 0xbd,
|
|
';': 0xbe,
|
|
'*': 0xbf
|
|
}
|
|
|
|
_EBCDIC_CHAR_MAP = {ascii_character.encode('cp500')[0]: byte for ascii_character, byte in _ASCII_CHAR_MAP.items()}
|
|
|
|
ASCII_CHAR_MAP = [_ASCII_CHAR_MAP.get(character, 0x00) for character in map(chr, range(256))]
|
|
|
|
EBCDIC_CHAR_MAP = [_EBCDIC_CHAR_MAP.get(character, 0x00) for character in range(256)]
|
|
|
|
def encode_ascii_character(character):
|
|
"""Map an ASCII character to a terminal display character."""
|
|
if character > 255:
|
|
return 0x00
|
|
|
|
return ASCII_CHAR_MAP[character]
|
|
|
|
def encode_ebcdic_character(character):
|
|
"""Map an EBCDIC character to a terminal display character."""
|
|
if character > 255:
|
|
return 0x00
|
|
|
|
return EBCDIC_CHAR_MAP[character]
|
|
|
|
def encode_string(string, errors='replace'):
|
|
"""Map a string to terminal display characters."""
|
|
return bytes([encode_ascii_character(character) for character
|
|
in string.encode('ascii', errors)])
|
|
|
|
# Does not include the status line row.
|
|
Dimensions = namedtuple('Dimensions', ['rows', 'columns'])
|
|
|
|
class Display:
|
|
def __init__(self, interface, dimensions):
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
self.interface = interface
|
|
self.dimensions = dimensions
|
|
|
|
(rows, columns) = self.dimensions
|
|
|
|
self.buffer = bytearray(rows * columns)
|
|
self.dirty = SortedSet()
|
|
|
|
self.address_counter = None
|
|
|
|
self.status_line = StatusLine(self.interface, columns)
|
|
|
|
def move_cursor(self, index=None, row=None, column=None, force_load=False):
|
|
"""Load the address counter."""
|
|
address = self._calculate_address(index=index, row=row, column=column)
|
|
|
|
# TODO: Verify that the address is within range - exclude status line.
|
|
|
|
if address == self.address_counter and not force_load:
|
|
return False
|
|
|
|
self.interface.offload_load_address_counter(address)
|
|
|
|
self.address_counter = address
|
|
|
|
return True
|
|
|
|
def buffered_write(self, byte, index=None, row=None, column=None):
|
|
if index is None:
|
|
if row is None or column is None:
|
|
raise ValueError('Either index or row and column is required')
|
|
|
|
index = self._get_index(row, column)
|
|
|
|
# TODO: Verify that index is within range.
|
|
|
|
if self.buffer[index] == byte:
|
|
return False
|
|
|
|
self.buffer[index] = byte
|
|
|
|
self.dirty.add(index)
|
|
|
|
return True
|
|
|
|
def flush(self):
|
|
for (start_index, end_index) in self._get_dirty_ranges():
|
|
self._flush_range(start_index, end_index)
|
|
|
|
def clear(self, clear_status_line=False):
|
|
"""Clear the screen."""
|
|
(rows, columns) = self.dimensions
|
|
|
|
if clear_status_line:
|
|
address = 0
|
|
repeat = ((rows + 1) * columns) - 1
|
|
else:
|
|
address = columns
|
|
repeat = (rows * columns) - 1
|
|
|
|
self.interface.offload_write(b'\x00', address=address, repeat=repeat)
|
|
|
|
# Update the buffer and dirty indicators to reflect the cleared screen.
|
|
for index in range(rows * columns):
|
|
self.buffer[index] = 0x00
|
|
|
|
self.dirty.clear()
|
|
|
|
self.move_cursor(row=0, column=0, force_load=True)
|
|
|
|
def _get_index(self, row, column):
|
|
return (row * self.dimensions.columns) + column
|
|
|
|
def _calculate_address(self, index=None, row=None, column=None):
|
|
if index is not None:
|
|
return self.dimensions.columns + index
|
|
|
|
if row is not None and column is not None:
|
|
return self.dimensions.columns + self._get_index(row, column)
|
|
|
|
raise ValueError('Either index or row and column is required')
|
|
|
|
def _calculate_address_after_write(self, address, count):
|
|
address += count
|
|
|
|
(rows, columns) = self.dimensions
|
|
|
|
# TODO: Determine the correct behavior here...
|
|
if self.address_counter >= self._calculate_address((rows * columns) - 1):
|
|
return None
|
|
|
|
return address
|
|
|
|
def _get_dirty_ranges(self):
|
|
if not self.dirty:
|
|
return []
|
|
|
|
# TODO: Implement multiple ranges with optimization.
|
|
return [(self.dirty[0], self.dirty[-1])]
|
|
|
|
def _flush_range(self, start_index, end_index):
|
|
if self.logger.isEnabledFor(logging.DEBUG):
|
|
self.logger.debug(f'Flushing changes for range {start_index}-{end_index}')
|
|
|
|
data = self.buffer[start_index:end_index+1]
|
|
|
|
address = self._calculate_address(start_index)
|
|
|
|
try:
|
|
self.interface.offload_write(data, address=address if address != self.address_counter else None)
|
|
except Exception as error:
|
|
# TODO: This could leave the address_counter incorrect.
|
|
self.logger.error(f'Offload write error: {error}', exc_info=error)
|
|
|
|
self.address_counter = self._calculate_address_after_write(address, len(data))
|
|
|
|
for index in range(start_index, end_index + 1):
|
|
self.dirty.discard(index)
|
|
|
|
return self.address_counter
|
|
|
|
# TODO: add validation of column and data length for write() - must be inside status line
|
|
class StatusLine:
|
|
def __init__(self, interface, columns):
|
|
self.interface = interface
|
|
self.columns = columns
|
|
|
|
def write(self, column, data):
|
|
self.interface.offload_write(data, address=column, restore_original_address=True)
|
|
|
|
def write_string(self, column, string):
|
|
self.write(column, encode_string(string))
|
|
|
|
def write_keyboard_modifiers(self, modifiers):
|
|
indicators = bytearray(1)
|
|
|
|
if modifiers.is_shift():
|
|
indicators[0] = 0xda
|
|
else:
|
|
indicators[0] = 0x00
|
|
|
|
self.write(35, indicators)
|