2023-01-18 07:54:22 -06:00

332 lines
9.9 KiB
Python

"""
oec.tn3270
~~~~~~~~~~
"""
import logging
from tn3270 import Telnet, TN3270EFunction, Emulator, AttributeCell, CharacterCell, AID, Color, \
Highlight, OperatorError, ProtectedCellOperatorError, FieldOverflowOperatorError
from tn3270.ebcdic import DUP, FM
from .session import Session, SessionDisconnectedError
from .display import encode_character, encode_string
from .keyboard import Key, get_character_for_key
AID_KEY_MAP = {
Key.CLEAR: AID.CLEAR,
Key.ENTER: AID.ENTER,
Key.PA1: AID.PA1,
Key.PA2: AID.PA2,
Key.PA3: AID.PA3,
Key.PF1: AID.PF1,
Key.PF2: AID.PF2,
Key.PF3: AID.PF3,
Key.PF4: AID.PF4,
Key.PF5: AID.PF5,
Key.PF6: AID.PF6,
Key.PF7: AID.PF7,
Key.PF8: AID.PF8,
Key.PF9: AID.PF9,
Key.PF10: AID.PF10,
Key.PF11: AID.PF11,
Key.PF12: AID.PF12,
Key.PF13: AID.PF13,
Key.PF14: AID.PF14,
Key.PF15: AID.PF15,
Key.PF16: AID.PF16,
Key.PF17: AID.PF17,
Key.PF18: AID.PF18,
Key.PF19: AID.PF19,
Key.PF20: AID.PF20,
Key.PF21: AID.PF21,
Key.PF22: AID.PF22,
Key.PF23: AID.PF23,
Key.PF24: AID.PF24
}
class TN3270Session(Session):
"""TN3270 session."""
def __init__(self, terminal, host, port, device_names, character_encoding, tn3270e_profile):
super().__init__(terminal)
self.logger = logging.getLogger(__name__)
self.host = host
self.port = port
self.device_names = device_names
self.character_encoding = character_encoding
self.tn3270e_profile = tn3270e_profile
self.telnet = None
self.emulator = None
self.keyboard_insert = False
self.waiting_on_host = False
self.operator_error = None
# TODO: Should the message area be initialized here?
self.message_area = None
self.last_message_area = None
def start(self):
self._connect_host()
(rows, columns) = self.terminal.display.dimensions
if self.terminal.display.has_eab:
supported_colors = 8
supported_highlights = [Highlight.BLINK, Highlight.REVERSE, Highlight.UNDERSCORE]
else:
supported_colors = 1
supported_highlights = []
self.emulator = Emulator(self.telnet, rows, columns, supported_colors, supported_highlights)
self.emulator.alarm = lambda: self.terminal.sound_alarm()
def terminate(self):
if self.telnet:
self._disconnect_host()
self.emulator = None
def fileno(self):
return self.emulator.stream.socket.fileno()
def handle_host(self):
try:
if not self.emulator.update(timeout=0):
return False
except (EOFError, ConnectionResetError):
self._disconnect_host()
raise SessionDisconnectedError
self.waiting_on_host = False
return True
def handle_key(self, key, keyboard_modifiers, scan_code):
aid = AID_KEY_MAP.get(key)
try:
if aid is not None:
self._reset_insert()
self.emulator.aid(aid)
self.waiting_on_host = True
#elif key == Key.RESET:
elif key == Key.TAB:
self.emulator.tab()
elif key == Key.BACKTAB:
self.emulator.tab(direction=-1)
elif key == Key.NEWLINE:
self.emulator.newline()
elif key == Key.HOME:
self.emulator.home()
elif key == Key.UP:
self.emulator.cursor_up()
elif key == Key.DOWN:
self.emulator.cursor_down()
elif key == Key.LEFT:
self.emulator.cursor_left()
elif key == Key.LEFT_2:
self.emulator.cursor_left(rate=2)
elif key == Key.RIGHT:
self.emulator.cursor_right()
elif key == Key.RIGHT_2:
self.emulator.cursor_right(rate=2)
elif key == Key.BACKSPACE:
self.emulator.backspace()
elif key == Key.DELETE:
self.emulator.delete()
elif key == Key.ERASE_EOF:
self.emulator.erase_end_of_field()
elif key == Key.ERASE_INPUT:
self.emulator.erase_input()
elif key == Key.INSERT:
self._handle_insert_key()
elif key == Key.DUP:
self.emulator.dup()
elif key == Key.FIELD_MARK:
self.emulator.field_mark()
else:
character = get_character_for_key(key)
if character:
byte = character.encode(self.character_encoding)[0]
self.emulator.input(byte, self.keyboard_insert)
except OperatorError as error:
self.operator_error = error
def render(self):
self._apply()
self._flush()
def _reset_insert(self):
if not self.keyboard_insert:
return
self.keyboard_insert = False
self.terminal.display.status_line.write_keyboard_insert(False)
def _handle_insert_key(self):
self.keyboard_insert = not self.keyboard_insert
self.terminal.display.status_line.write_keyboard_insert(self.keyboard_insert)
def _connect_host(self):
# We will pretend a 3279 without EAB is a 3278.
if self.terminal.display.has_eab:
type = '3279'
else:
type = '3278'
# Although a IBM 3278 does not support the formatting enabled by the extended
# data stream, the capabilities will be reported in the query reply.
terminal_type = f'IBM-{type}-{self.terminal.terminal_id.model}-E'
self.logger.info(f'Terminal Type = {terminal_type}')
tn3270e_args = _get_tn3270e_args(self.tn3270e_profile)
self.telnet = Telnet(terminal_type, **tn3270e_args)
self.telnet.open(self.host, self.port, self.device_names)
if self.telnet.is_tn3270e_negotiated:
self.logger.info(f'TN3270E mode negotiated: Device Type = {self.telnet.device_type}, Device Name = {self.telnet.device_name}, Functions = {self.telnet.tn3270e_functions}')
else:
self.logger.debug('Unable to negotiate TN3270E mode')
def _disconnect_host(self):
self.telnet.close()
self.telnet = None
def _apply(self):
has_eab = self.terminal.display.has_eab
for address in self.emulator.dirty:
cell = self.emulator.cells[address]
(regen_byte, eab_byte) = _map_cell(cell, self.character_encoding, has_eab)
self.terminal.display.buffered_write_byte(regen_byte, eab_byte, index=address)
self.emulator.dirty.clear()
# Update the message area.
self.message_area = self._format_message_area()
def _flush(self):
self.terminal.display.flush()
# TODO: hmm we need a buffered status line...
if self.message_area != self.last_message_area:
self.terminal.display.status_line.write(8, self.message_area)
self.last_message_area = self.message_area
self.terminal.display.move_cursor(index=self.emulator.cursor_address)
# TODO: This needs to be moved.
self.operator_error = None
def _format_message_area(self):
message_area = b''
if self.waiting_on_host:
# X SPACE CLOCK_LEFT CLOCK_RIGHT
message_area = b'\xf6\x00\xf4\xf5'
elif isinstance(self.operator_error, ProtectedCellOperatorError):
# X SPACE ARROW_LEFT OPERATOR ARROW_RIGHT
message_area = b'\xf6\x00\xf8\xdb\xd8'
elif isinstance(self.operator_error, FieldOverflowOperatorError):
# X SPACE OPERATOR >
message_area = b'\xf6\x00\xdb' + encode_string('>')
elif self.emulator.keyboard_locked:
# X SPACE SYSTEM
message_area = b'\xf6\x00' + encode_string('SYSTEM')
return message_area.ljust(9, b'\x00')
def _map_cell(cell, character_encoding, has_eab):
regen_byte = 0x00
if isinstance(cell, AttributeCell):
# Only map the protected and display bits - ignore numeric, skip and modified.
regen_byte = 0xc0 | (cell.attribute.value & 0x2c)
elif isinstance(cell, CharacterCell):
byte = cell.byte
if cell.character_set is not None:
# TODO: Temporary workaround until character set support is added.
regen_byte = encode_character('ß')
elif byte == DUP:
regen_byte = encode_character('*')
elif byte == FM:
regen_byte = encode_character(';')
else:
character = bytes([byte]).decode(character_encoding)
regen_byte = encode_character(character)
if not has_eab:
return (regen_byte, None)
eab_byte = _map_formatting(cell.formatting)
return (regen_byte, eab_byte)
def _map_formatting(formatting):
if formatting is None:
return 0x00
byte = 0x00
# Map the 3270 color to EAB color.
if formatting.color == Color.BLUE:
byte |= 0x08
elif formatting.color == Color.RED:
byte |= 0x10
elif formatting.color == Color.PINK:
byte |= 0x18
elif formatting.color == Color.GREEN:
byte |= 0x20
elif formatting.color == Color.TURQUOISE:
byte |= 0x28
elif formatting.color == Color.YELLOW:
byte |= 0x30
elif formatting.color == Color.WHITE:
byte |= 0x38
# Map the 3270 highlight to EAB highlight.
if formatting.blink:
byte |= 0x40
elif formatting.reverse:
byte |= 0x80
elif formatting.underscore:
byte |= 0xc0
return byte
def _get_tn3270e_args(profile):
is_tn3270e_enabled = True
tn3270e_functions = [TN3270EFunction.RESPONSES]
if profile == 'off':
is_tn3270e_enabled = False
tn3270e_functions = None
elif profile == 'basic':
tn3270e_functions = []
return {
'is_tn3270e_enabled': is_tn3270e_enabled,
'tn3270e_functions': tn3270e_functions
}