Refactor session

This commit is contained in:
Andrew Kay 2021-06-14 21:08:52 -05:00
parent bdb0866db1
commit f31157cf8b
7 changed files with 355 additions and 197 deletions

View File

@ -146,6 +146,8 @@ class Controller:
self.session = None
def _update_session(self, duration):
update_count = 0
while duration > 0:
start_time = time.perf_counter()
@ -157,10 +159,14 @@ class Controller:
for (key, events) in selected:
session = key.fileobj
session.handle_host()
if session.handle_host():
update_count += 1
duration -= (time.perf_counter() - start_time)
if update_count > 0:
self.session.render()
def _handle_poll_response(self, poll_response):
if isinstance(poll_response, KeystrokePollResponse):
self._handle_keystroke_poll_response(poll_response)
@ -190,6 +196,8 @@ class Controller:
elif self.session:
self.session.handle_key(key, modifiers, scan_code)
self.session.render()
def _poll(self):
self.last_poll_time = time.perf_counter()

View File

@ -14,5 +14,8 @@ class Session:
def handle_key(self, key, keyboard_modifiers, scan_code):
raise NotImplementedError
def render(self):
raise NotImplementedError
class SessionDisconnectedError(Exception):
pass

View File

@ -101,9 +101,6 @@ class TN3270Session(Session):
self.waiting_on_host = False
self._apply()
self._flush()
return True
def handle_key(self, key, keyboard_modifiers, scan_code):
@ -153,6 +150,7 @@ class TN3270Session(Session):
except OperatorError as error:
self.operator_error = error
def render(self):
self._apply()
self._flush()
@ -194,14 +192,7 @@ class TN3270Session(Session):
for address in self.emulator.dirty:
cell = self.emulator.cells[address]
regen_byte = 0x00
if isinstance(cell, AttributeCell):
regen_byte = self._map_attribute(cell)
elif isinstance(cell, CharacterCell):
regen_byte = self._map_character(cell)
eab_byte = self._map_formatting(cell.formatting) if has_eab else None
(regen_byte, eab_byte) = _map_cell(cell, has_eab)
self.terminal.display.buffered_write_byte(regen_byte, eab_byte, index=address)
@ -219,63 +210,11 @@ class TN3270Session(Session):
self.last_message_area = self.message_area
# TODO: see note in VT100 about forcing sync
self.terminal.display.move_cursor(index=self.emulator.cursor_address)
# TODO: eek, is this the correct place to do this?
# TODO: This needs to be moved.
self.operator_error = None
def _map_attribute(self, cell):
# Only map the protected and display bits - ignore numeric, skip and modified.
return 0xc0 | (cell.attribute.value & 0x2c)
def _map_character(self, cell):
byte = cell.byte
if byte == DUP:
return encode_ascii_character(ord('*'))
if byte == FM:
return encode_ascii_character(ord(';'))
# TODO: Temporary workaround until character set support is added.
if cell.character_set is not None:
return encode_ascii_character(ord('ß'))
return encode_ebcdic_character(byte)
def _map_formatting(self, formatting):
if formatting is None:
return 0x00
byte = 0x00
# Map the 3270 color to EAB color.
if formatting.color == Color.BLUE:
byte |= 0x08
elif formatting.color == Color.RED:
byte |= 0x10
elif formatting.color == Color.PINK:
byte |= 0x18
elif formatting.color == Color.GREEN:
byte |= 0x20
elif formatting.color == Color.TURQUOISE:
byte |= 0x28
elif formatting.color == Color.YELLOW:
byte |= 0x30
elif formatting.color == Color.WHITE:
byte |= 0x38
# Map the 3270 highlight to EAB highlight.
if formatting.blink:
byte |= 0x40
elif formatting.reverse:
byte |= 0x80
elif formatting.underscore:
byte |= 0xc0
return byte
def _format_message_area(self):
message_area = b''
@ -293,3 +232,61 @@ class TN3270Session(Session):
message_area = b'\xf6\x00' + encode_string('SYSTEM')
return message_area.ljust(9, b'\x00')
def _map_cell(cell, has_eab):
regen_byte = 0x00
if isinstance(cell, AttributeCell):
# Only map the protected and display bits - ignore numeric, skip and modified.
regen_byte = 0xc0 | (cell.attribute.value & 0x2c)
elif isinstance(cell, CharacterCell):
byte = cell.byte
if cell.character_set is not None:
# TODO: Temporary workaround until character set support is added.
regen_byte = encode_ascii_character(ord('ß'))
elif byte == DUP:
regen_byte = encode_ascii_character(ord('*'))
elif byte == FM:
regen_byte = encode_ascii_character(ord(';'))
else:
regen_byte = encode_ebcdic_character(byte)
if not has_eab:
return (regen_byte, None)
eab_byte = _map_formatting(cell.formatting)
return (regen_byte, eab_byte)
def _map_formatting(formatting):
if formatting is None:
return 0x00
byte = 0x00
# Map the 3270 color to EAB color.
if formatting.color == Color.BLUE:
byte |= 0x08
elif formatting.color == Color.RED:
byte |= 0x10
elif formatting.color == Color.PINK:
byte |= 0x18
elif formatting.color == Color.GREEN:
byte |= 0x20
elif formatting.color == Color.TURQUOISE:
byte |= 0x28
elif formatting.color == Color.YELLOW:
byte |= 0x30
elif formatting.color == Color.WHITE:
byte |= 0x38
# Map the 3270 highlight to EAB highlight.
if formatting.blink:
byte |= 0x40
elif formatting.reverse:
byte |= 0x80
elif formatting.underscore:
byte |= 0xc0
return byte

