diff --git a/oec/display.py b/oec/display.py index 42c3d7f..7b84818 100644 --- a/oec/display.py +++ b/oec/display.py @@ -175,26 +175,12 @@ class Display: return True def _write_data(self, data): - chunks = self.terminal.interface.jumbo_write_split_data(data, -1) - - commands = [WriteData(chunks[0])] - - for chunk in chunks[1:]: - commands.append(Data(chunk)) - - self.terminal.execute(commands) + self.terminal.execute_jumbo_write(data, WriteData, Data, -1) def _eab_write_alternate(self, data): # The EAB_WRITE_ALTERNATE command data must be split so that the two bytes # do not get separated, otherwise the write will be incorrect. - chunks = self.terminal.interface.jumbo_write_split_data(data, -2) - - commands = [EABWriteAlternate(self.eab_address, chunks[0])] - - for chunk in chunks[1:]: - commands.append(Data(chunk)) - - self.terminal.execute(commands) + self.terminal.execute_jumbo_write(data, lambda chunk: EABWriteAlternate(self.eab_address, chunk), Data, -2) def _split_address(address): if address is None: diff --git a/oec/interface.py b/oec/interface.py index 14e9207..e6fa5c8 100644 --- a/oec/interface.py +++ b/oec/interface.py @@ -7,8 +7,6 @@ import os import logging from textwrap import dedent -from more_itertools import chunked - logger = logging.getLogger(__name__) class AggregateExecuteError(Exception): @@ -28,13 +26,11 @@ class InterfaceWrapper: self.jumbo_write_max_length = None if self.legacy_firmware_detected and self.jumbo_write_strategy is None: - _print_i1_jumbo_write_notice() - self.jumbo_write_strategy = 'split' - - if self.jumbo_write_strategy == 'split': self.jumbo_write_max_length = 1024 + _print_i1_jumbo_write_notice(self.jumbo_write_max_length) + def __getattr__(self, attr): return getattr(self.interface, attr) @@ -51,25 +47,6 @@ class InterfaceWrapper: return responses - def jumbo_write_split_data(self, data, first_chunk_max_length_adjustment=-1): - if self.jumbo_write_strategy != 'split': - return [data] - - if isinstance(data, tuple): - length = len(data[0]) * data[1] - else: - length = len(data) - - first_chunk_max_length = self.jumbo_write_max_length + first_chunk_max_length_adjustment - - if length <= first_chunk_max_length: - return [data] - - if isinstance(data, tuple): - data = data[0] * data[1] - - return [data[:first_chunk_max_length], *chunked(data[first_chunk_max_length:], self.jumbo_write_max_length)] - def address_commands(device_address, commands): if isinstance(commands, list): return [(device_address, command) for command in commands] @@ -82,31 +59,26 @@ def _get_jumbo_write_strategy(): if value is None: return None - if value in ['split', 'ignore']: + if value == 'ignore': return value logger.warning(f'Unsupported COAX_JUMBO option: {value}') return None -def _print_i1_jumbo_write_notice(): - notice = ''' +def _print_i1_jumbo_write_notice(max_length): + notice = f''' **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** **** I think you are using an older firmware on the 1st generation, Arduino Mega based, interface which does not support the "jumbo write" required to write a full screen to the regen and EAB buffers. - I'm going to split large writes into multiple smaller 1024-byte writes... + I'm going to split large writes into multiple smaller {max_length}-byte writes... If you want to override this behavior, you can set the COAX_JUMBO environment variable as follows: - - COAX_JUMBO=split - split large writes into multiple smaller 32-byte writes - before sending to the interface, this will result in - additional round trips to the interface which may - manifest as visible incremental changes being applied - to the screen - COAX_JUMBO=ignore - try a jumbo write, anyway, use this option if you believe you are seeing this behavior in error diff --git a/oec/terminal.py b/oec/terminal.py index 2e107f2..ad75312 100644 --- a/oec/terminal.py +++ b/oec/terminal.py @@ -5,6 +5,7 @@ oec.terminal import time import logging +from more_itertools import chunked from coax import read_feature_ids, parse_features, Poll, ReadTerminalId, ReadExtendedId, \ LoadControlRegister, TerminalType, Feature, PollAction, Control, \ ProtocolError @@ -88,6 +89,21 @@ class Terminal: def execute(self, commands): return self.interface.execute(address_commands(self.device_address, commands)) + def execute_jumbo_write(self, data, create_first, create_subsequent, first_chunk_max_length_adjustment=-1): + max_length = None + + if self.interface.jumbo_write_strategy == 'split': + max_length = self.interface.jumbo_write_max_length + + chunks = _jumbo_write_split_data(data, max_length, first_chunk_max_length_adjustment) + + commands = [create_first(chunks[0])] + + for chunk in chunks[1:]: + commands.append(create_subsequent(chunk)) + + return self.execute(commands) + class UnsupportedTerminalError(Exception): """Unsupported terminal.""" @@ -156,3 +172,22 @@ def _get_features(interface, device_address): ids = interface.execute([address_commands(device_address, command) for command in commands]) return parse_features(ids, commands) + +def _jumbo_write_split_data(data, max_length, first_chunk_max_length_adjustment=-1): + if max_length is None: + return [data] + + if isinstance(data, tuple): + length = len(data[0]) * data[1] + else: + length = len(data) + + first_chunk_max_length = max_length + first_chunk_max_length_adjustment + + if length <= first_chunk_max_length: + return [data] + + if isinstance(data, tuple): + data = data[0] * data[1] + + return [data[:first_chunk_max_length], *chunked(data[first_chunk_max_length:], max_length)] diff --git a/tests/test_interface.py b/tests/test_interface.py index c8b4da1..05a4fc6 100644 --- a/tests/test_interface.py +++ b/tests/test_interface.py @@ -36,19 +36,6 @@ class InterfaceWrapperInitTestCase(unittest.TestCase): self.print_i1_jumbo_write_notice.assert_not_called() - def test_split_jumbo_write_strategy(self): - # Arrange - self.get_jumbo_write_strategy.return_value = 'split' - - # Act - interface_wrapper = InterfaceWrapper(self.interface) - - # Assert - self.assertEqual(interface_wrapper.jumbo_write_strategy, 'split') - self.assertEqual(interface_wrapper.jumbo_write_max_length, 1024) - - self.print_i1_jumbo_write_notice.assert_not_called() - def test_i1_no_jumbo_write_strategy(self): # Arrange self.interface.legacy_firmware_detected = True @@ -119,67 +106,6 @@ class InterfaceWrapperExecuteTestCase(unittest.TestCase): self.assertEqual(len(error.responses), 2) -class InterfaceWrapperJumboWriteSplitDataTestCase(unittest.TestCase): - def setUp(self): - self.interface = MockInterface() - - self.interface_wrapper = InterfaceWrapper(self.interface) - - def test_no_split_strategy(self): - # Arrange - self.interface_wrapper.jumbo_write_strategy = None - self.interface_wrapper.jumbo_write_max_length = 32 - - # Act and assert - for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64)]: - with self.subTest(data=data): - result = self.interface_wrapper.jumbo_write_split_data(data) - - self.assertEqual(len(result), 1) - - self.assertEqual(result[0], data) - - def test_split_strategy_one_chunk(self): - # Arrange - self.interface_wrapper.jumbo_write_strategy = 'split' - self.interface_wrapper.jumbo_write_max_length = 32 - - # Act and assert - for data in [bytes(range(0, 16)), (bytes.fromhex('00'), 16), bytes(range(0, 31)), (bytes.fromhex('00'), 31)]: - with self.subTest(data=data): - result = self.interface_wrapper.jumbo_write_split_data(data) - - self.assertEqual(len(result), 1) - - self.assertEqual(result[0], data) - - def test_split_strategy_two_chunks(self): - # Arrange - self.interface_wrapper.jumbo_write_strategy = 'split' - self.interface_wrapper.jumbo_write_max_length = 32 - - # Act and assert - for data in [bytes(range(0, 32)), (bytes.fromhex('00'), 32), bytes(range(0, 63)), (bytes.fromhex('00'), 63)]: - with self.subTest(data=data): - result = self.interface_wrapper.jumbo_write_split_data(data) - - self.assertEqual(len(result), 2) - self.assertEqual(len(result[0]), 31) - - def test_split_strategy_three_chunks(self): - # Arrange - self.interface_wrapper.jumbo_write_strategy = 'split' - self.interface_wrapper.jumbo_write_max_length = 32 - - # Act and assert - for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64), bytes(range(0, 95)), (bytes.fromhex('00'), 95)]: - with self.subTest(data=data): - result = self.interface_wrapper.jumbo_write_split_data(data) - - self.assertEqual(len(result), 3) - self.assertEqual(len(result[0]), 31) - self.assertEqual(len(result[1]), 32) - class AddressCommandsTestCase(unittest.TestCase): def test_single_command(self): # Arrange diff --git a/tests/test_terminal.py b/tests/test_terminal.py index da1ec72..0c3ca5b 100644 --- a/tests/test_terminal.py +++ b/tests/test_terminal.py @@ -6,7 +6,7 @@ from coax.protocol import TerminalId import context from oec.interface import InterfaceWrapper -from oec.terminal import create_terminal, Terminal, UnsupportedTerminalError +from oec.terminal import create_terminal, Terminal, UnsupportedTerminalError, _jumbo_write_split_data from oec.display import Display, Dimensions from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2 @@ -121,6 +121,42 @@ class CreateTerminalTestCase(unittest.TestCase): with self.assertRaises(UnsupportedTerminalError): create_terminal(interface, None, None, self.get_keymap) +class JumboWriteSplitDataTestCase(unittest.TestCase): + def test_no_split_strategy(self): + for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64)]: + with self.subTest(data=data): + result = _jumbo_write_split_data(data, None) + + self.assertEqual(len(result), 1) + + self.assertEqual(result[0], data) + + def test_split_strategy_one_chunk(self): + for data in [bytes(range(0, 16)), (bytes.fromhex('00'), 16), bytes(range(0, 31)), (bytes.fromhex('00'), 31)]: + with self.subTest(data=data): + result = _jumbo_write_split_data(data, 32) + + self.assertEqual(len(result), 1) + + self.assertEqual(result[0], data) + + def test_split_strategy_two_chunks(self): + for data in [bytes(range(0, 32)), (bytes.fromhex('00'), 32), bytes(range(0, 63)), (bytes.fromhex('00'), 63)]: + with self.subTest(data=data): + result = _jumbo_write_split_data(data, 32) + + self.assertEqual(len(result), 2) + self.assertEqual(len(result[0]), 31) + + def test_split_strategy_three_chunks(self): + for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64), bytes(range(0, 95)), (bytes.fromhex('00'), 95)]: + with self.subTest(data=data): + result = _jumbo_write_split_data(data, 32) + + self.assertEqual(len(result), 3) + self.assertEqual(len(result[0]), 31) + self.assertEqual(len(result[1]), 32) + def _create_terminal(interface): terminal_id = TerminalId(0b11110100) extended_id = 'c1348300'