diff --git a/oec/controller.py b/oec/controller.py index f450a9b..8a0520e 100644 --- a/oec/controller.py +++ b/oec/controller.py @@ -83,15 +83,15 @@ class Controller: # Initialize the terminal. self.terminal = Terminal(self.interface, terminal_id, extended_id) - (rows, columns) = self.terminal.dimensions + (rows, columns) = self.terminal.display.dimensions keymap_name = self.terminal.keyboard.keymap.name self.logger.info(f'Rows = {rows}, Columns = {columns}, Keymap = {keymap_name}') - self.terminal.clear_screen() + self.terminal.display.clear_screen() # Show the attached indicator on the status line. - self.terminal.status_line.write_string(0, 'S') + self.terminal.display.status_line.write_string(0, 'S') # Start the process. self.host_process = self._start_host_process() @@ -165,7 +165,7 @@ class Controller: else: indicators[0] = 0x00 - self.terminal.status_line.write(35, indicators) + self.terminal.display.status_line.write(35, indicators) if not key: return @@ -180,7 +180,7 @@ class Controller: environment['LC_ALL'] = 'C' process = PtyProcess.spawn(self.host_command, env=environment, - dimensions=self.terminal.dimensions) + dimensions=self.terminal.display.dimensions) return process diff --git a/oec/display.py b/oec/display.py index 7947eed..75d7c4e 100644 --- a/oec/display.py +++ b/oec/display.py @@ -3,6 +3,10 @@ oec.display ~~~~~~~~~~~ """ +from collections import namedtuple +import logging +from coax import write_data + _ASCII_CHAR_MAP = { '>': 0x08, '<': 0x09, @@ -147,10 +151,138 @@ def encode_string(string, errors='replace'): return bytes([encode_ascii_character(character) for character in string.encode('ascii', errors)]) -# TODO: remove default columns +# Does not include the status line row. +Dimensions = namedtuple('Dimensions', ['rows', 'columns']) + +class Display: + def __init__(self, interface, dimensions): + self.logger = logging.getLogger(__name__) + + self.interface = interface + self.dimensions = dimensions + + (rows, columns) = self.dimensions + + self.buffer = bytearray(rows * columns) + self.dirty = [False for index in range(rows * columns)] + + self.address_counter = None + + self.status_line = StatusLine(self.interface, columns) + + def load_address_counter(self, address=None, index=None, row=None, column=None): + """Load the address counter.""" + if address is None: + address = self.calculate_address(index=index, row=row, column=column) + + self.interface.offload_load_address_counter(address) + + self.address_counter = address + + def clear_screen(self): + """Clear the screen - including the status line.""" + (rows, columns) = self.dimensions + + self.interface.offload_write(b'\x00', address=0, repeat=((rows+1)*columns)-1) + + # TODO: Update the buffer and dirty indicators to reflect the cleared screen. + + self.load_address_counter(index=0) + + def write_buffer(self, byte, index=None, row=None, column=None): + if index is None: + if row is not None and column is not None: + index = self._get_index(row, column) + else: + raise ValueError('Either index or row and column is required') + + if self.buffer[index] == byte: + return False + + self.buffer[index] = byte + self.dirty[index] = True + + return True + + def flush(self): + for (start_index, end_index) in self._get_dirty_ranges(): + self._flush_range(start_index, end_index) + + def calculate_address(self, index=None, row=None, column=None): + if index is not None: + return self.dimensions.columns + index + + if row is not None and column is not None: + return self.dimensions.columns + self._get_index(row, column) + + raise ValueError('Either index or row and column is required') + + def _get_index(self, row, column): + return (row * self.dimensions.columns) + column + + def _get_dirty_ranges(self): + ranges = [] + + start_index = 0 + + while start_index < len(self.dirty): + if self.dirty[start_index]: + break + + start_index += 1 + + end_index = len(self.dirty) - 1 + + while end_index >= 0: + if self.dirty[end_index]: + break + + end_index -= 1 + + if start_index < len(self.dirty) and end_index >= 0: + ranges.append((start_index, end_index)) + + return ranges + + def _flush_range(self, start_index, end_index): + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug(f'Flushing changes for range {start_index}-{end_index}') + + data = self.buffer[start_index:end_index+1] + + address = self.calculate_address(start_index) + + # TODO: Consider using offload for all writing - set address to None if it is the + # same as the current address counter to avoid the additional load command. + if address != self.address_counter: + try: + self.interface.offload_write(data, address=address) + except Exception as error: + self.logger.error(f'Offload write error: {error}', exc_info=error) + + self.address_counter = address + len(data) + else: + try: + write_data(self.interface, data) + except Exception as error: + self.logger.error(f'WRITE_DATA error: {error}', exc_info=error) + + self.address_counter += len(data) + + # Force the address counter to be updated... + (rows, columns) = self.dimensions + + if self.address_counter >= self.calculate_address((rows * columns) - 1): + self.address_counter = None + + for index in range(start_index, end_index+1): + self.dirty[index] = False + + return self.address_counter + # TODO: add validation of column and data length for write() - must be inside status line class StatusLine: - def __init__(self, interface, columns=80): + def __init__(self, interface, columns): self.interface = interface self.columns = columns diff --git a/oec/emulator.py b/oec/emulator.py index f2281b6..27828f9 100644 --- a/oec/emulator.py +++ b/oec/emulator.py @@ -5,7 +5,6 @@ oec.emulator import logging import pyte -from coax import write_data from .display import encode_ascii_character from .keyboard import Key, get_ascii_character_for_key @@ -75,30 +74,23 @@ class VT100Emulator: self.terminal = terminal self.host = host - self.rows = self.terminal.dimensions.rows - self.columns = self.terminal.dimensions.columns + # Initialize the VT100 screen. + (rows, columns) = self.terminal.display.dimensions - self.vt100_screen = pyte.Screen(self.columns, self.rows) + self.vt100_screen = pyte.Screen(columns, rows) self.vt100_screen.write_process_input = lambda data: host.write(data.encode()) self.vt100_stream = pyte.ByteStream(self.vt100_screen) - # TODO: Consider moving the following three attributes to the Terminal class - # and moving the associated methods. - self.buffer = bytearray(self.rows * self.columns) - self.dirty = [False for index in range(self.rows * self.columns)] - - self.address_counter = self._calculate_address(0) - # Clear the screen. - self.terminal.clear_screen() + self.terminal.display.clear_screen() # Update the status line. - self.terminal.status_line.write_string(45, 'VT100') + self.terminal.display.status_line.write_string(45, 'VT100') # Load the address counter. - self.terminal.interface.offload_load_address_counter(self.address_counter) + self.terminal.display.load_address_counter(index=0) def handle_key(self, key, keyboard_modifiers, scan_code): """Handle a terminal keystroke.""" @@ -150,107 +142,38 @@ class VT100Emulator: return None - def _get_index(self, row, column): - return (row * self.columns) + column - - def _calculate_address(self, cursor_index): - return self.columns + cursor_index - def _apply(self, screen): for row in screen.dirty: row_buffer = screen.buffer[row] - for column in range(self.columns): + for column in range(self.terminal.display.dimensions.columns): character = row_buffer[column] # TODO: Investigate multi-byte or zero-byte cases further. # TODO: Add additional mapping for special cases such as '^'... byte = encode_ascii_character(ord(character.data)) if len(character.data) == 1 else 0x00 - index = self._get_index(row, column) - - if self.buffer[index] != byte: - self.buffer[index] = byte - self.dirty[index] = True + self.terminal.display.write_buffer(byte, row=row, column=column) def _flush(self): - for (start_index, end_index) in self._get_dirty_ranges(): - self._flush_range(start_index, end_index) + display = self.terminal.display + + display.flush() # Syncronize the cursor. cursor = self.vt100_screen.cursor - address = self._calculate_address(self._get_index(cursor.y, cursor.x)) + address = display.calculate_address(row=cursor.y, column=cursor.x) # TODO: Investigate different approaches to reducing the need to syncronize the cursor # or make it more reliable. - if address != self.address_counter: + if address != display.address_counter: if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug((f'Setting address counter: Address = {address}, ' - f'Address Counter = {self.address_counter}')) + f'Address Counter = {display.address_counter}')) - self.terminal.interface.offload_load_address_counter(address) - - self.address_counter = address + display.load_address_counter(address) else: if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug((f'Skipping address counter: Address Counter = ' - f'{self.address_counter}')) - - def _get_dirty_ranges(self): - ranges = [] - - start_index = 0 - - while start_index < len(self.dirty): - if self.dirty[start_index]: - break - - start_index += 1 - - end_index = len(self.dirty) - 1 - - while end_index >= 0: - if self.dirty[end_index]: - break - - end_index -= 1 - - if start_index < len(self.dirty) and end_index >= 0: - ranges.append((start_index, end_index)) - - return ranges - - def _flush_range(self, start_index, end_index): - if self.logger.isEnabledFor(logging.DEBUG): - self.logger.debug(f'Flushing changes for range {start_index}-{end_index}') - - data = self.buffer[start_index:end_index+1] - - address = self._calculate_address(start_index) - - # TODO: Consider using offload for all writing - set address to None if it is the - # same as the current address counter to avoid the additional load command. - if address != self.address_counter: - try: - self.terminal.interface.offload_write(data, address=address) - except Exception as error: - self.logger.error(f'Offload write error: {error}', exc_info=error) - - self.address_counter = address + len(data) - else: - try: - write_data(self.terminal.interface, data) - except Exception as error: - self.logger.error(f'WRITE_DATA error: {error}', exc_info=error) - - self.address_counter += len(data) - - # Force the address counter to be updated... - if self.address_counter >= self._calculate_address((self.rows * self.columns) - 1): - self.address_counter = None - - for index in range(start_index, end_index+1): - self.dirty[index] = False - - return self.address_counter + f'{display.address_counter}')) diff --git a/oec/terminal.py b/oec/terminal.py index 235adfc..ea3a00d 100644 --- a/oec/terminal.py +++ b/oec/terminal.py @@ -3,16 +3,11 @@ oec.terminal ~~~~~~~~~~~~ """ -from collections import namedtuple - -from .display import StatusLine +from .display import Dimensions, Display from .keyboard import Keyboard from .keymap_3278_2 import KEYMAP as KEYMAP_3278_2 from .keymap_3483 import KEYMAP as KEYMAP_3483 -# Does not include the status line row. -Dimensions = namedtuple('Dimensions', ['rows', 'columns']) - MODEL_DIMENSIONS = { 2: Dimensions(24, 80), 3: Dimensions(32, 80), @@ -27,14 +22,14 @@ def get_dimensions(terminal_id, extended_id): return MODEL_DIMENSIONS[terminal_id.model] -def get_keyboard(terminal_id, extended_id): - """Get keyboard configured with terminal keymap.""" +def get_keymap(terminal_id, extended_id): + """Get terminal keymap.""" keymap = KEYMAP_3278_2 if extended_id == 'c1348300': keymap = KEYMAP_3483 - return Keyboard(keymap) + return keymap class Terminal: """Terminal information, devices and helpers.""" @@ -44,15 +39,8 @@ class Terminal: self.terminal_id = terminal_id self.extended_id = extended_id - self.dimensions = get_dimensions(self.terminal_id, self.extended_id) - self.keyboard = get_keyboard(self.terminal_id, self.extended_id) + dimensions = get_dimensions(self.terminal_id, self.extended_id) + keymap = get_keymap(self.terminal_id, self.extended_id) - self.status_line = StatusLine(self.interface, self.dimensions.columns) - - def clear_screen(self): - """Clear the screen - including the status line.""" - (rows, columns) = self.dimensions - - self.interface.offload_write(b'\x00', address=0, repeat=((rows+1)*columns)-1) - - self.interface.offload_load_address_counter(columns) + self.display = Display(interface, dimensions) + self.keyboard = Keyboard(keymap)