View File

@ -126,9 +126,6 @@ class VT100Session(Session):
self.vt100_stream.feed(data)
self._apply()
self._flush()
return True
def handle_key(self, key, keyboard_modifiers, scan_code):
@ -139,6 +136,10 @@ class VT100Session(Session):
self.host_process.write(bytes_)
def render(self):
self._apply()
self._flush()
def _map_key(self, key, keyboard_modifiers):
if keyboard_modifiers.is_alt():
# Ignore any modifiers... this would fall through and result in a warning
@ -204,8 +205,6 @@ class VT100Session(Session):
def _flush(self):
self.terminal.display.flush()
# TODO: Investigate different approaches to making cursor syncronization more
# reliable - maybe it needs to be forced sometimes.
cursor = self.vt100_screen.cursor
if cursor.y < self.terminal.display.dimensions.rows and cursor.x < self.terminal.display.dimensions.columns:

View File

@ -224,6 +224,7 @@ class UpdateSessionTestCase(unittest.TestCase):
# Assert
self.controller.session.handle_host.assert_not_called()
self.controller.session.render.assert_not_called()
self.controller.session_selector.select.assert_not_called()
@ -236,6 +237,7 @@ class UpdateSessionTestCase(unittest.TestCase):
# Assert
self.controller.session.handle_host.assert_not_called()
self.controller.session.render.assert_not_called()
self.controller.session_selector.select.assert_called_once()
@ -252,6 +254,7 @@ class UpdateSessionTestCase(unittest.TestCase):
# Assert
self.controller.session.handle_host.assert_called_once()
self.controller.session.render.assert_called_once()
self.assertEqual(self.controller.session_selector.select.call_count, 2)

View File

