Add EAB support to TN3270

This commit is contained in:
Andrew Kay 2021-05-15 13:27:52 -05:00
parent 7f9f011172
commit 7b836e0220
11 changed files with 525 additions and 161 deletions

View File

@ -18,8 +18,7 @@ expect from a later model 3174, but it does provide basic TN3270 and VT100
emulation.
- [x] TN3270
- [x] Basic TN3270
- [ ] Extended Data Stream
- [x] Extended Data Stream
- [ ] TN3270E
- [ ] SSL/TLS
- [x] VT100

View File

@ -52,7 +52,7 @@ def _create_interface(args):
def _create_session(args, terminal):
if args.emulator == 'tn3270':
return TN3270Session(terminal, args.host, args.port, args.extended_data_stream)
return TN3270Session(terminal, args.host, args.port)
if args.emulator == 'vt100' and is_vt100_available:
host_command = [args.command, *args.command_args]
@ -91,10 +91,6 @@ def main():
tn3270_parser.add_argument('host', help='Hostname')
tn3270_parser.add_argument('port', nargs='?', default=23, type=int)
tn3270_parser.add_argument('--disable-eds', action='store_false',
dest='extended_data_stream',
help='Disable extended data stream support')
if is_vt100_available:
vt100_parser = subparsers.add_parser('vt100', description='VT100 emulator',
help='VT100 emulator')

View File

