From e9821ed39ee8be0b6ce1ab1acca0d15d13b04ba0 Mon Sep 17 00:00:00 2001 From: Andrew Kay Date: Mon, 9 Sep 2019 21:01:58 -0500 Subject: [PATCH] Initial TN3270 support --- README.md | 15 ++- oec/__main__.py | 43 ++++++-- oec/display.py | 25 ++++- oec/keyboard.py | 9 ++ oec/keymap_3278_2.py | 3 + oec/keymap_3483.py | 10 +- oec/tn3270.py | 201 ++++++++++++++++++++++++++++++++++++ oec/vt100.py | 22 ++-- requirements.txt | 1 + tests/test_display.py | 91 ++++++++++------ tests/test_keyboard.py | 12 ++- tests/test_tn3270.py | 228 +++++++++++++++++++++++++++++++++++++++++ tests/test_vt100.py | 8 +- 13 files changed, 603 insertions(+), 65 deletions(-) create mode 100644 oec/tn3270.py create mode 100644 tests/test_tn3270.py diff --git a/README.md b/README.md index 47e384d..6987af1 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ oec is an open replacement for the IBM 3174 Establishment Controller. -It is still a work in progress - as of now it only provides basic VT100 emulation but the goal is to implement TN3270 and multiple logical terminal support. +It is a work in progress - as of now it only provides basic TN3270 and VT100 emulation. ## Usage @@ -16,12 +16,19 @@ python -m venv VIRTUALENV pip install -r requirements.txt --no-deps ``` -Assuming your interface is connected to `/dev/ttyUSB0` and you want to run `/bin/sh` as the host process: +Assuming your interface is connected to `/dev/ttyUSB0` and you want to connect to a TN3270 host named `mainframe`: ``` -python -m oec /dev/ttyUSB0 /bin/sh -l +python -m oec /dev/ttyUSB0 tn3270 mainframe +``` + +If you want to use the VT100 emulator and run `/bin/sh` as the host process: + +``` +python -m oec /dev/ttyUSB0 vt100 /bin/sh -l ``` ## See Also -* [coax-interface](https://github.com/lowobservable/coax-interface) - tools for interfacing with IBM 3270 type terminals +* [coax-interface](https://github.com/lowobservable/coax-interface) - Tools for interfacing with IBM 3270 type terminals +* [pytn3270](https://github.com/lowobservable/pytn3270) - Python TN3270 library diff --git a/oec/__main__.py b/oec/__main__.py index 0d9f832..0a0e4d3 100644 --- a/oec/__main__.py +++ b/oec/__main__.py @@ -5,20 +5,47 @@ from serial import Serial from coax import Interface1 from .controller import Controller +from .tn3270 import TN3270Session from .vt100 import VT100Session logging.basicConfig(level=logging.INFO) -def main(): - parser = argparse.ArgumentParser(description='VT100 emulator.') +def _create_session(args, terminal): + if args.emulator == 'tn3270': + return TN3270Session(terminal, args.host, args.port) - parser.add_argument('port', help='Serial port') - parser.add_argument('command', help='Host process') - parser.add_argument('command_args', nargs=argparse.REMAINDER, help='Host process arguments') + if args.emulator == 'vt100': + host_command = [args.command, *args.command_args] + + return VT100Session(terminal, host_command) + + raise ValueError('Unsupported emulator') + +def main(): + parser = argparse.ArgumentParser(description=('An open replacement for the IBM 3174 ' + 'Establishment Controller')) + + parser.add_argument('serial_port', help='Serial port') + + subparsers = parser.add_subparsers(dest='emulator', required=True, + description='Emulator') + + tn3270_parser = subparsers.add_parser('tn3270', description='TN3270 emulator', + help='TN3270 emulator') + + tn3270_parser.add_argument('host', help='Hostname') + tn3270_parser.add_argument('port', nargs='?', default=23, type=int) + + vt100_parser = subparsers.add_parser('vt100', description='VT100 emulator', + help='VT100 emulator') + + vt100_parser.add_argument('command', help='Host process') + vt100_parser.add_argument('command_args', nargs=argparse.REMAINDER, + help='Host process arguments') args = parser.parse_args() - with Serial(args.port, 115200) as serial: + with Serial(args.serial_port, 115200) as serial: serial.reset_input_buffer() serial.reset_output_buffer() @@ -33,9 +60,7 @@ def main(): print(f'Interface firmware version {firmware_version}') # Initialize and start the controller. - host_command = [args.command, *args.command_args] - - create_session = lambda terminal: VT100Session(terminal, host_command) + create_session = lambda terminal: _create_session(args, terminal) controller = Controller(interface, create_session) diff --git a/oec/display.py b/oec/display.py index 3cd757b..72041e1 100644 --- a/oec/display.py +++ b/oec/display.py @@ -137,8 +137,12 @@ _ASCII_CHAR_MAP = { '*': 0xbf } +_EBCDIC_CHAR_MAP = {ascii_character.encode('cp500')[0]: byte for ascii_character, byte in _ASCII_CHAR_MAP.items()} + ASCII_CHAR_MAP = [_ASCII_CHAR_MAP.get(character, 0x00) for character in map(chr, range(256))] +EBCDIC_CHAR_MAP = [_EBCDIC_CHAR_MAP.get(character, 0x00) for character in range(256)] + def encode_ascii_character(character): """Map an ASCII character to a terminal display character.""" if character > 255: @@ -146,6 +150,13 @@ def encode_ascii_character(character): return ASCII_CHAR_MAP[character] +def encode_ebcdic_character(character): + """Map an EBCDIC character to a terminal display character.""" + if character > 255: + return 0x00 + + return EBCDIC_CHAR_MAP[character] + def encode_string(string, errors='replace'): """Map a string to terminal display characters.""" return bytes([encode_ascii_character(character) for character @@ -170,9 +181,9 @@ class Display: self.status_line = StatusLine(self.interface, columns) - def move_cursor(self, row, column, force_load=False): + def move_cursor(self, index=None, row=None, column=None, force_load=False): """Load the address counter.""" - address = self._calculate_address(row=row, column=column) + address = self._calculate_address(index=index, row=row, column=column) # TODO: Verify that the address is within range - exclude status line. @@ -185,8 +196,12 @@ class Display: return True - def buffered_write(self, byte, row, column): - index = self._get_index(row, column) + def buffered_write(self, byte, index=None, row=None, column=None): + if index is None: + if row is None or column is None: + raise ValueError('Either index or row and column is required') + + index = self._get_index(row, column) # TODO: Verify that index is within range. @@ -222,7 +237,7 @@ class Display: self.dirty.clear() - self.move_cursor(0, 0, force_load=True) + self.move_cursor(row=0, column=0, force_load=True) def _get_index(self, row, column): return (row * self.dimensions.columns) + column diff --git a/oec/keyboard.py b/oec/keyboard.py index 5cd4d9b..ee05614 100644 --- a/oec/keyboard.py +++ b/oec/keyboard.py @@ -385,3 +385,12 @@ def get_ascii_character_for_key(key): return None return chr(value) + +def get_ebcdic_character_for_key(key): + """Map a key to EBCDIC character.""" + ascii_character = get_ascii_character_for_key(key) + + if not ascii_character: + return None + + return ascii_character.encode('cp500')[0] diff --git a/oec/keymap_3278_2.py b/oec/keymap_3278_2.py index a9bdcc2..4893dbd 100644 --- a/oec/keymap_3278_2.py +++ b/oec/keymap_3278_2.py @@ -181,6 +181,9 @@ KEYMAP_ALT = { 48: Key.PF11, 17: Key.PF12, + # Second Row + 53: Key.HOME, + # Right 95: Key.PA1, 94: Key.PA2, diff --git a/oec/keymap_3483.py b/oec/keymap_3483.py index d73eec0..4daf0e8 100644 --- a/oec/keymap_3483.py +++ b/oec/keymap_3483.py @@ -213,11 +213,17 @@ KEYMAP_SHIFT = { # Center 99: Key.ROLL_UP, - 96: Key.ROLL_DOWN, + 96: Key.ROLL_DOWN } KEYMAP_ALT = { - **KEYMAP_DEFAULT + **KEYMAP_DEFAULT, + + # Control Keys + 6: Key.CLEAR, + + # Center + 98: Key.HOME } KEYMAP = Keymap('3483', KEYMAP_DEFAULT, KEYMAP_SHIFT, KEYMAP_ALT, modifier_release=240) diff --git a/oec/tn3270.py b/oec/tn3270.py new file mode 100644 index 0000000..d079d05 --- /dev/null +++ b/oec/tn3270.py @@ -0,0 +1,201 @@ +""" +oec.tn3270 +~~~~~~~~~~ +""" + +import logging +from tn3270 import Telnet, Emulator, AttributeCell, CharacterCell, AID, OperatorError, \ + ProtectedCellOperatorError + +from .session import Session, SessionDisconnectedError +from .display import encode_ebcdic_character, encode_string +from .keyboard import Key, get_ebcdic_character_for_key + +AID_KEY_MAP = { + Key.CLEAR: AID.CLEAR, + Key.ENTER: AID.ENTER, + #Key.PA1: AID.PA1, + #Key.PA2: AID.PA2, + #Key.PA3: AID.PA3, + Key.PF1: AID.PF1, + Key.PF2: AID.PF2, + Key.PF3: AID.PF3, + Key.PF4: AID.PF4, + Key.PF5: AID.PF5, + Key.PF6: AID.PF6, + Key.PF7: AID.PF7, + Key.PF8: AID.PF8, + Key.PF9: AID.PF9, + Key.PF10: AID.PF10, + Key.PF11: AID.PF11, + Key.PF12: AID.PF12, + Key.PF13: AID.PF13, + Key.PF14: AID.PF14, + Key.PF15: AID.PF15, + Key.PF16: AID.PF16, + Key.PF17: AID.PF17, + Key.PF18: AID.PF18, + Key.PF19: AID.PF19, + Key.PF20: AID.PF20, + Key.PF21: AID.PF21, + Key.PF22: AID.PF22, + Key.PF23: AID.PF23, + Key.PF24: AID.PF24 +} + +class TN3270Session(Session): + """TN3270 session.""" + + def __init__(self, terminal, host, port): + self.logger = logging.getLogger(__name__) + + self.terminal = terminal + self.host = host + self.port = port + + self.telnet = None + self.emulator = None + self.operator_error = None + + # TODO: Should the message area be initialized here? + self.message_area = None + self.last_message_area = None + + def start(self): + self._connect_host() + + (rows, columns) = self.terminal.display.dimensions + + self.emulator = Emulator(self.telnet, rows, columns) + + def terminate(self): + if self.telnet: + self._disconnect_host() + + self.emulator = None + + def handle_host(self): + try: + if not self.emulator.update(timeout=0): + return False + except EOFError: + self._disconnect_host() + + raise SessionDisconnectedError + + self._apply() + self._flush() + + return True + + def handle_key(self, key, keyboard_modifiers, scan_code): + aid = AID_KEY_MAP.get(key) + + try: + if aid is not None: + self.emulator.aid(aid) + + # TODO: is this where we show the clock? + #elif key == Key.RESET: + elif key == Key.BACKSPACE: + self.emulator.backspace() + elif key == Key.TAB: + self.emulator.tab() + elif key == Key.BACKTAB: + self.emulator.tab(direction=-1) + elif key in [Key.NEWLINE, Key.FIELD_EXIT]: + self.emulator.newline() + elif key == Key.HOME: + self.emulator.home() + elif key == Key.UP: + self.emulator.cursor_up() + elif key == Key.DOWN: + self.emulator.cursor_down() + elif key == Key.LEFT: + self.emulator.cursor_left() + elif key == Key.RIGHT: + self.emulator.cursor_right() + #elif key == Key.INSERT: + elif key == Key.DELETE: + self.emulator.delete() + else: + byte = get_ebcdic_character_for_key(key) + + if byte: + self.emulator.input(byte) + except OperatorError as error: + self.operator_error = error + + self._apply() + self._flush() + + def _connect_host(self): + terminal_type = f'IBM-3278-{self.terminal.terminal_id.model}' + + self.telnet = Telnet(terminal_type) + + self.telnet.open(self.host, self.port) + + def _disconnect_host(self): + self.telnet.close() + + self.telnet = None + + def _apply(self): + for (address, cell) in enumerate(self.emulator.cells): + byte = 0x00 + + if isinstance(cell, AttributeCell): + byte = self._map_attribute(cell.attribute) + elif isinstance(cell, CharacterCell): + byte = encode_ebcdic_character(cell.byte) + + self.terminal.display.buffered_write(byte, index=address) + + # Update the message area. + self.message_area = self._format_message_area() + + def _flush(self): + self.terminal.display.flush() + + # TODO: hmm we need a buffered status line... + if self.message_area != self.last_message_area: + self.terminal.display.status_line.write(8, self.message_area) + + self.last_message_area = self.message_area + + # TODO: see note in VT100 about forcing sync + self.terminal.display.move_cursor(index=self.emulator.cursor_address) + + # TODO: eek, is this the correct place to do this? + self.operator_error = None + + def _map_attribute(self, attribute): + # NOTE: This mapping may not be correct and does not take into account + # lightpen detectable fields. + if attribute.hidden: + return 0xcc + + if attribute.protected: + if attribute.intensified: + return 0xe8 + + return 0xe0 + + if attribute.intensified: + return 0xc8 + + return 0xc0 + + def _format_message_area(self): + message_area = b'' + + if self.operator_error: + if isinstance(self.operator_error, ProtectedCellOperatorError): + # X SPACE ARROW_LEFT OPERATOR ARROW_RIGHT + message_area = b'\xf6\x00\xf8\xdb\xd8' + elif self.emulator.keyboard_locked: + # X SPACE SYSTEM + message_area = b'\xf6\x00' + encode_string('SYSTEM') + + return message_area.ljust(9, b'\x00') diff --git a/oec/vt100.py b/oec/vt100.py index 199d123..5ce5fcf 100644 --- a/oec/vt100.py +++ b/oec/vt100.py @@ -99,32 +99,36 @@ class VT100Session(Session): self.terminal.display.status_line.write_string(45, 'VT100') # Reset the cursor. - self.terminal.display.move_cursor(0, 0) + self.terminal.display.move_cursor(row=0, column=0) def terminate(self): if self.host_process: self._terminate_host_process() def handle_host(self): + data = None + try: if self.host_process not in select([self.host_process], [], [], 0)[0]: return False data = self.host_process.read() - - self._handle_host_output(data) - - return True except EOFError: self.host_process = None raise SessionDisconnectedError + self._handle_host_output(data) + + return True + def handle_key(self, key, keyboard_modifiers, scan_code): bytes_ = self._map_key(key, keyboard_modifiers) - if bytes_ is not None: - self.host_process.write(bytes_) + if bytes_ is None: + return + + self.host_process.write(bytes_) def _map_key(self, key, keyboard_modifiers): if keyboard_modifiers.is_alt(): @@ -193,7 +197,7 @@ class VT100Session(Session): # TODO: Investigate multi-byte or zero-byte cases further. byte = encode_ascii_character(ord(character.data)) if len(character.data) == 1 else 0x00 - self.terminal.display.buffered_write(byte, row, column) + self.terminal.display.buffered_write(byte, row=row, column=column) def _flush(self): self.terminal.display.flush() @@ -202,4 +206,4 @@ class VT100Session(Session): # reliable - maybe it needs to be forced sometimes. cursor = self.vt100_screen.cursor - self.terminal.display.move_cursor(cursor.y, cursor.x) + self.terminal.display.move_cursor(row=cursor.y, column=cursor.x) diff --git a/requirements.txt b/requirements.txt index 41702e6..9e107e1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ ptyprocess==0.6.0 pycoax==0.1.2 pyserial==3.4 pyte==0.8.0 +pytn3270==0.1.0 sliplib==0.3.0 sortedcontainers==2.1.0 wcwidth==0.1.7 diff --git a/tests/test_display.py b/tests/test_display.py index cbba3d3..a16ee88 100644 --- a/tests/test_display.py +++ b/tests/test_display.py @@ -3,7 +3,7 @@ from unittest.mock import Mock import context -from oec.display import Dimensions, Display, encode_ascii_character, encode_string +from oec.display import Dimensions, Display, encode_ascii_character, encode_ebcdic_character, encode_string class DisplayMoveCursorTestCase(unittest.TestCase): def setUp(self): @@ -15,7 +15,16 @@ class DisplayMoveCursorTestCase(unittest.TestCase): def test(self): # Act - self.display.move_cursor(10, 15) + self.display.move_cursor(index=815) + + # Assert + self.assertEqual(self.display.address_counter, 895) + + self.interface.offload_load_address_counter.assert_called_with(895) + + def test_with_row_and_column(self): + # Act + self.display.move_cursor(row=10, column=15) # Assert self.assertEqual(self.display.address_counter, 895) @@ -24,12 +33,12 @@ class DisplayMoveCursorTestCase(unittest.TestCase): def test_no_change(self): # Arrange - self.display.move_cursor(0, 0) + self.display.move_cursor(index=0) self.interface.offload_load_address_counter.reset_mock() # Act - self.display.move_cursor(0, 0) + self.display.move_cursor(index=0) # Assert self.assertEqual(self.display.address_counter, 80) @@ -38,12 +47,12 @@ class DisplayMoveCursorTestCase(unittest.TestCase): def test_no_change_force(self): # Arrange - self.display.move_cursor(0, 0) + self.display.move_cursor(index=0) self.interface.offload_load_address_counter.reset_mock() # Act - self.display.move_cursor(0, 0, force_load=True) + self.display.move_cursor(index=0, force_load=True) # Assert self.assertEqual(self.display.address_counter, 80) @@ -60,8 +69,18 @@ class DisplayBufferedWriteTestCase(unittest.TestCase): def test(self): # Act - self.display.buffered_write(0x01, 0, 15) - self.display.buffered_write(0x02, 1, 17) + self.display.buffered_write(0x01, index=15) + self.display.buffered_write(0x02, index=97) + + # Assert + self.assertEqual(self.display.buffer[15], 0x01) + self.assertEqual(self.display.buffer[97], 0x02) + self.assertSequenceEqual(self.display.dirty, [15, 97]) + + def test_with_row_and_column(self): + # Act + self.display.buffered_write(0x01, row=0, column=15) + self.display.buffered_write(0x02, row=1, column=17) # Assert self.assertEqual(self.display.buffer[15], 0x01) @@ -69,15 +88,15 @@ class DisplayBufferedWriteTestCase(unittest.TestCase): self.assertSequenceEqual(self.display.dirty, [15, 97]) def test_change(self): - self.assertTrue(self.display.buffered_write(0x01, 0, 0)) - self.assertTrue(self.display.buffered_write(0x02, 0, 0)) + self.assertTrue(self.display.buffered_write(0x01, index=0)) + self.assertTrue(self.display.buffered_write(0x02, index=0)) self.assertEqual(self.display.buffer[0], 0x02) self.assertSequenceEqual(self.display.dirty, [0]) def test_no_change(self): - self.assertTrue(self.display.buffered_write(0x01, 0, 0)) - self.assertFalse(self.display.buffered_write(0x01, 0, 0)) + self.assertTrue(self.display.buffered_write(0x01, index=0)) + self.assertFalse(self.display.buffered_write(0x01, index=0)) self.assertEqual(self.display.buffer[0], 0x01) self.assertSequenceEqual(self.display.dirty, [0]) @@ -101,9 +120,9 @@ class DisplayFlushTestCase(unittest.TestCase): def test_single_range(self): # Arrange - self.display.buffered_write(0x01, 0, 0) - self.display.buffered_write(0x02, 0, 1) - self.display.buffered_write(0x03, 0, 2) + self.display.buffered_write(0x01, index=0) + self.display.buffered_write(0x02, index=1) + self.display.buffered_write(0x03, index=2) # Act self.display.flush() @@ -113,12 +132,12 @@ class DisplayFlushTestCase(unittest.TestCase): def test_multiple_ranges(self): # Arrange - self.display.buffered_write(0x01, 0, 0) - self.display.buffered_write(0x02, 0, 1) - self.display.buffered_write(0x03, 0, 2) - self.display.buffered_write(0x05, 0, 30) - self.display.buffered_write(0x06, 0, 31) - self.display.buffered_write(0x04, 0, 20) + self.display.buffered_write(0x01, index=0) + self.display.buffered_write(0x02, index=1) + self.display.buffered_write(0x03, index=2) + self.display.buffered_write(0x05, index=30) + self.display.buffered_write(0x06, index=31) + self.display.buffered_write(0x04, index=20) # Act self.display.flush() @@ -136,7 +155,7 @@ class DisplayClearTestCase(unittest.TestCase): def test_excluding_status_line(self): # Arrange - self.display.buffered_write(0x01, 0, 0) + self.display.buffered_write(0x01, index=0) self.assertEqual(self.display.buffer[0], 0x01) self.assertTrue(self.display.dirty) @@ -153,7 +172,7 @@ class DisplayClearTestCase(unittest.TestCase): def test_including_status_line(self): # Arrange - self.display.buffered_write(0x01, 0, 0) + self.display.buffered_write(0x01, index=0) self.assertEqual(self.display.buffer[0], 0x01) self.assertTrue(self.display.dirty) @@ -178,11 +197,11 @@ class DisplayFlushRangeTestCase(unittest.TestCase): def test_when_start_address_is_current_address_counter(self): # Arrange - self.display.move_cursor(0, 0) + self.display.move_cursor(index=0) - self.display.buffered_write(0x01, 0, 0) - self.display.buffered_write(0x02, 0, 1) - self.display.buffered_write(0x03, 0, 2) + self.display.buffered_write(0x01, index=0) + self.display.buffered_write(0x02, index=1) + self.display.buffered_write(0x03, index=2) # Act self.display.flush() @@ -195,11 +214,11 @@ class DisplayFlushRangeTestCase(unittest.TestCase): def test_when_start_address_is_not_current_address_counter(self): # Arrange - self.display.move_cursor(0, 70) + self.display.move_cursor(index=70) - self.display.buffered_write(0x01, 0, 0) - self.display.buffered_write(0x02, 0, 1) - self.display.buffered_write(0x03, 0, 2) + self.display.buffered_write(0x01, index=0) + self.display.buffered_write(0x02, index=1) + self.display.buffered_write(0x03, index=2) # Act self.display.flush() @@ -220,6 +239,16 @@ class EncodeAsciiCharacterTestCase(unittest.TestCase): def test_out_of_range(self): self.assertEqual(encode_ascii_character(ord('✓')), 0x00) +class EncodeEbcdicCharacterTestCase(unittest.TestCase): + def test_mapped_character(self): + self.assertEqual(encode_ebcdic_character(129), 0x80) + + def test_unmapped_character(self): + self.assertEqual(encode_ebcdic_character(185), 0x00) + + def test_out_of_range(self): + self.assertEqual(encode_ebcdic_character(256), 0x00) + class EncodeStringTestCase(unittest.TestCase): def test_mapped_characters(self): self.assertEqual(encode_string('Hello, world!'), bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19')) diff --git a/tests/test_keyboard.py b/tests/test_keyboard.py index a17bcb8..2f6a37b 100644 --- a/tests/test_keyboard.py +++ b/tests/test_keyboard.py @@ -2,7 +2,7 @@ import unittest import context -from oec.keyboard import KeyboardModifiers, Key, Keymap, Keyboard, get_ascii_character_for_key +from oec.keyboard import KeyboardModifiers, Key, Keymap, Keyboard, get_ascii_character_for_key, get_ebcdic_character_for_key from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2 from oec.keymap_3483 import KEYMAP as KEYMAP_3483 @@ -209,3 +209,13 @@ class GetAsciiCharacterForKeyTestCase(unittest.TestCase): def test_mapping(self): self.assertEqual(get_ascii_character_for_key(Key.UPPER_A), 'A') + +class GetEbcdicCharacterForKeyTestCase(unittest.TestCase): + def test_none(self): + self.assertIsNone(get_ebcdic_character_for_key(None)) + + def test_no_mapping(self): + self.assertIsNone(get_ebcdic_character_for_key(Key.ATTN)) + + def test_mapping(self): + self.assertEqual(get_ebcdic_character_for_key(Key.UPPER_A), 193) diff --git a/tests/test_tn3270.py b/tests/test_tn3270.py new file mode 100644 index 0000000..dc6581f --- /dev/null +++ b/tests/test_tn3270.py @@ -0,0 +1,228 @@ +import unittest +from unittest.mock import Mock + +import context + +from oec.session import SessionDisconnectedError +from oec.keyboard import Key, KeyboardModifiers +from oec.tn3270 import TN3270Session +from tn3270 import AttributeCell, CharacterCell, AID, ProtectedCellOperatorError + +class SessionHandleHostTestCase(unittest.TestCase): + def setUp(self): + self.terminal = Mock() + + self.terminal.display = MockDisplay(24, 80) + + self.session = TN3270Session(self.terminal, 'mainframe', 23) + + self.telnet = Mock() + + self.session.telnet = self.telnet + self.session.emulator = Mock() + + def test_no_changes(self): + # Arrange + self.session.emulator.update = Mock(return_value=False) + + # Act and assert + self.assertFalse(self.session.handle_host()) + + def test_changes(self): + # Arrange + self.session.emulator.update = Mock(return_value=True) + + cells = _create_screen_cells(24, 80) + + _set_attribute(cells, 0, MockAttribute(protected=True)) + _set_characters(cells, 1, 'PROTECTED'.encode('cp500')) + _set_attribute(cells, 10, MockAttribute(protected=True, intensified=True)) + _set_characters(cells, 11, 'PROTECTED INTENSIFIED'.encode('cp500')) + _set_attribute(cells, 32, MockAttribute(protected=True, hidden=True)) + _set_characters(cells, 33, 'PROTECTED HIDDEN'.encode('cp500')) + _set_attribute(cells, 49, MockAttribute(protected=False)) + _set_characters(cells, 50, 'UNPROTECTED'.encode('cp500')) + _set_attribute(cells, 61, MockAttribute(protected=False, intensified=True)) + _set_characters(cells, 62, 'UNPROTECTED INTENSIFIED'.encode('cp500')) + _set_attribute(cells, 85, MockAttribute(protected=False, hidden=True)) + _set_characters(cells, 86, 'UNPROTECTED HIDDEN'.encode('cp500')) + _set_attribute(cells, 104, MockAttribute(protected=True)) + + self.session.emulator.cells = cells + self.session.emulator.cursor_address = 8 + + # Act and assert + self.assertTrue(self.session.handle_host()) + + self.terminal.display.flush.assert_called() + + self.assertEqual(self.terminal.display.buffer[:105], bytes.fromhex('e0afb1aeb3a4a2b3a4a3e8afb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ccafb1aeb3a4a2b3a4a300a7a8a3a3a4adc0b4adafb1aeb3a4a2b3a4a3c8b4adafb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ccb4adafb1aeb3a4a2b3a4a300a7a8a3a3a4ade0')) + self.assertTrue(all([byte == 0x00 for byte in self.terminal.display.buffer[105:]])) + + self.assertEqual(self.terminal.display.cursor_index, 8) + + def test_eof(self): + # Arrange + self.session.emulator.update = Mock(side_effect=EOFError) + + # Act and assert + with self.assertRaises(SessionDisconnectedError): + self.session.handle_host() + + self.telnet.close.assert_called() + + def test_keyboard_locked(self): + # Arrange + self.session.emulator.update = Mock(return_value=True) + + self.session.emulator.cells = _create_screen_cells(24, 80) + + # Act + self.session.handle_host() + + # Assert + self.terminal.display.status_line.write.assert_called_with(8, bytes.fromhex('f600b2b8b2b3a4ac00')) + +class SessionHandleKeyTestCase(unittest.TestCase): + def setUp(self): + self.terminal = Mock() + + self.session = TN3270Session(self.terminal, 'mainframe', 23) + + self.session.emulator = Mock() + + self.session.emulator.cells = [] + + def test_enter(self): + # Act + self.session.handle_key(Key.ENTER, KeyboardModifiers.NONE, None) + + # Assert + self.session.emulator.aid.assert_called_with(AID.ENTER) + + def test_backspace(self): + # Act + self.session.handle_key(Key.BACKSPACE, KeyboardModifiers.NONE, None) + + # Assert + self.session.emulator.backspace.assert_called() + + def test_tab(self): + # Act + self.session.handle_key(Key.TAB, KeyboardModifiers.NONE, None) + + # Assert + self.session.emulator.tab.assert_called() + + def test_backtab(self): + # Act + self.session.handle_key(Key.BACKTAB, KeyboardModifiers.NONE, None) + + # Assert + self.session.emulator.tab.assert_called_with(direction=-1) + + def test_newline(self): + # Act + self.session.handle_key(Key.NEWLINE, KeyboardModifiers.NONE, None) + + # Assert + self.session.emulator.newline.assert_called() + + def test_field_exit(self): + # Act + self.session.handle_key(Key.FIELD_EXIT, KeyboardModifiers.NONE, None) + + # Assert + self.session.emulator.newline.assert_called() + + def test_home(self): + # Act + self.session.handle_key(Key.HOME, KeyboardModifiers.NONE, None) + + # Assert + self.session.emulator.home.assert_called() + + def test_up(self): + # Act + self.session.handle_key(Key.UP, KeyboardModifiers.NONE, None) + + # Assert + self.session.emulator.cursor_up.assert_called() + + def test_down(self): + # Act + self.session.handle_key(Key.DOWN, KeyboardModifiers.NONE, None) + + # Assert + self.session.emulator.cursor_down.assert_called() + + def test_left(self): + # Act + self.session.handle_key(Key.LEFT, KeyboardModifiers.NONE, None) + + # Assert + self.session.emulator.cursor_left.assert_called() + + def test_right(self): + # Act + self.session.handle_key(Key.RIGHT, KeyboardModifiers.NONE, None) + + # Assert + self.session.emulator.cursor_right.assert_called() + + def test_delete(self): + # Act + self.session.handle_key(Key.DELETE, KeyboardModifiers.NONE, None) + + # Assert + self.session.emulator.delete.assert_called() + + def test_input(self): + # Act + self.session.handle_key(Key.LOWER_A, KeyboardModifiers.NONE, None) + + # Assert + self.session.emulator.input.assert_called_with(0x81) + + def test_protected_cell_operator_error(self): + # Arrange + self.session.emulator.input = Mock(side_effect=ProtectedCellOperatorError) + + # Act + self.session.handle_key(Key.LOWER_A, KeyboardModifiers.NONE, None) + + # Assert + self.terminal.display.status_line.write.assert_called_with(8, bytes.fromhex('f600f8dbd800000000')) + +class MockDisplay: + def __init__(self, rows, columns): + self.buffer = bytearray(rows * columns) + self.cursor_index = None + + self.status_line = Mock() + + self.flush = Mock() + + def buffered_write(self, byte, index): + self.buffer[index] = byte + + def move_cursor(self, index): + self.cursor_index = index + +class MockAttribute: + def __init__(self, protected=False, intensified=False, hidden=False): + self.protected = protected + self.intensified = intensified + self.hidden = hidden + +def _create_screen_cells(rows, columns): + return [CharacterCell(0x00) for address in range(rows * columns)] + +def _set_attribute(screen, index, attribute): + screen[index] = AttributeCell(attribute) + +def _set_characters(screen, index, bytes_): + for byte in bytes_: + screen[index] = CharacterCell(byte) + + index += 1 diff --git a/tests/test_vt100.py b/tests/test_vt100.py index c4df9d2..8b10ce7 100644 --- a/tests/test_vt100.py +++ b/tests/test_vt100.py @@ -27,13 +27,13 @@ class SessionHandleHostTestCase(unittest.TestCase): session.handle_host() # Assert - terminal.display.buffered_write.assert_any_call(0x80, 0, 0) - terminal.display.buffered_write.assert_any_call(0x81, 0, 1) - terminal.display.buffered_write.assert_any_call(0x82, 0, 2) + terminal.display.buffered_write.assert_any_call(0x80, row=0, column=0) + terminal.display.buffered_write.assert_any_call(0x81, row=0, column=1) + terminal.display.buffered_write.assert_any_call(0x82, row=0, column=2) terminal.display.flush.assert_called() - terminal.display.move_cursor.assert_called_with(0, 3) + terminal.display.move_cursor.assert_called_with(row=0, column=3) class SessionHandleKeyTestCase(unittest.TestCase): def setUp(self):