@ -1,5 +1,5 @@
import unittest
from unittest.mock import Mock
from unittest.mock import Mock, patch
import context
@ -15,14 +15,6 @@ class SessionHandleHostTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.terminal.display = BufferedDisplay(self.terminal, Dimensions(24, 80), None)
self.terminal.display.status_line = Mock()
self.terminal.display.move_cursor = Mock()
self.terminal.display.write = Mock()
self.terminal.display.flush = Mock()
self.terminal.display._load_address_counter = Mock()
self.session = TN3270Session(self.terminal, 'mainframe', 23)
self.telnet = Mock()
@ -37,104 +29,13 @@ class SessionHandleHostTestCase(unittest.TestCase):
# Act and assert
self.assertFalse(self.session.handle_host())
def test_changes_with_no_eab_feature(self):
def test_changes(self):
# Arrange
self.session.emulator.update = Mock(return_value=True)
cells = _create_screen_cells(24, 80)
_set_attribute(cells, 0, protected=True)
_set_characters(cells, 1, 'PROTECTED'.encode('cp500'))
_set_attribute(cells, 10, protected=True, intensified=True)
_set_characters(cells, 11, 'PROTECTED INTENSIFIED'.encode('cp500'))
_set_attribute(cells, 32, protected=True, hidden=True)
_set_characters(cells, 33, 'PROTECTED HIDDEN'.encode('cp500'))
_set_attribute(cells, 49, protected=False)
_set_characters(cells, 50, 'UNPROTECTED'.encode('cp500'))
_set_attribute(cells, 61, protected=False, intensified=True)
_set_characters(cells, 62, 'UNPROTECTED INTENSIFIED'.encode('cp500'))
_set_attribute(cells, 85, protected=False, hidden=True)
_set_characters(cells, 86, 'UNPROTECTED HIDDEN'.encode('cp500'))
_set_attribute(cells, 104, protected=True)
_set_formatting(cells, 104, color=Color.YELLOW)
_set_characters(cells, 105, 'EAB'.encode('cp500'))
_set_formatting(cells, 105, blink=True)
_set_formatting(cells, 106, reverse=True)
_set_formatting(cells, 107, underscore=True)
_set_attribute(cells, 108, protected=True)
self.session.emulator.cells = cells
self.session.emulator.dirty = set(range(109))
self.session.emulator.cursor_address = 8
# Act and assert
self.assertTrue(self.session.handle_host())
self.terminal.display.flush.assert_called()
self.assertEqual(self.terminal.display.regen_buffer[80:189], bytes.fromhex('e0afb1aeb3a4a2b3a4a3e8afb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ecafb1aeb3a4a2b3a4a300a7a8a3a3a4adc0b4adafb1aeb3a4a2b3a4a3c8b4adafb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ccb4adafb1aeb3a4a2b3a4a300a7a8a3a3a4ade0a4a0a1e0'))
self.assertTrue(all([byte == 0x00 for byte in self.terminal.display.regen_buffer[189:]]))
self.terminal.display.move_cursor.assert_called_with(index=8)
self.assertFalse(self.session.emulator.dirty)
def test_changes_with_eab_feature(self):
# Arrange
self.terminal.display = BufferedDisplay(self.terminal, Dimensions(24, 80), 7)
self.terminal.display.status_line = Mock()
self.terminal.display.move_cursor = Mock()
self.terminal.display.write = Mock()
self.terminal.display.flush = Mock()
self.terminal.display._load_address_counter = Mock()
self.session.emulator.update = Mock(return_value=True)
cells = _create_screen_cells(24, 80)
_set_attribute(cells, 0, protected=True)
_set_characters(cells, 1, 'PROTECTED'.encode('cp500'))
_set_attribute(cells, 10, protected=True, intensified=True)
_set_characters(cells, 11, 'PROTECTED INTENSIFIED'.encode('cp500'))
_set_attribute(cells, 32, protected=True, hidden=True)
_set_characters(cells, 33, 'PROTECTED HIDDEN'.encode('cp500'))
_set_attribute(cells, 49, protected=False)
_set_characters(cells, 50, 'UNPROTECTED'.encode('cp500'))
_set_attribute(cells, 61, protected=False, intensified=True)
_set_characters(cells, 62, 'UNPROTECTED INTENSIFIED'.encode('cp500'))
_set_attribute(cells, 85, protected=False, hidden=True)
_set_characters(cells, 86, 'UNPROTECTED HIDDEN'.encode('cp500'))
_set_attribute(cells, 104, protected=True)
_set_formatting(cells, 104, color=Color.YELLOW)
_set_characters(cells, 105, 'EAB'.encode('cp500'))
_set_formatting(cells, 105, blink=True)
_set_formatting(cells, 106, reverse=True)
_set_formatting(cells, 107, underscore=True)
_set_attribute(cells, 108, protected=True)
self.session.emulator.cells = cells
self.session.emulator.dirty = set(range(109))
self.session.emulator.cursor_address = 8
# Act and assert
self.assertTrue(self.session.handle_host())
self.terminal.display.flush.assert_called()
self.assertEqual(self.terminal.display.regen_buffer[80:189], bytes.fromhex('e0afb1aeb3a4a2b3a4a3e8afb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ecafb1aeb3a4a2b3a4a300a7a8a3a3a4adc0b4adafb1aeb3a4a2b3a4a3c8b4adafb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ccb4adafb1aeb3a4a2b3a4a300a7a8a3a3a4ade0a4a0a1e0'))
self.assertTrue(all([byte == 0x00 for byte in self.terminal.display.regen_buffer[189:]]))
self.assertTrue(all([byte == 0x00 for byte in self.terminal.display.eab_buffer[:184]]))
self.assertEqual(self.terminal.display.eab_buffer[184:189], bytes.fromhex('304080c000'))
self.assertTrue(all([byte == 0x00 for byte in self.terminal.display.eab_buffer[189:]]))
self.terminal.display.move_cursor.assert_called_with(index=8)
self.assertFalse(self.session.emulator.dirty)
def test_eof(self):
# Arrange
self.session.emulator.update = Mock(side_effect=EOFError)
@ -155,19 +56,6 @@ class SessionHandleHostTestCase(unittest.TestCase):
self.telnet.close.assert_called()
def test_keyboard_locked(self):
# Arrange
self.session.emulator.update = Mock(return_value=True)
self.session.emulator.cells = _create_screen_cells(24, 80)
self.session.emulator.dirty = set()
# Act
self.session.handle_host()
# Assert
self.terminal.display.status_line.write.assert_called_with(8, bytes.fromhex('f600b2b8b2b3a4ac00'))
class SessionHandleKeyTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
@ -304,22 +192,192 @@ class SessionHandleKeyTestCase(unittest.TestCase):
self.session.emulator.input.assert_called_with(0x81, True)
def test_protected_cell_operator_error(self):
def test_operator_error(self):
# Arrange
self.session.emulator.input = Mock(side_effect=ProtectedCellOperatorError)
self.assertIsNone(self.session.operator_error)
# Act
self.session.handle_key(Key.LOWER_A, KeyboardModifiers.NONE, None)
# Assert
self.assertIsInstance(self.session.operator_error, ProtectedCellOperatorError)
class SessionRenderTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.terminal.display = BufferedDisplay(self.terminal, Dimensions(24, 80), None)
self.terminal.display.buffered_write_byte = Mock(wraps=self.terminal.display.buffered_write_byte)
self.terminal.display.move_cursor = Mock(wraps=self.terminal.display.move_cursor)
self.terminal.display.flush = Mock(wraps=self.terminal.display.flush)
self.terminal.display.status_line.write = Mock(wraps=self.terminal.display.status_line.write)
self.session = TN3270Session(self.terminal, 'mainframe', 23)
self.telnet = Mock()
self.session.telnet = self.telnet
self.session.emulator = Mock()
patcher = patch('oec.display.read_address_counter_hi')
self.read_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.read_address_counter_lo')
self.read_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_hi')
self.load_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_lo')
self.load_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.write_data')
self.write_data_mock = patcher.start()
patcher = patch('oec.display.eab_write_alternate')
self.eab_write_alternate_mock = patcher.start()
self.addCleanup(patch.stopall)
def test_with_no_eab_feature(self):
# Arrange
cells = _create_screen_cells(24, 80)
_set_attribute(cells, 0, protected=True)
_set_characters(cells, 1, 'PROTECTED'.encode('cp500'))
_set_attribute(cells, 10, protected=True, intensified=True)
_set_characters(cells, 11, 'PROTECTED INTENSIFIED'.encode('cp500'))
_set_attribute(cells, 32, protected=True, hidden=True)
_set_characters(cells, 33, 'PROTECTED HIDDEN'.encode('cp500'))
_set_attribute(cells, 49, protected=False)
_set_characters(cells, 50, 'UNPROTECTED'.encode('cp500'))
_set_attribute(cells, 61, protected=False, intensified=True)
_set_characters(cells, 62, 'UNPROTECTED INTENSIFIED'.encode('cp500'))
_set_attribute(cells, 85, protected=False, hidden=True)
_set_characters(cells, 86, 'UNPROTECTED HIDDEN'.encode('cp500'))
_set_attribute(cells, 104, protected=True)
_set_formatting(cells, 104, color=Color.YELLOW)
_set_characters(cells, 105, 'EAB'.encode('cp500'))
_set_formatting(cells, 105, blink=True)
_set_formatting(cells, 106, reverse=True)
_set_formatting(cells, 107, underscore=True)
_set_attribute(cells, 108, protected=True)
self.session.emulator.cells = cells
self.session.emulator.dirty = set(range(109))
self.session.emulator.cursor_address = 8
# Act
self.session.render()
# Assert
regen_bytes = bytes.fromhex('e0afb1aeb3a4a2b3a4a3e8afb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ecafb1aeb3a4a2b3a4a300a7a8a3a3a4adc0b4adafb1aeb3a4a2b3a4a3c8b4adafb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ccb4adafb1aeb3a4a2b3a4a300a7a8a3a3a4ade0a4a0a1e0')
for (index, regen_byte) in enumerate(regen_bytes):
self.terminal.display.buffered_write_byte.assert_any_call(regen_byte, None, index=index)
self.terminal.display.flush.assert_called()
self.terminal.display.move_cursor.assert_called_with(index=8)
self.assertFalse(self.session.emulator.dirty)
def test_with_eab_feature(self):
# Arrange
self.terminal.display = BufferedDisplay(self.terminal, Dimensions(24, 80), 7)
self.terminal.display.buffered_write_byte = Mock(wraps=self.terminal.display.buffered_write_byte)
self.terminal.display.move_cursor = Mock(wraps=self.terminal.display.move_cursor)
self.terminal.display.flush = Mock(wraps=self.terminal.display.flush)
cells = _create_screen_cells(24, 80)
_set_attribute(cells, 0, protected=True)
_set_characters(cells, 1, 'PROTECTED'.encode('cp500'))
_set_attribute(cells, 10, protected=True, intensified=True)
_set_characters(cells, 11, 'PROTECTED INTENSIFIED'.encode('cp500'))
_set_attribute(cells, 32, protected=True, hidden=True)
_set_characters(cells, 33, 'PROTECTED HIDDEN'.encode('cp500'))
_set_attribute(cells, 49, protected=False)
_set_characters(cells, 50, 'UNPROTECTED'.encode('cp500'))
_set_attribute(cells, 61, protected=False, intensified=True)
_set_characters(cells, 62, 'UNPROTECTED INTENSIFIED'.encode('cp500'))
_set_attribute(cells, 85, protected=False, hidden=True)
_set_characters(cells, 86, 'UNPROTECTED HIDDEN'.encode('cp500'))
_set_attribute(cells, 104, protected=True)
_set_formatting(cells, 104, color=Color.YELLOW)
_set_characters(cells, 105, 'EAB'.encode('cp500'))
_set_formatting(cells, 105, blink=True)
_set_formatting(cells, 106, reverse=True)
_set_formatting(cells, 107, underscore=True)
_set_attribute(cells, 108, protected=True)
self.session.emulator.cells = cells
self.session.emulator.dirty = set(range(109))
self.session.emulator.cursor_address = 8
# Act
self.session.render()
# Assert
regen_bytes = bytes.fromhex('e0afb1aeb3a4a2b3a4a3e8afb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ecafb1aeb3a4a2b3a4a300a7a8a3a3a4adc0b4adafb1aeb3a4a2b3a4a3c8b4adafb1aeb3a4a2b3a4a300a8adb3a4adb2a8a5a8a4a3ccb4adafb1aeb3a4a2b3a4a300a7a8a3a3a4ade0a4a0a1e0')
eab_bytes = bytes.fromhex('0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000304080c000')
for (index, (regen_byte, eab_byte)) in enumerate(zip(regen_bytes, eab_bytes)):
self.terminal.display.buffered_write_byte.assert_any_call(regen_byte, eab_byte, index=index)
self.terminal.display.flush.assert_called()
self.terminal.display.move_cursor.assert_called_with(index=8)
self.assertFalse(self.session.emulator.dirty)
def test_keyboard_locked(self):
# Arrange
self.session.emulator.cells = _create_screen_cells(24, 80)
self.session.emulator.dirty = set()
self.session.emulator.cursor_address = 8
self.session.emulator.keyboard_locked = True
# Act
self.session.render()
# Assert
self.terminal.display.status_line.write.assert_called_with(8, bytes.fromhex('f600b2b8b2b3a4ac00'))
def test_protected_cell_operator_error(self):
# Arrange
self.session.emulator.cells = _create_screen_cells(24, 80)
self.session.emulator.dirty = set()
self.session.emulator.cursor_address = 8
self.session.operator_error = ProtectedCellOperatorError()
# Act
self.session.render()
# Assert
self.terminal.display.status_line.write.assert_called_with(8, bytes.fromhex('f600f8dbd800000000'))
def test_field_overflow_operator_error(self):
# Arrange
self.session.emulator.input = Mock(side_effect=FieldOverflowOperatorError)
self.session.emulator.cells = _create_screen_cells(24, 80)
self.session.emulator.dirty = set()
self.session.emulator.cursor_address = 8
self.session.operator_error = FieldOverflowOperatorError()
# Act
self.session.handle_key(Key.LOWER_A, KeyboardModifiers.NONE, None)
self.session.render()
# Assert
self.terminal.display.status_line.write.assert_called_with(8, bytes.fromhex('f600db080000000000'))

