""" oec.tn3270 ~~~~~~~~~~ """ import logging from tn3270 import Telnet, TN3270EFunction, Emulator, AttributeCell, CharacterCell, AID, Color, \ Highlight, OperatorError, ProtectedCellOperatorError, FieldOverflowOperatorError from tn3270.ebcdic import DUP, FM from .session import Session, SessionDisconnectedError from .display import encode_character, encode_string from .keyboard import Key, get_character_for_key AID_KEY_MAP = { Key.CLEAR: AID.CLEAR, Key.ENTER: AID.ENTER, Key.PA1: AID.PA1, Key.PA2: AID.PA2, Key.PA3: AID.PA3, Key.PF1: AID.PF1, Key.PF2: AID.PF2, Key.PF3: AID.PF3, Key.PF4: AID.PF4, Key.PF5: AID.PF5, Key.PF6: AID.PF6, Key.PF7: AID.PF7, Key.PF8: AID.PF8, Key.PF9: AID.PF9, Key.PF10: AID.PF10, Key.PF11: AID.PF11, Key.PF12: AID.PF12, Key.PF13: AID.PF13, Key.PF14: AID.PF14, Key.PF15: AID.PF15, Key.PF16: AID.PF16, Key.PF17: AID.PF17, Key.PF18: AID.PF18, Key.PF19: AID.PF19, Key.PF20: AID.PF20, Key.PF21: AID.PF21, Key.PF22: AID.PF22, Key.PF23: AID.PF23, Key.PF24: AID.PF24 } class TN3270Session(Session): """TN3270 session.""" def __init__(self, terminal, host, port, device_names, character_encoding, tn3270e_profile): super().__init__(terminal) self.logger = logging.getLogger(__name__) self.host = host self.port = port self.device_names = device_names self.character_encoding = character_encoding self.tn3270e_profile = tn3270e_profile self.telnet = None self.emulator = None self.keyboard_insert = False self.waiting_on_host = False self.operator_error = None # TODO: Should the message area be initialized here? self.message_area = None self.last_message_area = None def start(self): self._connect_host() (rows, columns) = self.terminal.display.dimensions if self.terminal.display.has_eab: supported_colors = 8 supported_highlights = [Highlight.BLINK, Highlight.REVERSE, Highlight.UNDERSCORE] else: supported_colors = 1 supported_highlights = [] self.emulator = Emulator(self.telnet, rows, columns, supported_colors, supported_highlights) self.emulator.alarm = lambda: self.terminal.sound_alarm() def terminate(self): if self.telnet: self._disconnect_host() self.emulator = None def fileno(self): return self.emulator.stream.socket.fileno() def handle_host(self): try: if not self.emulator.update(timeout=0): return False except (EOFError, ConnectionResetError): self._disconnect_host() raise SessionDisconnectedError self.waiting_on_host = False return True def handle_key(self, key, keyboard_modifiers, scan_code): aid = AID_KEY_MAP.get(key) try: if aid is not None: self._reset_insert() self.emulator.aid(aid) self.waiting_on_host = True #elif key == Key.RESET: elif key == Key.TAB: self.emulator.tab() elif key == Key.BACKTAB: self.emulator.tab(direction=-1) elif key == Key.NEWLINE: self.emulator.newline() elif key == Key.HOME: self.emulator.home() elif key == Key.UP: self.emulator.cursor_up() elif key == Key.DOWN: self.emulator.cursor_down() elif key == Key.LEFT: self.emulator.cursor_left() elif key == Key.LEFT_2: self.emulator.cursor_left(rate=2) elif key == Key.RIGHT: self.emulator.cursor_right() elif key == Key.RIGHT_2: self.emulator.cursor_right(rate=2) elif key == Key.BACKSPACE: self.emulator.backspace() elif key == Key.DELETE: self.emulator.delete() elif key == Key.ERASE_EOF: self.emulator.erase_end_of_field() elif key == Key.ERASE_INPUT: self.emulator.erase_input() elif key == Key.INSERT: self._handle_insert_key() elif key == Key.DUP: self.emulator.dup() elif key == Key.FIELD_MARK: self.emulator.field_mark() else: character = get_character_for_key(key) if character: byte = character.encode(self.character_encoding)[0] self.emulator.input(byte, self.keyboard_insert) except OperatorError as error: self.operator_error = error def render(self): self._apply() self._flush() def _reset_insert(self): if not self.keyboard_insert: return self.keyboard_insert = False self.terminal.display.status_line.write_keyboard_insert(False) def _handle_insert_key(self): self.keyboard_insert = not self.keyboard_insert self.terminal.display.status_line.write_keyboard_insert(self.keyboard_insert) def _connect_host(self): # We will pretend a 3279 without EAB is a 3278. if self.terminal.display.has_eab: type = '3279' else: type = '3278' # Although a IBM 3278 does not support the formatting enabled by the extended # data stream, the capabilities will be reported in the query reply. terminal_type = f'IBM-{type}-{self.terminal.terminal_id.model}-E' self.logger.info(f'Terminal Type = {terminal_type}') tn3270e_args = _get_tn3270e_args(self.tn3270e_profile) self.telnet = Telnet(terminal_type, **tn3270e_args) self.telnet.open(self.host, self.port, self.device_names) if self.telnet.is_tn3270e_negotiated: self.logger.info(f'TN3270E mode negotiated: Device Type = {self.telnet.device_type}, Device Name = {self.telnet.device_name}, Functions = {self.telnet.tn3270e_functions}') else: self.logger.debug('Unable to negotiate TN3270E mode') def _disconnect_host(self): self.telnet.close() self.telnet = None def _apply(self): has_eab = self.terminal.display.has_eab for address in self.emulator.dirty: cell = self.emulator.cells[address] (regen_byte, eab_byte) = _map_cell(cell, self.character_encoding, has_eab) self.terminal.display.buffered_write_byte(regen_byte, eab_byte, index=address) self.emulator.dirty.clear() # Update the message area. self.message_area = self._format_message_area() def _flush(self): self.terminal.display.flush() # TODO: hmm we need a buffered status line... if self.message_area != self.last_message_area: self.terminal.display.status_line.write(8, self.message_area) self.last_message_area = self.message_area self.terminal.display.move_cursor(index=self.emulator.cursor_address) # TODO: This needs to be moved. self.operator_error = None def _format_message_area(self): message_area = b'' if self.waiting_on_host: # X SPACE CLOCK_LEFT CLOCK_RIGHT message_area = b'\xf6\x00\xf4\xf5' elif isinstance(self.operator_error, ProtectedCellOperatorError): # X SPACE ARROW_LEFT OPERATOR ARROW_RIGHT message_area = b'\xf6\x00\xf8\xdb\xd8' elif isinstance(self.operator_error, FieldOverflowOperatorError): # X SPACE OPERATOR > message_area = b'\xf6\x00\xdb' + encode_string('>') elif self.emulator.keyboard_locked: # X SPACE SYSTEM message_area = b'\xf6\x00' + encode_string('SYSTEM') return message_area.ljust(9, b'\x00') def _map_cell(cell, character_encoding, has_eab): regen_byte = 0x00 if isinstance(cell, AttributeCell): # Only map the protected and display bits - ignore numeric, skip and modified. regen_byte = 0xc0 | (cell.attribute.value & 0x2c) elif isinstance(cell, CharacterCell): byte = cell.byte if cell.character_set is not None: # TODO: Temporary workaround until character set support is added. regen_byte = encode_character('ß') elif byte == DUP: regen_byte = encode_character('*') elif byte == FM: regen_byte = encode_character(';') else: character = bytes([byte]).decode(character_encoding) regen_byte = encode_character(character) if not has_eab: return (regen_byte, None) eab_byte = _map_formatting(cell.formatting) return (regen_byte, eab_byte) def _map_formatting(formatting): if formatting is None: return 0x00 byte = 0x00 # Map the 3270 color to EAB color. if formatting.color == Color.BLUE: byte |= 0x08 elif formatting.color == Color.RED: byte |= 0x10 elif formatting.color == Color.PINK: byte |= 0x18 elif formatting.color == Color.GREEN: byte |= 0x20 elif formatting.color == Color.TURQUOISE: byte |= 0x28 elif formatting.color == Color.YELLOW: byte |= 0x30 elif formatting.color == Color.WHITE: byte |= 0x38 # Map the 3270 highlight to EAB highlight. if formatting.blink: byte |= 0x40 elif formatting.reverse: byte |= 0x80 elif formatting.underscore: byte |= 0xc0 return byte def _get_tn3270e_args(profile): is_tn3270e_enabled = True tn3270e_functions = [TN3270EFunction.RESPONSES] if profile == 'off': is_tn3270e_enabled = False tn3270e_functions = None elif profile == 'basic': tn3270e_functions = [] return { 'is_tn3270e_enabled': is_tn3270e_enabled, 'tn3270e_functions': tn3270e_functions }