@ -3,11 +3,13 @@ oec.controller
~~~~~~~~~~~~~~
"""
import os
import time
import logging
import selectors
from textwrap import dedent
from coax import poll, poll_ack, load_control_register, get_features, PollAction, \
KeystrokePollResponse, TerminalType, ReceiveTimeout, \
KeystrokePollResponse, TerminalType, Feature, ReceiveTimeout, \
ReceiveError, ProtocolError
from .terminal import Terminal, UnsupportedTerminalError, read_terminal_ids
@ -104,6 +106,8 @@ class Controller:
def _handle_terminal_attached(self, poll_response):
self.logger.info('Terminal attached')
jumbo_write_strategy = _get_jumbo_write_strategy()
# Read the terminal identifiers.
(terminal_id, extended_id) = read_terminal_ids(self.interface)
@ -117,18 +121,28 @@ class Controller:
self.logger.info(f'Features = {features}')
if Feature.EAB in features:
if self.interface.legacy_firmware_detected and jumbo_write_strategy is None:
del features[Feature.EAB]
_print_no_i1_eab_notice()
# Get the keymap.
keymap = self.get_keymap(terminal_id, extended_id)
# Initialize the terminal.
self.terminal = Terminal(self.interface, terminal_id, extended_id,
features, keymap)
features, keymap,
jumbo_write_strategy=jumbo_write_strategy)
(rows, columns) = self.terminal.display.dimensions
keymap_name = self.terminal.keyboard.keymap.name
self.logger.info(f'Rows = {rows}, Columns = {columns}, Keymap = {keymap_name}')
if self.terminal.display.has_eab:
self.terminal.display.load_eab_mask(0xff)
self.terminal.display.clear(clear_status_line=True)
# Show the attached indicator on the status line.
@ -253,3 +267,45 @@ class Controller:
def _load_control_register(self):
load_control_register(self.interface, self.terminal.get_control_register())
def _get_jumbo_write_strategy():
value = os.environ.get('COAX_JUMBO')
if value is None:
return None
if value in ['split', 'ignore']:
return value
self.logger.warning(f'Unsupported COAX_JUMBO option: {value}')
return None
def _print_no_i1_eab_notice():
notice = '''
**** **** **** **** **** **** **** **** **** **** **** **** **** **** **** ****
Your terminal is reporting the existence of an EAB feature that allows extended
colors and formatting, however...
I think you are using an older firmware on the 1st generation, Arduino Mega
based, interface which does not support the "jumbo write" required to write a
full screen to the regen and EAB buffers.
I'm going to continue as if the EAB feature did not exist...
If you want to override this behavior, you can set the COAX_JUMBO environment
variable as follows:
- COAX_JUMBO=split - split large writes into multiple smaller 32-byte writes
before sending to the interface, this will result in
additional round trips to the interface which may
manifest as visible incremental changes being applied
to the screen
- COAX_JUMBO=ignore - try a jumbo write, anyway, use this option if you
believe you are seeing this behavior in error
**** **** **** **** **** **** **** **** **** **** **** **** **** **** **** ****
'''
print(dedent(notice))

View File

@ -5,9 +5,11 @@ oec.display
from collections import namedtuple
import logging
from more_itertools import interleave
from sortedcontainers import SortedSet
from coax import read_address_counter_hi, read_address_counter_lo, \
load_address_counter_hi, load_address_counter_lo, write_data
load_address_counter_hi, load_address_counter_lo, write_data, \
eab_load_mask, eab_write_alternate
_ASCII_CHAR_MAP = {
'>': 0x08,
@ -168,15 +170,17 @@ def encode_string(string, errors='replace'):
Dimensions = namedtuple('Dimensions', ['rows', 'columns'])
class Display:
def __init__(self, interface, dimensions):
def __init__(self, interface, dimensions, eab_address, jumbo_write_strategy=None):
self.logger = logging.getLogger(__name__)
self.interface = interface
self.dimensions = dimensions
self.eab_address = eab_address
(rows, columns) = self.dimensions
self.buffer = bytearray(rows * columns)
self.regen_buffer = bytearray(rows * columns)
self.eab_buffer = bytearray(rows * columns)
self.dirty = SortedSet()
self.address_counter = None
@ -186,6 +190,12 @@ class Display:
self.cursor_reverse = False
self.cursor_blink = False
self.jumbo_write_strategy = jumbo_write_strategy
@property
def has_eab(self):
return self.eab_address is not None
def move_cursor(self, index=None, row=None, column=None, force_load=False):
"""Load the address counter."""
address = self._calculate_address(index=index, row=row, column=column)
@ -194,7 +204,14 @@ class Display:
return self._load_address_counter(address, force_load)
def buffered_write(self, byte, index=None, row=None, column=None):
def load_eab_mask(self, mask):
"""Load the EAB mask."""
if not self.has_eab:
raise RuntimeError('No EAB feature')
eab_load_mask(self.interface, self.eab_address, mask)
def buffered_write(self, regen_byte, eab_byte, index=None, row=None, column=None):
if index is None:
if row is None or column is None:
raise ValueError('Either index or row and column is required')
@ -203,10 +220,11 @@ class Display:
# TODO: Verify that index is within range.
if self.buffer[index] == byte:
if self.regen_buffer[index] == regen_byte and (self.eab_buffer[index] == eab_byte or not self.has_eab):
return False
self.buffer[index] = byte
self.regen_buffer[index] = regen_byte
self.eab_buffer[index] = eab_byte if self.has_eab else 0x00
self.dirty.add(index)
@ -227,11 +245,12 @@ class Display:
address = columns
count = rows * columns
self._write((b'\x00', count), address=address)
self._write((b'\x00', count), (b'\x00', count) if self.has_eab else None, address=address)
# Update the buffer and dirty indicators to reflect the cleared screen.
# Update the buffers and dirty indicators to reflect the cleared screen.
for index in range(rows * columns):
self.buffer[index] = 0x00
self.regen_buffer[index] = 0x00
self.eab_buffer[index] = 0x00
self.dirty.clear()
@ -303,12 +322,13 @@ class Display:
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f'Flushing changes for range {start_index}-{end_index}')
data = self.buffer[start_index:end_index+1]
regen_data = self.regen_buffer[start_index:end_index+1]
eab_data = self.eab_buffer[start_index:end_index+1] if self.has_eab else None
address = self._calculate_address(start_index)
try:
self._write(data, address=address)
self._write(regen_data, eab_data, address=address)
except Exception as error:
# TODO: This could leave the address_counter incorrect.
self.logger.error(f'Write error: {error}', exc_info=error)
@ -318,7 +338,23 @@ class Display:
return self.address_counter
def _write(self, data, address=None, restore_original_address=False):
def _write(self, regen_data, eab_data, address=None, restore_original_address=False):
if eab_data is not None:
if not self.has_eab:
raise RuntimeError('No EAB feature')
if isinstance(regen_data, tuple) and isinstance(eab_data, tuple):
if len(regen_data[0]) != len(eab_data[0]):
raise ValueError('Regen and EAB pattern length must be equal')
if regen_data[1] != eab_data[1]:
raise ValueError('Regen and EAB pattern count must be equal')
elif not isinstance(regen_data, tuple) and not isinstance(eab_data, tuple):
if len(regen_data) != len(eab_data):
raise ValueError('Regen and EAB data length must be equal')
else:
raise ValueError('Regen and EAB data must be provided in same form')
if restore_original_address:
original_address = self.address_counter
@ -328,12 +364,23 @@ class Display:
if address is not None:
self._load_address_counter(address, force_load=False)
write_data(self.interface, data)
if eab_data is not None:
# Validation of regen and EAB data form has been performed.
if isinstance(regen_data, tuple):
data = (bytes(interleave(regen_data[0], eab_data[0])), regen_data[1])
else:
data = bytes(interleave(regen_data, eab_data))
if isinstance(address, tuple):
length = len(data[0]) * data[1]
eab_write_alternate(self.interface, self.eab_address, data,
jumbo_write_strategy=self.jumbo_write_strategy)
else:
length = len(data)
write_data(self.interface, regen_data,
jumbo_write_strategy=self.jumbo_write_strategy)
if isinstance(regen_data, tuple):
length = len(regen_data[0]) * regen_data[1]
else:
length = len(regen_data)
self.address_counter = self._calculate_address_after_write(address, length)
@ -348,7 +395,7 @@ class StatusLine:
self.columns = display.dimensions.columns
def write(self, column, data):
self.display._write(data, address=column, restore_original_address=True)
self.display._write(data, None, address=column, restore_original_address=True)
def write_string(self, column, string):
self.write(column, encode_string(string))

View File

@ -5,8 +5,8 @@ oec.terminal
import time
import logging
from coax import read_terminal_id, read_extended_id, PollAction, Control, ReceiveError, \
ProtocolError
from coax import read_terminal_id, read_extended_id, PollAction, Control, Feature, \
ReceiveError, ProtocolError
from .display import Dimensions, Display
from .keyboard import Keyboard
@ -59,7 +59,8 @@ def read_terminal_ids(interface, extended_id_retry_attempts=3):
class Terminal:
"""Terminal information and devices."""
def __init__(self, interface, terminal_id, extended_id, features, keymap):
def __init__(self, interface, terminal_id, extended_id, features, keymap,
jumbo_write_strategy=None):
self.interface = interface
self.terminal_id = terminal_id
self.extended_id = extended_id
@ -67,7 +68,8 @@ class Terminal:
dimensions = get_dimensions(self.terminal_id, self.extended_id)
self.display = Display(interface, dimensions)
self.display = Display(interface, dimensions, features.get(Feature.EAB),
jumbo_write_strategy=jumbo_write_strategy)
self.keyboard = Keyboard(keymap)
self.alarm = False

View File

@ -4,8 +4,8 @@ oec.tn3270
"""
import logging
from tn3270 import Telnet, Emulator, AttributeCell, CharacterCell, AID, OperatorError, \
ProtectedCellOperatorError, FieldOverflowOperatorError
from tn3270 import Telnet, Emulator, AttributeCell, CharacterCell, AID, Color, Highlight, \
OperatorError, ProtectedCellOperatorError, FieldOverflowOperatorError
from tn3270.ebcdic import DUP, FM
from .session import Session, SessionDisconnectedError
@ -47,13 +47,12 @@ AID_KEY_MAP = {
class TN3270Session(Session):
"""TN3270 session."""
def __init__(self, terminal, host, port, extended_data_stream=True):
def __init__(self, terminal, host, port):
self.logger = logging.getLogger(__name__)
self.terminal = terminal
self.host = host
self.port = port
self.extended_data_stream = extended_data_stream
self.telnet = None
self.emulator = None
@ -71,7 +70,14 @@ class TN3270Session(Session):
(rows, columns) = self.terminal.display.dimensions
self.emulator = Emulator(self.telnet, rows, columns)
if self.terminal.display.has_eab:
supported_colors = 8
supported_highlights = [Highlight.BLINK, Highlight.REVERSE, Highlight.UNDERSCORE]
else:
supported_colors = 1
supported_highlights = []
self.emulator = Emulator(self.telnet, rows, columns, supported_colors, supported_highlights)
self.emulator.alarm = lambda: self.terminal.sound_alarm()
@ -156,10 +162,17 @@ class TN3270Session(Session):
self.terminal.display.status_line.write_keyboard_insert(self.keyboard_insert)
def _connect_host(self):
terminal_type = f'IBM-3278-{self.terminal.terminal_id.model}'
# We will pretend a 3279 without EAB is a 3278.
if self.terminal.display.has_eab:
type = '3279'
else:
type = '3278'
if self.extended_data_stream:
terminal_type += '-E'
# Although a IBM 3278 does not support the formatting enabled by the extended
# data stream, the capabilities will be reported in the query reply.
terminal_type = f'IBM-{type}-{self.terminal.terminal_id.model}-E'
self.logger.info(f'Terminal Type = {terminal_type}')
self.telnet = Telnet(terminal_type)
@ -171,17 +184,21 @@ class TN3270Session(Session):
self.telnet = None
def _apply(self):
has_eab = self.terminal.display.has_eab
for address in self.emulator.dirty:
cell = self.emulator.cells[address]
byte = 0x00
regen_byte = 0x00
if isinstance(cell, AttributeCell):
byte = self._map_attribute(cell)
regen_byte = self._map_attribute(cell)
elif isinstance(cell, CharacterCell):
byte = self._map_character(cell)
regen_byte = self._map_character(cell)
self.terminal.display.buffered_write(byte, index=address)
eab_byte = self._map_formatting(cell.formatting) if has_eab else None
self.terminal.display.buffered_write(regen_byte, eab_byte, index=address)
self.emulator.dirty.clear()
@ -216,17 +233,44 @@ class TN3270Session(Session):
if byte == FM:
return encode_ascii_character(ord(';'))
# TODO: Temporary workaround to show empty reverse video fields until EAB
# support is added.
if byte == 0x40 and cell.formatting is not None and cell.formatting.reverse:
return encode_ascii_character(ord('#'))
# TODO: Temporary workaround until character set support is added.
if cell.character_set is not None:
return encode_ascii_character(ord('ß'))
return encode_ebcdic_character(byte)
def _map_formatting(self, formatting):
if formatting is None:
return 0x00
byte = 0x00
# Map the 3270 color to EAB color.
if formatting.color == Color.BLUE:
byte |= 0x08
elif formatting.color == Color.RED:
byte |= 0x10
elif formatting.color == Color.PINK:
byte |= 0x18
elif formatting.color == Color.GREEN:
byte |= 0x20
elif formatting.color == Color.TURQUOISE:
byte |= 0x28
elif formatting.color == Color.YELLOW:
byte |= 0x30
elif formatting.color == Color.WHITE:
byte |= 0x38
# Map the 3270 highlight to EAB highlight.
if formatting.blink:
byte |= 0x40
elif formatting.reverse:
byte |= 0x80
elif formatting.underscore:
byte |= 0xc0
return byte
def _format_message_area(self):
message_area = b''

View File

@ -192,9 +192,9 @@ class VT100Session(Session):
character = row_buffer[column]
# TODO: Investigate multi-byte or zero-byte cases further.
byte = encode_ascii_character(ord(character.data)) if len(character.data) == 1 else 0x00
regen_byte = encode_ascii_character(ord(character.data)) if len(character.data) == 1 else 0x00
self.terminal.display.buffered_write(byte, row=row, column=column)
self.terminal.display.buffered_write(regen_byte, 0x00, row=row, column=column)
self.vt100_screen.dirty.clear()

View File

@ -1,8 +1,9 @@
more-itertools==8.7.0
ptyprocess==0.7.0
pycoax==0.7.0
pycoax==0.8.0
pyserial==3.5
pyte==0.8.0
pytn3270==0.9.1
pytn3270==0.10.0
sliplib==0.6.2
sortedcontainers==2.3.0
wcwidth==0.2.5

View File

@ -11,7 +11,7 @@ class DisplayMoveCursorTestCase(unittest.TestCase):
dimensions = Dimensions(24, 80)
self.display = Display(self.interface, dimensions)
self.display = Display(self.interface, dimensions, None)
self.display._load_address_counter = Mock(wraps=self.display._load_address_counter)
@ -58,40 +58,57 @@ class DisplayBufferedWriteTestCase(unittest.TestCase):
dimensions = Dimensions(24, 80)
self.display = Display(self.interface, dimensions)
self.display = Display(self.interface, Dimensions(24, 80), None)
def test(self):
def test_with_no_eab(self):
# Act
self.display.buffered_write(0x01, index=15)
self.display.buffered_write(0x02, index=97)
self.display.buffered_write(0x01, 0x00, index=15)
self.display.buffered_write(0x02, 0x00, index=97)
self.display.buffered_write(0x00, 0x01, index=98)
# Assert
self.assertEqual(self.display.buffer[15], 0x01)
self.assertEqual(self.display.buffer[97], 0x02)
self.assertEqual(self.display.regen_buffer[15], 0x01)
self.assertEqual(self.display.regen_buffer[97], 0x02)
self.assertEqual(self.display.eab_buffer[98], 0x00)
self.assertSequenceEqual(self.display.dirty, [15, 97])
def test_with_eab(self):
# Arrange
self.display.eab_address = 7
# Act
self.display.buffered_write(0x01, 0x00, index=15)
self.display.buffered_write(0x02, 0x00, index=97)
self.display.buffered_write(0x00, 0x01, index=98)
# Assert
self.assertEqual(self.display.regen_buffer[15], 0x01)
self.assertEqual(self.display.regen_buffer[97], 0x02)
self.assertEqual(self.display.eab_buffer[98], 0x01)
self.assertSequenceEqual(self.display.dirty, [15, 97, 98])
def test_with_row_and_column(self):
# Act
self.display.buffered_write(0x01, row=0, column=15)
self.display.buffered_write(0x02, row=1, column=17)
self.display.buffered_write(0x01, 0x00, row=0, column=15)
self.display.buffered_write(0x02, 0x00, row=1, column=17)
# Assert
self.assertEqual(self.display.buffer[15], 0x01)
self.assertEqual(self.display.buffer[97], 0x02)
self.assertEqual(self.display.regen_buffer[15], 0x01)
self.assertEqual(self.display.regen_buffer[97], 0x02)
self.assertSequenceEqual(self.display.dirty, [15, 97])
def test_change(self):
self.assertTrue(self.display.buffered_write(0x01, index=0))
self.assertTrue(self.display.buffered_write(0x02, index=0))
self.assertTrue(self.display.buffered_write(0x01, 0x00, index=0))
self.assertTrue(self.display.buffered_write(0x02, 0x00, index=0))
self.assertEqual(self.display.buffer[0], 0x02)
self.assertEqual(self.display.regen_buffer[0], 0x02)
self.assertSequenceEqual(self.display.dirty, [0])
def test_no_change(self):
self.assertTrue(self.display.buffered_write(0x01, index=0))
self.assertFalse(self.display.buffered_write(0x01, index=0))
self.assertTrue(self.display.buffered_write(0x01, 0x00, index=0))
self.assertFalse(self.display.buffered_write(0x01, 0x00, index=0))
self.assertEqual(self.display.buffer[0], 0x01)
self.assertEqual(self.display.regen_buffer[0], 0x01)
self.assertSequenceEqual(self.display.dirty, [0])
class DisplayFlushTestCase(unittest.TestCase):
@ -100,7 +117,7 @@ class DisplayFlushTestCase(unittest.TestCase):
dimensions = Dimensions(24, 80)
self.display = Display(self.interface, dimensions)
self.display = Display(self.interface, dimensions, None)
self.display._flush_range = Mock()
@ -111,11 +128,12 @@ class DisplayFlushTestCase(unittest.TestCase):
# Assert
self.display._flush_range.assert_not_called()
def test_single_range(self):
def test_single_range_with_no_eab(self):
# Arrange
self.display.buffered_write(0x01, index=0)
self.display.buffered_write(0x02, index=1)
self.display.buffered_write(0x03, index=2)
self.display.buffered_write(0x01, 0x00, index=0)
self.display.buffered_write(0x02, 0x00, index=1)
self.display.buffered_write(0x03, 0x00, index=2)
self.display.buffered_write(0x00, 0x01, index=3)
# Act
self.display.flush()
@ -123,14 +141,32 @@ class DisplayFlushTestCase(unittest.TestCase):
# Assert
self.display._flush_range.assert_called_with(0, 2)
def test_multiple_ranges(self):
def test_single_range_with_eab(self):
# Arrange
self.display.buffered_write(0x01, index=0)
self.display.buffered_write(0x02, index=1)
self.display.buffered_write(0x03, index=2)
self.display.buffered_write(0x05, index=30)
self.display.buffered_write(0x06, index=31)
self.display.buffered_write(0x04, index=20)
self.display.eab_address = 7
self.display.buffered_write(0x01, 0x00, index=0)
self.display.buffered_write(0x02, 0x00, index=1)
self.display.buffered_write(0x03, 0x00, index=2)
self.display.buffered_write(0x00, 0x01, index=3)
# Act
self.display.flush()
# Assert
self.display._flush_range.assert_called_with(0, 3)
def test_multiple_ranges_with_no_eab(self):
# Arrange
self.display.buffered_write(0x01, 0x00, index=0)
self.display.buffered_write(0x02, 0x00, index=1)
self.display.buffered_write(0x03, 0x01, index=2)
self.display.buffered_write(0x00, 0x02, index=3)
self.display.buffered_write(0x05, 0x00, index=30)
self.display.buffered_write(0x06, 0x00, index=31)
self.display.buffered_write(0x00, 0x05, index=32)
self.display.buffered_write(0x04, 0x03, index=20)
self.display.buffered_write(0x00, 0x04, index=21)
# Act
self.display.flush()
@ -138,13 +174,33 @@ class DisplayFlushTestCase(unittest.TestCase):
# Assert
self.display._flush_range.assert_called_with(0, 31)
def test_multiple_ranges_with_no_eab(self):
# Arrange
self.display.eab_address = 7
self.display.buffered_write(0x01, 0x00, index=0)
self.display.buffered_write(0x02, 0x00, index=1)
self.display.buffered_write(0x03, 0x01, index=2)
self.display.buffered_write(0x00, 0x02, index=3)
self.display.buffered_write(0x05, 0x00, index=30)
self.display.buffered_write(0x06, 0x00, index=31)
self.display.buffered_write(0x00, 0x05, index=32)
self.display.buffered_write(0x04, 0x03, index=20)
self.display.buffered_write(0x00, 0x04, index=21)
# Act
self.display.flush()
# Assert
self.display._flush_range.assert_called_with(0, 32)
class DisplayClearTestCase(unittest.TestCase):
def setUp(self):
self.interface = Mock()
dimensions = Dimensions(24, 80)
self.display = Display(self.interface, dimensions)
self.display = Display(self.interface, dimensions, None)
self.display._load_address_counter = Mock(wraps=self.display._load_address_counter)
self.display._write = Mock(wraps=self.display._write)
@ -161,40 +217,90 @@ class DisplayClearTestCase(unittest.TestCase):
self.write_data_mock = patcher.start()
patcher = patch('oec.display.eab_write_alternate')
self.eab_write_alternate_mock = patcher.start()
self.addCleanup(patch.stopall)
def test_excluding_status_line(self):
def test_excluding_status_line_with_no_eab(self):
# Arrange
self.display.buffered_write(0x01, index=0)
self.display.buffered_write(0x01, 0x01, index=0)
self.assertEqual(self.display.buffer[0], 0x01)
self.assertEqual(self.display.regen_buffer[0], 0x01)
self.assertEqual(self.display.eab_buffer[0], 0x00)
self.assertTrue(self.display.dirty)
# Act
self.display.clear(clear_status_line=False)
# Assert
self.display._write.assert_called_with((b'\x00', 1920), address=80)
self.display._write.assert_called_with((b'\x00', 1920), None, address=80)
self.display._load_address_counter.assert_called_with(80, True)
self.assertEqual(self.display.buffer[0], 0x00)
self.assertEqual(self.display.regen_buffer[0], 0x00)
self.assertEqual(self.display.eab_buffer[0], 0x00)
self.assertFalse(self.display.dirty)
def test_including_status_line(self):
def test_excluding_status_line_with_eab(self):
# Arrange
self.display.buffered_write(0x01, index=0)
self.display.eab_address = 7
self.assertEqual(self.display.buffer[0], 0x01)
self.display.buffered_write(0x01, 0x01, index=0)
self.assertEqual(self.display.regen_buffer[0], 0x01)
self.assertEqual(self.display.eab_buffer[0], 0x01)
self.assertTrue(self.display.dirty)
# Act
self.display.clear(clear_status_line=False)
# Assert
self.display._write.assert_called_with((b'\x00', 1920), (b'\x00', 1920), address=80)
self.display._load_address_counter.assert_called_with(80, True)
self.assertEqual(self.display.regen_buffer[0], 0x00)
self.assertEqual(self.display.eab_buffer[0], 0x00)
self.assertFalse(self.display.dirty)
def test_including_status_line_with_no_eab(self):
# Arrange
self.display.buffered_write(0x01, 0x01, index=0)
self.assertEqual(self.display.regen_buffer[0], 0x01)
self.assertEqual(self.display.eab_buffer[0], 0x00)
self.assertTrue(self.display.dirty)
# Act
self.display.clear(clear_status_line=True)
# Assert
self.display._write.assert_called_with((b'\x00', 2000), address=0)
self.display._write.assert_called_with((b'\x00', 2000), None, address=0)
self.display._load_address_counter.assert_called_with(80, True)
self.assertEqual(self.display.buffer[0], 0x00)
self.assertEqual(self.display.regen_buffer[0], 0x00)
self.assertEqual(self.display.eab_buffer[0], 0x00)
self.assertFalse(self.display.dirty)
def test_including_status_line_with_eab(self):
# Arrange
self.display.eab_address = 7
self.display.buffered_write(0x01, 0x01, index=0)
self.assertEqual(self.display.regen_buffer[0], 0x01)
self.assertEqual(self.display.eab_buffer[0], 0x01)
self.assertTrue(self.display.dirty)
# Act
self.display.clear(clear_status_line=True)
# Assert
self.display._write.assert_called_with((b'\x00', 2000), (b'\x00', 2000), address=0)
self.display._load_address_counter.assert_called_with(80, True)
self.assertEqual(self.display.regen_buffer[0], 0x00)
self.assertEqual(self.display.eab_buffer[0], 0x00)
self.assertFalse(self.display.dirty)
class DisplayFlushRangeTestCase(unittest.TestCase):
@ -203,7 +309,7 @@ class DisplayFlushRangeTestCase(unittest.TestCase):
dimensions = Dimensions(24, 80)
self.display = Display(self.interface, dimensions)
self.display = Display(self.interface, dimensions, None)
self.display._write = Mock(wraps=self.display._write)
@ -225,15 +331,15 @@ class DisplayFlushRangeTestCase(unittest.TestCase):
# Arrange
self.display.move_cursor(index=0)
self.display.buffered_write(0x01, index=0)
self.display.buffered_write(0x02, index=1)
self.display.buffered_write(0x03, index=2)
self.display.buffered_write(0x01, 0x00, index=0)
self.display.buffered_write(0x02, 0x00, index=1)
self.display.buffered_write(0x03, 0x00, index=2)
# Act
self.display.flush()
# Assert
self.display._write.assert_called_with(bytes.fromhex('01 02 03'), address=80)
self.display._write.assert_called_with(bytes.fromhex('01 02 03'), None, address=80)
self.assertEqual(self.display.address_counter, 83)
self.assertFalse(self.display.dirty)
@ -242,15 +348,15 @@ class DisplayFlushRangeTestCase(unittest.TestCase):
# Arrange
self.display.move_cursor(index=70)
self.display.buffered_write(0x01, index=0)
self.display.buffered_write(0x02, index=1)
self.display.buffered_write(0x03, index=2)
self.display.buffered_write(0x01, 0x00, index=0)
self.display.buffered_write(0x02, 0x00, index=1)
self.display.buffered_write(0x03, 0x00, index=2)
# Act
self.display.flush()
# Assert
self.display._write.assert_called_with(bytes.fromhex('01 02 03'), address=80)
self.display._write.assert_called_with(bytes.fromhex('01 02 03'), None, address=80)
self.assertEqual(self.display.address_counter, 83)
self.assertFalse(self.display.dirty)
@ -261,7 +367,7 @@ class DisplayLoadAddressCounterTestCase(unittest.TestCase):
dimensions = Dimensions(24, 80)
self.display = Display(self.interface, dimensions)
self.display = Display(self.interface, dimensions, None)
patcher = patch('oec.display.load_address_counter_hi')
@ -369,7 +475,7 @@ class DisplayWriteTestCase(unittest.TestCase):
dimensions = Dimensions(24, 80)
self.display = Display(self.interface, dimensions)
self.display = Display(self.interface, dimensions, None)
self.display._load_address_counter = Mock(wraps=self.display._load_address_counter)
@ -385,32 +491,84 @@ class DisplayWriteTestCase(unittest.TestCase):
self.write_data_mock = patcher.start()
patcher = patch('oec.display.eab_write_alternate')
self.eab_write_alternate_mock = patcher.start()
self.addCleanup(patch.stopall)
def test(self):
def test_with_no_eab_data(self):
# Act
self.display._write(bytes.fromhex('01 02 03'))
self.display._write(bytes.fromhex('01 02 03'), None)
# Assert
self.assertIsNone(self.display.address_counter)
self.write_data_mock.assert_called_with(self.interface, bytes.fromhex('01 02 03'))
self.write_data_mock.assert_called_with(self.interface, bytes.fromhex('01 02 03'), jumbo_write_strategy=None)
def test_with_eab_data(self):
# Arrange
self.display.eab_address = 7
def test_repeat(self):
# Act
self.display._write((bytes.fromhex('01 02 03'), 3))
self.display._write(bytes.fromhex('01 02 03'), bytes.fromhex('04 05 06'))
# Assert
self.assertIsNone(self.display.address_counter)
self.write_data_mock.assert_called_with(self.interface, (bytes.fromhex('01 02 03'), 3))
self.eab_write_alternate_mock.assert_called_with(self.interface, 7, bytes.fromhex('01 04 02 05 03 06'), jumbo_write_strategy=None)
def test_repeat_with_no_eab_data(self):
# Act
self.display._write((bytes.fromhex('01 02 03'), 3), None)
# Assert
self.assertIsNone(self.display.address_counter)
self.write_data_mock.assert_called_with(self.interface, (bytes.fromhex('01 02 03'), 3), jumbo_write_strategy=None)
def test_repeat_with_eab_data(self):
# Arrange
self.display.eab_address = 7
# Act
self.display._write((bytes.fromhex('01 02 03'), 3), (bytes.fromhex('04 05 06'), 3))
# Assert
self.assertIsNone(self.display.address_counter)
self.eab_write_alternate_mock.assert_called_with(self.interface, 7, (bytes.fromhex('01 04 02 05 03 06'), 3), jumbo_write_strategy=None)
def test_regen_eab_data_mismatch_format(self):
# Arrange
self.display.eab_address = 7
# Act and assert
with self.assertRaisesRegex(ValueError, 'must be provided in same form'):
self.display._write(bytes.fromhex('01 02 03'), (b'\x00', 3))
def test_regen_eab_data_mismatch_length(self):
# Arrange
self.display.eab_address = 7
# Act and assert
with self.assertRaisesRegex(ValueError, 'data length must be equal'):
self.display._write(bytes.fromhex('01 02 03'), bytes.fromhex('01 02'))
def test_regen_eab_data_mismatch_length_repeat(self):
# Arrange
self.display.eab_address = 7
# Act and assert
with self.assertRaisesRegex(ValueError, 'pattern length must be equal'):
self.display._write((bytes.fromhex('01 02 03'), 3), (b'\x00', 2))
def test_address_if_current_address_unknown(self):
# Arrange
self.assertIsNone(self.display.address_counter)
# Act
self.display._write(bytes.fromhex('01 02 03'), address=80)
self.display._write(bytes.fromhex('01 02 03'), None, address=80)
# Assert
self.assertEqual(self.display.address_counter, 83)
@ -420,7 +578,7 @@ class DisplayWriteTestCase(unittest.TestCase):
self.display.address_counter = 160
# Act
self.display._write(bytes.fromhex('01 02 03'), address=80)
self.display._write(bytes.fromhex('01 02 03'), None, address=80)
# Assert
self.assertEqual(self.display.address_counter, 83)
@ -432,7 +590,7 @@ class DisplayWriteTestCase(unittest.TestCase):
self.display.address_counter = 80
# Act
self.display._write(bytes.fromhex('01 02 03'), address=80)
self.display._write(bytes.fromhex('01 02 03'), None, address=80)
# Assert
self.assertEqual(self.display.address_counter, 83)
@ -446,7 +604,7 @@ class DisplayWriteTestCase(unittest.TestCase):
self.assertIsNone(self.display.address_counter)
# Act
self.display._write(bytes.fromhex('01 02 03'), restore_original_address=True)
self.display._write(bytes.fromhex('01 02 03'), None, restore_original_address=True)
# Assert
self.assertEqual(self.display.address_counter, 160)
@ -458,7 +616,7 @@ class DisplayWriteTestCase(unittest.TestCase):
self.display.address_counter = 160
# Act
self.display._write(bytes.fromhex('01 02 03'), restore_original_address=True)
self.display._write(bytes.fromhex('01 02 03'), None, restore_original_address=True)
# Assert
self.assertEqual(self.display.address_counter, 160)

View File

@ -4,15 +4,26 @@ from unittest.mock import Mock
import context
from oec.session import SessionDisconnectedError
from oec.display import Dimensions, Display
from oec.keyboard import Key, KeyboardModifiers
from oec.tn3270 import TN3270Session
from tn3270 import AttributeCell, CharacterCell, AID, ProtectedCellOperatorError, FieldOverflowOperatorError
from tn3270 import AttributeCell, CharacterCell, AID, Color, ProtectedCellOperatorError, FieldOverflowOperatorError
from tn3270.attributes import Attribute
from tn3270.emulator import CellFormatting
class SessionHandleHostTestCase(unittest.TestCase):
def setUp(self):
self.interface = Mock()
self.terminal = Mock()
self.terminal.display = MockDisplay(24, 80)
self.terminal.display = Display(self.interface, Dimensions(24, 80), None)
self.terminal.display.status_line = Mock()
self.terminal.display.move_cursor = Mock()
self.terminal.display.flush = Mock()
self.terminal.display._load_address_counter = Mock()
self.terminal.display._write = Mock()
self.session = TN3270Session(self.terminal, 'mainframe', 23)
@ -28,28 +39,36 @@ class SessionHandleHostTestCase(unittest.TestCase):
# Act and assert
self.assertFalse(self.session.handle_host())
def test_changes(self):
def test_changes_with_no_eab(self):
# Arrange
self.terminal.display.eab_address = None
self.session.emulator.update = Mock(return_value=True)
cells = _create_screen_cells(24, 80)
_set_attribute(cells, 0, MockAttribute(protected=True))
_set_attribute(cells, 0, protected=True)
_set_characters(cells, 1, 'PROTECTED'.encode('cp500'))
_set_attribute(cells, 10, MockAttribute(protected=True, intensified=True))
_set_attribute(cells, 10, protected=True, intensified=True)
_set_characters(cells, 11, 'PROTECTED INTENSIFIED'.encode('cp500'))
_set_attribute(cells, 32, MockAttribute(protected=True, hidden=True))
_set_attribute(cells, 32, protected=True, hidden=True)
_set_characters(cells, 33, 'PROTECTED HIDDEN'.encode('cp500'))
_set_attribute(cells, 49, MockAttribute(protected=False))
_set_attribute(cells, 49, protected=False)
_set_characters(cells, 50, 'UNPROTECTED'.encode('cp500'))
_set_attribute(cells, 61, MockAttribute(protected=False, intensified=True))
_set_attribute(cells, 61, protected=False, intensified=True)
_set_characters(cells, 62, 'UNPROTECTED INTENSIFIED'.encode('cp500'))
_set_attribute(cells, 85, MockAttribute(protected=False, hidden=True))
_set_attribute(cells, 85, protected=False, hidden=True)
_set_characters(cells, 86, 'UNPROTECTED HIDDEN'.encode('cp500'))
_set_attribute(cells, 104, MockAttribute(protected=True))
_set_attribute(cells, 104, protected=True)
_set_formatting(cells, 104, color=Color.YELLOW)
_set_characters(cells, 105, 'EAB'.encode('cp500'))
_set_formatting(cells, 105, blink=True)
_set_formatting(cells, 106, reverse=True)
_set_formatting(cells, 107, underscore=True)
_set_attribute(cells, 108, protected=True)
self.session.emulator.cells = cells
self.session.emulator.dirty = set(range(105))
self.session.emulator.dirty = set(range(109))
self.session.emulator.cursor_address = 8
@ -58,10 +77,61 @@ class SessionHandleHostTestCase(unittest.TestCase):
self.terminal.display.flush.assert_called()
self.assertEqual(self.terminal.display.buffer[:105], bytes.fromhex('e0afb1aeb3a4a2b3a4a3e8afb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ecafb1aeb3a4a2b3a4a300a7a8a3a3a4adc0b4adafb1aeb3a4a2b3a4a3c8b4adafb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ccb4adafb1aeb3a4a2b3a4a300a7a8a3a3a4ade0'))
self.assertTrue(all([byte == 0x00 for byte in self.terminal.display.buffer[105:]]))
self.assertEqual(self.terminal.display.regen_buffer[:109], bytes.fromhex('e0afb1aeb3a4a2b3a4a3e8afb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ecafb1aeb3a4a2b3a4a300a7a8a3a3a4adc0b4adafb1aeb3a4a2b3a4a3c8b4adafb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ccb4adafb1aeb3a4a2b3a4a300a7a8a3a3a4ade0a4a0a1e0'))
self.assertTrue(all([byte == 0x00 for byte in self.terminal.display.regen_buffer[109:]]))
self.assertEqual(self.terminal.display.cursor_index, 8)
self.assertTrue(all([byte == 0x00 for byte in self.terminal.display.eab_buffer]))
self.terminal.display.move_cursor.assert_called_with(index=8)
self.assertFalse(self.session.emulator.dirty)
def test_changes_with_eab(self):
# Arrange
self.terminal.display.eab_address = 7
self.session.emulator.update = Mock(return_value=True)
cells = _create_screen_cells(24, 80)
_set_attribute(cells, 0, protected=True)
_set_characters(cells, 1, 'PROTECTED'.encode('cp500'))
_set_attribute(cells, 10, protected=True, intensified=True)
_set_characters(cells, 11, 'PROTECTED INTENSIFIED'.encode('cp500'))
_set_attribute(cells, 32, protected=True, hidden=True)
_set_characters(cells, 33, 'PROTECTED HIDDEN'.encode('cp500'))
_set_attribute(cells, 49, protected=False)
_set_characters(cells, 50, 'UNPROTECTED'.encode('cp500'))
_set_attribute(cells, 61, protected=False, intensified=True)
_set_characters(cells, 62, 'UNPROTECTED INTENSIFIED'.encode('cp500'))
_set_attribute(cells, 85, protected=False, hidden=True)
_set_characters(cells, 86, 'UNPROTECTED HIDDEN'.encode('cp500'))
_set_attribute(cells, 104, protected=True)
_set_formatting(cells, 104, color=Color.YELLOW)
_set_characters(cells, 105, 'EAB'.encode('cp500'))
_set_formatting(cells, 105, blink=True)
_set_formatting(cells, 106, reverse=True)
_set_formatting(cells, 107, underscore=True)
_set_attribute(cells, 108, protected=True)
self.session.emulator.cells = cells
self.session.emulator.dirty = set(range(109))
self.session.emulator.cursor_address = 8
# Act and assert
self.assertTrue(self.session.handle_host())
self.terminal.display.flush.assert_called()
self.assertEqual(self.terminal.display.regen_buffer[:109], bytes.fromhex('e0afb1aeb3a4a2b3a4a3e8afb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ecafb1aeb3a4a2b3a4a300a7a8a3a3a4adc0b4adafb1aeb3a4a2b3a4a3c8b4adafb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ccb4adafb1aeb3a4a2b3a4a300a7a8a3a3a4ade0a4a0a1e0'))
self.assertTrue(all([byte == 0x00 for byte in self.terminal.display.regen_buffer[109:]]))
self.assertTrue(all([byte == 0x00 for byte in self.terminal.display.eab_buffer[:104]]))
self.assertEqual(self.terminal.display.eab_buffer[104:109], bytes.fromhex('304080c000'))
self.assertTrue(all([byte == 0x00 for byte in self.terminal.display.eab_buffer[109:]]))
self.terminal.display.move_cursor.assert_called_with(index=8)
self.assertFalse(self.session.emulator.dirty)
@ -254,41 +324,32 @@ class SessionHandleKeyTestCase(unittest.TestCase):
# Assert
self.terminal.display.status_line.write.assert_called_with(8, bytes.fromhex('f600db080000000000'))
class MockDisplay:
def __init__(self, rows, columns):
self.buffer = bytearray(rows * columns)
self.cursor_index = None
self.status_line = Mock()
self.flush = Mock()
def buffered_write(self, byte, index):
self.buffer[index] = byte
def move_cursor(self, index):
self.cursor_index = index
class MockAttribute:
def __init__(self, protected=False, intensified=False, hidden=False):
self.protected = protected
self.intensified = intensified
self.hidden = hidden
@property
def value(self):
display = 2 if self.intensified else 3 if self.hidden else 0
return (0x20 if self.protected else 0) | (display << 2)
def _create_screen_cells(rows, columns):
return [CharacterCell(0x00) for address in range(rows * columns)]
def _set_attribute(screen, index, attribute):
screen[index] = AttributeCell(attribute)
def _set_attribute(cells, index, protected=False, intensified=False, hidden=False):
display = 2 if intensified else 3 if hidden else 0
def _set_characters(screen, index, bytes_):
attribute = Attribute((0x20 if protected else 0) | (display << 2))
cells[index] = AttributeCell(attribute)
def _set_characters(cells, index, bytes_):
for byte in bytes_:
screen[index] = CharacterCell(byte)
cells[index] = CharacterCell(byte)
index += 1
def _set_formatting(cells, index, color=0x00, blink=False, reverse=False, underscore=False):
if color == 0x00 and not blink and not reverse and not underscore:
cells[index].formatting = None
return
formatting = CellFormatting()
formatting.color = color
formatting.blink = blink
formatting.reverse = reverse
formatting.underscore = underscore
cells[index].formatting = formatting

View File

@ -27,9 +27,9 @@ class SessionHandleHostTestCase(unittest.TestCase):
self.session.handle_host()
# Assert
self.terminal.display.buffered_write.assert_any_call(0x80, row=0, column=0)
self.terminal.display.buffered_write.assert_any_call(0x81, row=0, column=1)
self.terminal.display.buffered_write.assert_any_call(0x82, row=0, column=2)
self.terminal.display.buffered_write.assert_any_call(0x80, 0x00, row=0, column=0)
self.terminal.display.buffered_write.assert_any_call(0x81, 0x00, row=0, column=1)
self.terminal.display.buffered_write.assert_any_call(0x82, 0x00, row=0, column=2)
self.terminal.display.flush.assert_called()