Initial TN3270 support

This commit is contained in:
Andrew Kay
2019-09-09 21:01:58 -05:00
parent ea9a40d42e
commit e9821ed39e
13 changed files with 603 additions and 65 deletions

View File

@@ -2,7 +2,7 @@
oec is an open replacement for the IBM 3174 Establishment Controller.
It is still a work in progress - as of now it only provides basic VT100 emulation but the goal is to implement TN3270 and multiple logical terminal support.
It is a work in progress - as of now it only provides basic TN3270 and VT100 emulation.
## Usage
@@ -16,12 +16,19 @@ python -m venv VIRTUALENV
pip install -r requirements.txt --no-deps
```
Assuming your interface is connected to `/dev/ttyUSB0` and you want to run `/bin/sh` as the host process:
Assuming your interface is connected to `/dev/ttyUSB0` and you want to connect to a TN3270 host named `mainframe`:
```
python -m oec /dev/ttyUSB0 /bin/sh -l
python -m oec /dev/ttyUSB0 tn3270 mainframe
```
If you want to use the VT100 emulator and run `/bin/sh` as the host process:
```
python -m oec /dev/ttyUSB0 vt100 /bin/sh -l
```
## See Also
* [coax-interface](https://github.com/lowobservable/coax-interface) - tools for interfacing with IBM 3270 type terminals
* [coax-interface](https://github.com/lowobservable/coax-interface) - Tools for interfacing with IBM 3270 type terminals
* [pytn3270](https://github.com/lowobservable/pytn3270) - Python TN3270 library

View File

@@ -5,20 +5,47 @@ from serial import Serial
from coax import Interface1
from .controller import Controller
from .tn3270 import TN3270Session
from .vt100 import VT100Session
logging.basicConfig(level=logging.INFO)
def main():
parser = argparse.ArgumentParser(description='VT100 emulator.')
def _create_session(args, terminal):
if args.emulator == 'tn3270':
return TN3270Session(terminal, args.host, args.port)
parser.add_argument('port', help='Serial port')
parser.add_argument('command', help='Host process')
parser.add_argument('command_args', nargs=argparse.REMAINDER, help='Host process arguments')
if args.emulator == 'vt100':
host_command = [args.command, *args.command_args]
return VT100Session(terminal, host_command)
raise ValueError('Unsupported emulator')
def main():
parser = argparse.ArgumentParser(description=('An open replacement for the IBM 3174 '
'Establishment Controller'))
parser.add_argument('serial_port', help='Serial port')
subparsers = parser.add_subparsers(dest='emulator', required=True,
description='Emulator')
tn3270_parser = subparsers.add_parser('tn3270', description='TN3270 emulator',
help='TN3270 emulator')
tn3270_parser.add_argument('host', help='Hostname')
tn3270_parser.add_argument('port', nargs='?', default=23, type=int)
vt100_parser = subparsers.add_parser('vt100', description='VT100 emulator',
help='VT100 emulator')
vt100_parser.add_argument('command', help='Host process')
vt100_parser.add_argument('command_args', nargs=argparse.REMAINDER,
help='Host process arguments')
args = parser.parse_args()
with Serial(args.port, 115200) as serial:
with Serial(args.serial_port, 115200) as serial:
serial.reset_input_buffer()
serial.reset_output_buffer()
@@ -33,9 +60,7 @@ def main():
print(f'Interface firmware version {firmware_version}')
# Initialize and start the controller.
host_command = [args.command, *args.command_args]
create_session = lambda terminal: VT100Session(terminal, host_command)
create_session = lambda terminal: _create_session(args, terminal)
controller = Controller(interface, create_session)

View File

@@ -137,8 +137,12 @@ _ASCII_CHAR_MAP = {
'*': 0xbf
}
_EBCDIC_CHAR_MAP = {ascii_character.encode('cp500')[0]: byte for ascii_character, byte in _ASCII_CHAR_MAP.items()}
ASCII_CHAR_MAP = [_ASCII_CHAR_MAP.get(character, 0x00) for character in map(chr, range(256))]
EBCDIC_CHAR_MAP = [_EBCDIC_CHAR_MAP.get(character, 0x00) for character in range(256)]
def encode_ascii_character(character):
"""Map an ASCII character to a terminal display character."""
if character > 255:
@@ -146,6 +150,13 @@ def encode_ascii_character(character):
return ASCII_CHAR_MAP[character]
def encode_ebcdic_character(character):
"""Map an EBCDIC character to a terminal display character."""
if character > 255:
return 0x00
return EBCDIC_CHAR_MAP[character]
def encode_string(string, errors='replace'):
"""Map a string to terminal display characters."""
return bytes([encode_ascii_character(character) for character
@@ -170,9 +181,9 @@ class Display:
self.status_line = StatusLine(self.interface, columns)
def move_cursor(self, row, column, force_load=False):
def move_cursor(self, index=None, row=None, column=None, force_load=False):
"""Load the address counter."""
address = self._calculate_address(row=row, column=column)
address = self._calculate_address(index=index, row=row, column=column)
# TODO: Verify that the address is within range - exclude status line.
@@ -185,8 +196,12 @@ class Display:
return True
def buffered_write(self, byte, row, column):
index = self._get_index(row, column)
def buffered_write(self, byte, index=None, row=None, column=None):
if index is None:
if row is None or column is None:
raise ValueError('Either index or row and column is required')
index = self._get_index(row, column)
# TODO: Verify that index is within range.
@@ -222,7 +237,7 @@ class Display:
self.dirty.clear()
self.move_cursor(0, 0, force_load=True)
self.move_cursor(row=0, column=0, force_load=True)
def _get_index(self, row, column):
return (row * self.dimensions.columns) + column

View File

@@ -385,3 +385,12 @@ def get_ascii_character_for_key(key):
return None
return chr(value)
def get_ebcdic_character_for_key(key):
"""Map a key to EBCDIC character."""
ascii_character = get_ascii_character_for_key(key)
if not ascii_character:
return None
return ascii_character.encode('cp500')[0]

View File

@@ -181,6 +181,9 @@ KEYMAP_ALT = {
48: Key.PF11,
17: Key.PF12,
# Second Row
53: Key.HOME,
# Right
95: Key.PA1,
94: Key.PA2,

View File

@@ -213,11 +213,17 @@ KEYMAP_SHIFT = {
# Center
99: Key.ROLL_UP,
96: Key.ROLL_DOWN,
96: Key.ROLL_DOWN
}
KEYMAP_ALT = {
**KEYMAP_DEFAULT
**KEYMAP_DEFAULT,
# Control Keys
6: Key.CLEAR,
# Center
98: Key.HOME
}
KEYMAP = Keymap('3483', KEYMAP_DEFAULT, KEYMAP_SHIFT, KEYMAP_ALT, modifier_release=240)

201
oec/tn3270.py Normal file
View File

@@ -0,0 +1,201 @@
"""
oec.tn3270
~~~~~~~~~~
"""
import logging
from tn3270 import Telnet, Emulator, AttributeCell, CharacterCell, AID, OperatorError, \
ProtectedCellOperatorError
from .session import Session, SessionDisconnectedError
from .display import encode_ebcdic_character, encode_string
from .keyboard import Key, get_ebcdic_character_for_key
AID_KEY_MAP = {
Key.CLEAR: AID.CLEAR,
Key.ENTER: AID.ENTER,
#Key.PA1: AID.PA1,
#Key.PA2: AID.PA2,
#Key.PA3: AID.PA3,
Key.PF1: AID.PF1,
Key.PF2: AID.PF2,
Key.PF3: AID.PF3,
Key.PF4: AID.PF4,
Key.PF5: AID.PF5,
Key.PF6: AID.PF6,
Key.PF7: AID.PF7,
Key.PF8: AID.PF8,
Key.PF9: AID.PF9,
Key.PF10: AID.PF10,
Key.PF11: AID.PF11,
Key.PF12: AID.PF12,
Key.PF13: AID.PF13,
Key.PF14: AID.PF14,
Key.PF15: AID.PF15,
Key.PF16: AID.PF16,
Key.PF17: AID.PF17,
Key.PF18: AID.PF18,
Key.PF19: AID.PF19,
Key.PF20: AID.PF20,
Key.PF21: AID.PF21,
Key.PF22: AID.PF22,
Key.PF23: AID.PF23,
Key.PF24: AID.PF24
}
class TN3270Session(Session):
"""TN3270 session."""
def __init__(self, terminal, host, port):
self.logger = logging.getLogger(__name__)
self.terminal = terminal
self.host = host
self.port = port
self.telnet = None
self.emulator = None
self.operator_error = None
# TODO: Should the message area be initialized here?
self.message_area = None
self.last_message_area = None
def start(self):
self._connect_host()
(rows, columns) = self.terminal.display.dimensions
self.emulator = Emulator(self.telnet, rows, columns)
def terminate(self):
if self.telnet:
self._disconnect_host()
self.emulator = None
def handle_host(self):
try:
if not self.emulator.update(timeout=0):
return False
except EOFError:
self._disconnect_host()
raise SessionDisconnectedError
self._apply()
self._flush()
return True
def handle_key(self, key, keyboard_modifiers, scan_code):
aid = AID_KEY_MAP.get(key)
try:
if aid is not None:
self.emulator.aid(aid)
# TODO: is this where we show the clock?
#elif key == Key.RESET:
elif key == Key.BACKSPACE:
self.emulator.backspace()
elif key == Key.TAB:
self.emulator.tab()
elif key == Key.BACKTAB:
self.emulator.tab(direction=-1)
elif key in [Key.NEWLINE, Key.FIELD_EXIT]:
self.emulator.newline()
elif key == Key.HOME:
self.emulator.home()
elif key == Key.UP:
self.emulator.cursor_up()
elif key == Key.DOWN:
self.emulator.cursor_down()
elif key == Key.LEFT:
self.emulator.cursor_left()
elif key == Key.RIGHT:
self.emulator.cursor_right()
#elif key == Key.INSERT:
elif key == Key.DELETE:
self.emulator.delete()
else:
byte = get_ebcdic_character_for_key(key)
if byte:
self.emulator.input(byte)
except OperatorError as error:
self.operator_error = error
self._apply()
self._flush()
def _connect_host(self):
terminal_type = f'IBM-3278-{self.terminal.terminal_id.model}'
self.telnet = Telnet(terminal_type)
self.telnet.open(self.host, self.port)
def _disconnect_host(self):
self.telnet.close()
self.telnet = None
def _apply(self):
for (address, cell) in enumerate(self.emulator.cells):
byte = 0x00
if isinstance(cell, AttributeCell):
byte = self._map_attribute(cell.attribute)
elif isinstance(cell, CharacterCell):
byte = encode_ebcdic_character(cell.byte)
self.terminal.display.buffered_write(byte, index=address)
# Update the message area.
self.message_area = self._format_message_area()
def _flush(self):
self.terminal.display.flush()
# TODO: hmm we need a buffered status line...
if self.message_area != self.last_message_area:
self.terminal.display.status_line.write(8, self.message_area)
self.last_message_area = self.message_area
# TODO: see note in VT100 about forcing sync
self.terminal.display.move_cursor(index=self.emulator.cursor_address)
# TODO: eek, is this the correct place to do this?
self.operator_error = None
def _map_attribute(self, attribute):
# NOTE: This mapping may not be correct and does not take into account
# lightpen detectable fields.
if attribute.hidden:
return 0xcc
if attribute.protected:
if attribute.intensified:
return 0xe8
return 0xe0
if attribute.intensified:
return 0xc8
return 0xc0
def _format_message_area(self):
message_area = b''
if self.operator_error:
if isinstance(self.operator_error, ProtectedCellOperatorError):
# X SPACE ARROW_LEFT OPERATOR ARROW_RIGHT
message_area = b'\xf6\x00\xf8\xdb\xd8'
elif self.emulator.keyboard_locked:
# X SPACE SYSTEM
message_area = b'\xf6\x00' + encode_string('SYSTEM')
return message_area.ljust(9, b'\x00')

View File

@@ -99,32 +99,36 @@ class VT100Session(Session):
self.terminal.display.status_line.write_string(45, 'VT100')
# Reset the cursor.
self.terminal.display.move_cursor(0, 0)
self.terminal.display.move_cursor(row=0, column=0)
def terminate(self):
if self.host_process:
self._terminate_host_process()
def handle_host(self):
data = None
try:
if self.host_process not in select([self.host_process], [], [], 0)[0]:
return False
data = self.host_process.read()
self._handle_host_output(data)
return True
except EOFError:
self.host_process = None
raise SessionDisconnectedError
self._handle_host_output(data)
return True
def handle_key(self, key, keyboard_modifiers, scan_code):
bytes_ = self._map_key(key, keyboard_modifiers)
if bytes_ is not None:
self.host_process.write(bytes_)
if bytes_ is None:
return
self.host_process.write(bytes_)
def _map_key(self, key, keyboard_modifiers):
if keyboard_modifiers.is_alt():
@@ -193,7 +197,7 @@ class VT100Session(Session):
# TODO: Investigate multi-byte or zero-byte cases further.
byte = encode_ascii_character(ord(character.data)) if len(character.data) == 1 else 0x00
self.terminal.display.buffered_write(byte, row, column)
self.terminal.display.buffered_write(byte, row=row, column=column)
def _flush(self):
self.terminal.display.flush()
@@ -202,4 +206,4 @@ class VT100Session(Session):
# reliable - maybe it needs to be forced sometimes.
cursor = self.vt100_screen.cursor
self.terminal.display.move_cursor(cursor.y, cursor.x)
self.terminal.display.move_cursor(row=cursor.y, column=cursor.x)

View File

@@ -2,6 +2,7 @@ ptyprocess==0.6.0
pycoax==0.1.2
pyserial==3.4
pyte==0.8.0
pytn3270==0.1.0
sliplib==0.3.0
sortedcontainers==2.1.0
wcwidth==0.1.7

View File

@@ -3,7 +3,7 @@ from unittest.mock import Mock
import context
from oec.display import Dimensions, Display, encode_ascii_character, encode_string
from oec.display import Dimensions, Display, encode_ascii_character, encode_ebcdic_character, encode_string
class DisplayMoveCursorTestCase(unittest.TestCase):
def setUp(self):
@@ -15,7 +15,16 @@ class DisplayMoveCursorTestCase(unittest.TestCase):
def test(self):
# Act
self.display.move_cursor(10, 15)
self.display.move_cursor(index=815)
# Assert
self.assertEqual(self.display.address_counter, 895)
self.interface.offload_load_address_counter.assert_called_with(895)
def test_with_row_and_column(self):
# Act
self.display.move_cursor(row=10, column=15)
# Assert
self.assertEqual(self.display.address_counter, 895)
@@ -24,12 +33,12 @@ class DisplayMoveCursorTestCase(unittest.TestCase):
def test_no_change(self):
# Arrange
self.display.move_cursor(0, 0)
self.display.move_cursor(index=0)
self.interface.offload_load_address_counter.reset_mock()
# Act
self.display.move_cursor(0, 0)
self.display.move_cursor(index=0)
# Assert
self.assertEqual(self.display.address_counter, 80)
@@ -38,12 +47,12 @@ class DisplayMoveCursorTestCase(unittest.TestCase):
def test_no_change_force(self):
# Arrange
self.display.move_cursor(0, 0)
self.display.move_cursor(index=0)
self.interface.offload_load_address_counter.reset_mock()
# Act
self.display.move_cursor(0, 0, force_load=True)
self.display.move_cursor(index=0, force_load=True)
# Assert
self.assertEqual(self.display.address_counter, 80)
@@ -60,8 +69,18 @@ class DisplayBufferedWriteTestCase(unittest.TestCase):
def test(self):
# Act
self.display.buffered_write(0x01, 0, 15)
self.display.buffered_write(0x02, 1, 17)
self.display.buffered_write(0x01, index=15)
self.display.buffered_write(0x02, index=97)
# Assert
self.assertEqual(self.display.buffer[15], 0x01)
self.assertEqual(self.display.buffer[97], 0x02)
self.assertSequenceEqual(self.display.dirty, [15, 97])
def test_with_row_and_column(self):
# Act
self.display.buffered_write(0x01, row=0, column=15)
self.display.buffered_write(0x02, row=1, column=17)
# Assert
self.assertEqual(self.display.buffer[15], 0x01)
@@ -69,15 +88,15 @@ class DisplayBufferedWriteTestCase(unittest.TestCase):
self.assertSequenceEqual(self.display.dirty, [15, 97])
def test_change(self):
self.assertTrue(self.display.buffered_write(0x01, 0, 0))
self.assertTrue(self.display.buffered_write(0x02, 0, 0))
self.assertTrue(self.display.buffered_write(0x01, index=0))
self.assertTrue(self.display.buffered_write(0x02, index=0))
self.assertEqual(self.display.buffer[0], 0x02)
self.assertSequenceEqual(self.display.dirty, [0])
def test_no_change(self):
self.assertTrue(self.display.buffered_write(0x01, 0, 0))
self.assertFalse(self.display.buffered_write(0x01, 0, 0))
self.assertTrue(self.display.buffered_write(0x01, index=0))
self.assertFalse(self.display.buffered_write(0x01, index=0))
self.assertEqual(self.display.buffer[0], 0x01)
self.assertSequenceEqual(self.display.dirty, [0])
@@ -101,9 +120,9 @@ class DisplayFlushTestCase(unittest.TestCase):
def test_single_range(self):
# Arrange
self.display.buffered_write(0x01, 0, 0)
self.display.buffered_write(0x02, 0, 1)
self.display.buffered_write(0x03, 0, 2)
self.display.buffered_write(0x01, index=0)
self.display.buffered_write(0x02, index=1)
self.display.buffered_write(0x03, index=2)
# Act
self.display.flush()
@@ -113,12 +132,12 @@ class DisplayFlushTestCase(unittest.TestCase):
def test_multiple_ranges(self):
# Arrange
self.display.buffered_write(0x01, 0, 0)
self.display.buffered_write(0x02, 0, 1)
self.display.buffered_write(0x03, 0, 2)
self.display.buffered_write(0x05, 0, 30)
self.display.buffered_write(0x06, 0, 31)
self.display.buffered_write(0x04, 0, 20)
self.display.buffered_write(0x01, index=0)
self.display.buffered_write(0x02, index=1)
self.display.buffered_write(0x03, index=2)
self.display.buffered_write(0x05, index=30)
self.display.buffered_write(0x06, index=31)
self.display.buffered_write(0x04, index=20)
# Act
self.display.flush()
@@ -136,7 +155,7 @@ class DisplayClearTestCase(unittest.TestCase):
def test_excluding_status_line(self):
# Arrange
self.display.buffered_write(0x01, 0, 0)
self.display.buffered_write(0x01, index=0)
self.assertEqual(self.display.buffer[0], 0x01)
self.assertTrue(self.display.dirty)
@@ -153,7 +172,7 @@ class DisplayClearTestCase(unittest.TestCase):
def test_including_status_line(self):
# Arrange
self.display.buffered_write(0x01, 0, 0)
self.display.buffered_write(0x01, index=0)
self.assertEqual(self.display.buffer[0], 0x01)
self.assertTrue(self.display.dirty)
@@ -178,11 +197,11 @@ class DisplayFlushRangeTestCase(unittest.TestCase):
def test_when_start_address_is_current_address_counter(self):
# Arrange
self.display.move_cursor(0, 0)
self.display.move_cursor(index=0)
self.display.buffered_write(0x01, 0, 0)
self.display.buffered_write(0x02, 0, 1)
self.display.buffered_write(0x03, 0, 2)
self.display.buffered_write(0x01, index=0)
self.display.buffered_write(0x02, index=1)
self.display.buffered_write(0x03, index=2)
# Act
self.display.flush()
@@ -195,11 +214,11 @@ class DisplayFlushRangeTestCase(unittest.TestCase):
def test_when_start_address_is_not_current_address_counter(self):
# Arrange
self.display.move_cursor(0, 70)
self.display.move_cursor(index=70)
self.display.buffered_write(0x01, 0, 0)
self.display.buffered_write(0x02, 0, 1)
self.display.buffered_write(0x03, 0, 2)
self.display.buffered_write(0x01, index=0)
self.display.buffered_write(0x02, index=1)
self.display.buffered_write(0x03, index=2)
# Act
self.display.flush()
@@ -220,6 +239,16 @@ class EncodeAsciiCharacterTestCase(unittest.TestCase):
def test_out_of_range(self):
self.assertEqual(encode_ascii_character(ord('')), 0x00)
class EncodeEbcdicCharacterTestCase(unittest.TestCase):
def test_mapped_character(self):
self.assertEqual(encode_ebcdic_character(129), 0x80)
def test_unmapped_character(self):
self.assertEqual(encode_ebcdic_character(185), 0x00)
def test_out_of_range(self):
self.assertEqual(encode_ebcdic_character(256), 0x00)
class EncodeStringTestCase(unittest.TestCase):
def test_mapped_characters(self):
self.assertEqual(encode_string('Hello, world!'), bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'))

View File

@@ -2,7 +2,7 @@ import unittest
import context
from oec.keyboard import KeyboardModifiers, Key, Keymap, Keyboard, get_ascii_character_for_key
from oec.keyboard import KeyboardModifiers, Key, Keymap, Keyboard, get_ascii_character_for_key, get_ebcdic_character_for_key
from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2
from oec.keymap_3483 import KEYMAP as KEYMAP_3483
@@ -209,3 +209,13 @@ class GetAsciiCharacterForKeyTestCase(unittest.TestCase):
def test_mapping(self):
self.assertEqual(get_ascii_character_for_key(Key.UPPER_A), 'A')
class GetEbcdicCharacterForKeyTestCase(unittest.TestCase):
def test_none(self):
self.assertIsNone(get_ebcdic_character_for_key(None))
def test_no_mapping(self):
self.assertIsNone(get_ebcdic_character_for_key(Key.ATTN))
def test_mapping(self):
self.assertEqual(get_ebcdic_character_for_key(Key.UPPER_A), 193)

228
tests/test_tn3270.py Normal file
View File

@@ -0,0 +1,228 @@
import unittest
from unittest.mock import Mock
import context
from oec.session import SessionDisconnectedError
from oec.keyboard import Key, KeyboardModifiers
from oec.tn3270 import TN3270Session
from tn3270 import AttributeCell, CharacterCell, AID, ProtectedCellOperatorError
class SessionHandleHostTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.terminal.display = MockDisplay(24, 80)
self.session = TN3270Session(self.terminal, 'mainframe', 23)
self.telnet = Mock()
self.session.telnet = self.telnet
self.session.emulator = Mock()
def test_no_changes(self):
# Arrange
self.session.emulator.update = Mock(return_value=False)
# Act and assert
self.assertFalse(self.session.handle_host())
def test_changes(self):
# Arrange
self.session.emulator.update = Mock(return_value=True)
cells = _create_screen_cells(24, 80)
_set_attribute(cells, 0, MockAttribute(protected=True))
_set_characters(cells, 1, 'PROTECTED'.encode('cp500'))
_set_attribute(cells, 10, MockAttribute(protected=True, intensified=True))
_set_characters(cells, 11, 'PROTECTED INTENSIFIED'.encode('cp500'))
_set_attribute(cells, 32, MockAttribute(protected=True, hidden=True))
_set_characters(cells, 33, 'PROTECTED HIDDEN'.encode('cp500'))
_set_attribute(cells, 49, MockAttribute(protected=False))
_set_characters(cells, 50, 'UNPROTECTED'.encode('cp500'))
_set_attribute(cells, 61, MockAttribute(protected=False, intensified=True))
_set_characters(cells, 62, 'UNPROTECTED INTENSIFIED'.encode('cp500'))
_set_attribute(cells, 85, MockAttribute(protected=False, hidden=True))
_set_characters(cells, 86, 'UNPROTECTED HIDDEN'.encode('cp500'))
_set_attribute(cells, 104, MockAttribute(protected=True))
self.session.emulator.cells = cells
self.session.emulator.cursor_address = 8
# Act and assert
self.assertTrue(self.session.handle_host())
self.terminal.display.flush.assert_called()
self.assertEqual(self.terminal.display.buffer[:105], bytes.fromhex('e0afb1aeb3a4a2b3a4a3e8afb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ccafb1aeb3a4a2b3a4a300a7a8a3a3a4adc0b4adafb1aeb3a4a2b3a4a3c8b4adafb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ccb4adafb1aeb3a4a2b3a4a300a7a8a3a3a4ade0'))
self.assertTrue(all([byte == 0x00 for byte in self.terminal.display.buffer[105:]]))
self.assertEqual(self.terminal.display.cursor_index, 8)
def test_eof(self):
# Arrange
self.session.emulator.update = Mock(side_effect=EOFError)
# Act and assert
with self.assertRaises(SessionDisconnectedError):
self.session.handle_host()
self.telnet.close.assert_called()
def test_keyboard_locked(self):
# Arrange
self.session.emulator.update = Mock(return_value=True)
self.session.emulator.cells = _create_screen_cells(24, 80)
# Act
self.session.handle_host()
# Assert
self.terminal.display.status_line.write.assert_called_with(8, bytes.fromhex('f600b2b8b2b3a4ac00'))
class SessionHandleKeyTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.session = TN3270Session(self.terminal, 'mainframe', 23)
self.session.emulator = Mock()
self.session.emulator.cells = []
def test_enter(self):
# Act
self.session.handle_key(Key.ENTER, KeyboardModifiers.NONE, None)
# Assert
self.session.emulator.aid.assert_called_with(AID.ENTER)
def test_backspace(self):
# Act
self.session.handle_key(Key.BACKSPACE, KeyboardModifiers.NONE, None)
# Assert
self.session.emulator.backspace.assert_called()
def test_tab(self):
# Act
self.session.handle_key(Key.TAB, KeyboardModifiers.NONE, None)
# Assert
self.session.emulator.tab.assert_called()
def test_backtab(self):
# Act
self.session.handle_key(Key.BACKTAB, KeyboardModifiers.NONE, None)
# Assert
self.session.emulator.tab.assert_called_with(direction=-1)
def test_newline(self):
# Act
self.session.handle_key(Key.NEWLINE, KeyboardModifiers.NONE, None)
# Assert
self.session.emulator.newline.assert_called()
def test_field_exit(self):
# Act
self.session.handle_key(Key.FIELD_EXIT, KeyboardModifiers.NONE, None)
# Assert
self.session.emulator.newline.assert_called()
def test_home(self):
# Act
self.session.handle_key(Key.HOME, KeyboardModifiers.NONE, None)
# Assert
self.session.emulator.home.assert_called()
def test_up(self):
# Act
self.session.handle_key(Key.UP, KeyboardModifiers.NONE, None)
# Assert
self.session.emulator.cursor_up.assert_called()
def test_down(self):
# Act
self.session.handle_key(Key.DOWN, KeyboardModifiers.NONE, None)
# Assert
self.session.emulator.cursor_down.assert_called()
def test_left(self):
# Act
self.session.handle_key(Key.LEFT, KeyboardModifiers.NONE, None)
# Assert
self.session.emulator.cursor_left.assert_called()
def test_right(self):
# Act
self.session.handle_key(Key.RIGHT, KeyboardModifiers.NONE, None)
# Assert
self.session.emulator.cursor_right.assert_called()
def test_delete(self):
# Act
self.session.handle_key(Key.DELETE, KeyboardModifiers.NONE, None)
# Assert
self.session.emulator.delete.assert_called()
def test_input(self):
# Act
self.session.handle_key(Key.LOWER_A, KeyboardModifiers.NONE, None)
# Assert
self.session.emulator.input.assert_called_with(0x81)
def test_protected_cell_operator_error(self):
# Arrange
self.session.emulator.input = Mock(side_effect=ProtectedCellOperatorError)
# Act
self.session.handle_key(Key.LOWER_A, KeyboardModifiers.NONE, None)
# Assert
self.terminal.display.status_line.write.assert_called_with(8, bytes.fromhex('f600f8dbd800000000'))
class MockDisplay:
def __init__(self, rows, columns):
self.buffer = bytearray(rows * columns)
self.cursor_index = None
self.status_line = Mock()
self.flush = Mock()
def buffered_write(self, byte, index):
self.buffer[index] = byte
def move_cursor(self, index):
self.cursor_index = index
class MockAttribute:
def __init__(self, protected=False, intensified=False, hidden=False):
self.protected = protected
self.intensified = intensified
self.hidden = hidden
def _create_screen_cells(rows, columns):
return [CharacterCell(0x00) for address in range(rows * columns)]
def _set_attribute(screen, index, attribute):
screen[index] = AttributeCell(attribute)
def _set_characters(screen, index, bytes_):
for byte in bytes_:
screen[index] = CharacterCell(byte)
index += 1

View File

@@ -27,13 +27,13 @@ class SessionHandleHostTestCase(unittest.TestCase):
session.handle_host()
# Assert
terminal.display.buffered_write.assert_any_call(0x80, 0, 0)
terminal.display.buffered_write.assert_any_call(0x81, 0, 1)
terminal.display.buffered_write.assert_any_call(0x82, 0, 2)
terminal.display.buffered_write.assert_any_call(0x80, row=0, column=0)
terminal.display.buffered_write.assert_any_call(0x81, row=0, column=1)
terminal.display.buffered_write.assert_any_call(0x82, row=0, column=2)
terminal.display.flush.assert_called()
terminal.display.move_cursor.assert_called_with(0, 3)
terminal.display.move_cursor.assert_called_with(row=0, column=3)
class SessionHandleKeyTestCase(unittest.TestCase):
def setUp(self):