View File

@ -3,7 +3,8 @@ from unittest.mock import Mock, patch
import context
from oec.display import Dimensions
from oec.session import SessionDisconnectedError
from oec.display import Dimensions, BufferedDisplay
from oec.keyboard import Key, KeyboardModifiers
from oec.vt100 import VT100Session
@ -12,14 +13,11 @@ class SessionHandleHostTestCase(unittest.TestCase):
self.terminal = Mock()
self.terminal.display.dimensions = Dimensions(24, 80)
self.terminal.display.has_eab = True
self.session = VT100Session(self.terminal, None)
self.session.host_process = Mock()
self.addCleanup(patch.stopall)
def test(self):
# Arrange
self.session.host_process.read = Mock(return_value=b'abc')
@ -28,15 +26,21 @@ class SessionHandleHostTestCase(unittest.TestCase):
self.session.handle_host()
# Assert
self.terminal.display.buffered_write_byte.assert_any_call(0x80, 0x00, row=0, column=0)
self.terminal.display.buffered_write_byte.assert_any_call(0x81, 0x00, row=0, column=1)
self.terminal.display.buffered_write_byte.assert_any_call(0x82, 0x00, row=0, column=2)
row_buffer = self.session.vt100_screen.buffer[0]
self.terminal.display.flush.assert_called()
self.assertEqual(row_buffer[0].data, 'a')
self.assertEqual(row_buffer[1].data, 'b')
self.assertEqual(row_buffer[2].data, 'c')
self.terminal.display.move_cursor.assert_called_with(row=0, column=3)
def test_eof(self):
# Arrange
self.session.host_process.read = Mock(side_effect=EOFError)
self.assertFalse(self.session.vt100_screen.dirty)
# Act and assert
with self.assertRaises(SessionDisconnectedError):
self.session.handle_host()
self.assertIsNone(self.session.host_process)
def test_bell(self):
# Arrange
@ -99,3 +103,89 @@ class SessionHandleKeyTestCase(unittest.TestCase):
# Assert
self.session.host_process.write.assert_not_called()
class SessionRenderTestCase(unittest.TestCase):
def setUp(self):
self.terminal = Mock()
self.terminal.display = BufferedDisplay(self.terminal, Dimensions(24, 80), None)
self.terminal.display.buffered_write_byte = Mock(wraps=self.terminal.display.buffered_write_byte)
self.terminal.display.move_cursor = Mock(wraps=self.terminal.display.move_cursor)
self.terminal.display.flush = Mock(wraps=self.terminal.display.flush)
self.session = VT100Session(self.terminal, None)
self.session.host_process = Mock()
patcher = patch('oec.display.read_address_counter_hi')
self.read_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.read_address_counter_lo')
self.read_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_hi')
self.load_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_lo')
self.load_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.write_data')
self.write_data_mock = patcher.start()
patcher = patch('oec.display.eab_write_alternate')
self.eab_write_alternate_mock = patcher.start()
self.addCleanup(patch.stopall)
def test_with_no_eab_feature(self):
# Arrange
self.session.host_process.read = Mock(return_value=b'abc')
self.session.handle_host()
# Act
self.session.render()
# Assert
self.terminal.display.buffered_write_byte.assert_any_call(0x80, None, row=0, column=0)
self.terminal.display.buffered_write_byte.assert_any_call(0x81, None, row=0, column=1)
self.terminal.display.buffered_write_byte.assert_any_call(0x82, None, row=0, column=2)
self.terminal.display.flush.assert_called()
self.terminal.display.move_cursor.assert_called_with(row=0, column=3)
self.assertFalse(self.session.vt100_screen.dirty)
def test_with_eab_feature(self):
# Arrange
self.terminal.display = BufferedDisplay(self.terminal, Dimensions(24, 80), 7)
self.terminal.display.buffered_write_byte = Mock(wraps=self.terminal.display.buffered_write_byte)
self.terminal.display.move_cursor = Mock(wraps=self.terminal.display.move_cursor)
self.terminal.display.flush = Mock(wraps=self.terminal.display.flush)
self.session.host_process.read = Mock(return_value=b'abc')
self.session.handle_host()
# Act
self.session.render()
# Assert
self.terminal.display.buffered_write_byte.assert_any_call(0x80, 0x00, row=0, column=0)
self.terminal.display.buffered_write_byte.assert_any_call(0x81, 0x00, row=0, column=1)
self.terminal.display.buffered_write_byte.assert_any_call(0x82, 0x00, row=0, column=2)
self.terminal.display.flush.assert_called()
self.terminal.display.move_cursor.assert_called_with(row=0, column=3)
self.assertFalse(self.session.vt100_screen.dirty)