pycoax 0.3.1

This commit is contained in:
Andrew Kay
2020-03-25 20:01:28 -05:00
parent 39689e04da
commit 7e8d4b53e6
6 changed files with 130 additions and 20 deletions

View File

@@ -3,7 +3,7 @@ import signal
import logging
import argparse
from serial import Serial
from coax import Interface1
from coax import SerialInterface
from .controller import Controller
from .tn3270 import TN3270Session
@@ -81,7 +81,7 @@ def main():
time.sleep(3)
# Initialize the interface.
interface = Interface1(serial)
interface = SerialInterface(serial)
firmware_version = interface.reset()

View File

@@ -179,7 +179,7 @@ class Controller:
poll_action = self.terminal.get_poll_action() if self.terminal else PollAction.NONE
poll_response = poll(self.interface, poll_action, timeout=1)
poll_response = poll(self.interface, poll_action, receive_timeout=1)
if poll_response:
try:

View File

@@ -6,7 +6,8 @@ oec.display
from collections import namedtuple
import logging
from sortedcontainers import SortedSet
from coax import load_address_counter_hi, load_address_counter_lo
from coax import read_address_counter_hi, read_address_counter_lo, \
load_address_counter_hi, load_address_counter_lo, write_data
_ASCII_CHAR_MAP = {
'>': 0x08,
@@ -255,6 +256,9 @@ class Display:
raise ValueError('Either index or row and column is required')
def _calculate_address_after_write(self, address, count):
if address is None:
return None
address += count
(rows, columns) = self.dimensions
@@ -265,6 +269,12 @@ class Display:
return address
def _read_address_counter(self):
hi = read_address_counter_hi(self.interface)
lo = read_address_counter_lo(self.interface)
return (hi << 8) | lo
def _load_address_counter(self, address, force_load):
if address == self.address_counter and not force_load:
return False
@@ -298,29 +308,37 @@ class Display:
address = self._calculate_address(start_index)
try:
self._write(data, address=address if address != self.address_counter else None)
self._write(data, address=address)
except Exception as error:
# TODO: This could leave the address_counter incorrect.
self.logger.error(f'Write error: {error}', exc_info=error)
self.address_counter = self._calculate_address_after_write(address, len(data))
for index in range(start_index, end_index + 1):
self.dirty.discard(index)
return self.address_counter
def _write(self, data, address=None, restore_original_address=False):
if isinstance(data, tuple):
offload_data = data[0]
offload_repeat = max(data[1] - 1, 0)
else:
offload_data = data
offload_repeat = 0
if restore_original_address:
original_address = self.address_counter
self.interface.offload_write(offload_data, address=address,
restore_original_address=restore_original_address,
repeat=offload_repeat)
if original_address is None:
original_address = self._read_address_counter()
if address is not None:
self._load_address_counter(address, force_load=False)
write_data(self.interface, data)
if isinstance(address, tuple):
length = len(data[0]) * data[1]
else:
length = len(data)
self.address_counter = self._calculate_address_after_write(address, length)
if restore_original_address:
self._load_address_counter(original_address, force_load=True)
# TODO: add validation of column and data length for write() - must be inside status line
class StatusLine:

View File

@@ -1,5 +1,5 @@
ptyprocess==0.6.0
pycoax==0.2.0
pycoax==0.3.1
pyserial==3.4
pyte==0.8.0
pytn3270==0.5.0

View File

@@ -57,6 +57,10 @@ class RunLoopTestCase(unittest.TestCase):
patcher.start()
patcher = patch('oec.display.write_data')
patcher.start()
self.addCleanup(patch.stopall)
def test_no_terminal(self):

View File

@@ -157,6 +157,10 @@ class DisplayClearTestCase(unittest.TestCase):
self.load_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.write_data')
self.write_data_mock = patcher.start()
self.addCleanup(patch.stopall)
def test_excluding_status_line(self):
@@ -211,6 +215,10 @@ class DisplayFlushRangeTestCase(unittest.TestCase):
self.load_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.write_data')
self.write_data_mock = patcher.start()
self.addCleanup(patch.stopall)
def test_when_start_address_is_current_address_counter(self):
@@ -225,7 +233,7 @@ class DisplayFlushRangeTestCase(unittest.TestCase):
self.display.flush()
# Assert
self.display._write.assert_called_with(bytes.fromhex('01 02 03'), address=None)
self.display._write.assert_called_with(bytes.fromhex('01 02 03'), address=80)
self.assertEqual(self.display.address_counter, 83)
self.assertFalse(self.display.dirty)
@@ -363,19 +371,99 @@ class DisplayWriteTestCase(unittest.TestCase):
self.display = Display(self.interface, dimensions)
self.display._load_address_counter = Mock(wraps=self.display._load_address_counter)
patcher = patch('oec.display.load_address_counter_hi')
self.load_address_counter_hi_mock = patcher.start()
patcher = patch('oec.display.load_address_counter_lo')
self.load_address_counter_lo_mock = patcher.start()
patcher = patch('oec.display.write_data')
self.write_data_mock = patcher.start()
self.addCleanup(patch.stopall)
def test(self):
# Act
self.display._write(bytes.fromhex('01 02 03'))
# Assert
self.interface.offload_write.assert_called_with(bytes.fromhex('01 02 03'), address=None, restore_original_address=False, repeat=0)
self.assertIsNone(self.display.address_counter)
self.write_data_mock.assert_called_with(self.interface, bytes.fromhex('01 02 03'))
def test_repeat(self):
# Act
self.display._write((bytes.fromhex('01 02 03'), 3))
# Assert
self.interface.offload_write.assert_called_with(bytes.fromhex('01 02 03'), address=None, restore_original_address=False, repeat=2)
self.assertIsNone(self.display.address_counter)
self.write_data_mock.assert_called_with(self.interface, (bytes.fromhex('01 02 03'), 3))
def test_address_if_current_address_unknown(self):
# Arrange
self.assertIsNone(self.display.address_counter)
# Act
self.display._write(bytes.fromhex('01 02 03'), address=80)
# Assert
self.assertEqual(self.display.address_counter, 83)
def test_address_if_change(self):
# Arrange
self.display.address_counter = 160
# Act
self.display._write(bytes.fromhex('01 02 03'), address=80)
# Assert
self.assertEqual(self.display.address_counter, 83)
self.display._load_address_counter.assert_called_with(80, force_load=False)
def test_address_if_no_change(self):
# Arrange
self.display.address_counter = 80
# Act
self.display._write(bytes.fromhex('01 02 03'), address=80)
# Assert
self.assertEqual(self.display.address_counter, 83)
self.display._load_address_counter.assert_called_with(80, force_load=False)
def test_restore_original_address_if_current_address_unknown(self):
# Arrange
self.display._read_address_counter = Mock(return_value=160)
self.assertIsNone(self.display.address_counter)
# Act
self.display._write(bytes.fromhex('01 02 03'), restore_original_address=True)
# Assert
self.assertEqual(self.display.address_counter, 160)
def test_restore_original_address_if_current_address_known(self):
# Arrange
self.display._read_address_counter = Mock(return_value=160)
self.display.address_counter = 160
# Act
self.display._write(bytes.fromhex('01 02 03'), restore_original_address=True)
# Assert
self.assertEqual(self.display.address_counter, 160)
self.display._read_address_counter.assert_not_called()
class EncodeAsciiCharacterTestCase(unittest.TestCase):
def test_mapped_character(self):