lowobservable.oec/tests/test_device.py
2025-12-16 17:54:33 -06:00

199 lines
6.8 KiB
Python

import unittest
from unittest.mock import Mock, patch
from logging import Logger
from coax import TerminalType, Feature, ReadAddressCounterHi, ReadAddressCounterLo, ReadTerminalId, ReadExtendedId, ReadFeatureId, ProtocolError, LoadAddressCounterLo, LoadSecondaryControl
from coax.protocol import TerminalId
import context
from oec.interface import InterfaceWrapper
from oec.device import address_commands, format_address, get_ids, get_features, _jumbo_write_split_data
from mock_interface import MockInterface
class AddressCommandsTestCase(unittest.TestCase):
def test_single_command(self):
# Arrange
command = ReadAddressCounterHi()
# Act
result = address_commands(0b111000, command)
# Assert
self.assertEqual(result, (0b111000, command))
def test_multiple_commands(self):
# Arrange
commands = [ReadAddressCounterHi(), ReadAddressCounterLo()]
# Act
result = address_commands(0b111000, commands)
# Assert
self.assertEqual(result, [(0b111000, commands[0]), (0b111000, commands[1])])
class FormatAddressTestCase(unittest.TestCase):
def setUp(self):
self.interface = MockInterface()
def test_no_port(self):
self.assertEqual(format_address(InterfaceWrapper(self.interface), None), '/dev/mock#0')
def test_known_multiplexer_port(self):
self.assertEqual(format_address(InterfaceWrapper(self.interface), 0b110000), '/dev/mock#3')
def test_unknown_multiplexer_port(self):
self.assertEqual(format_address(InterfaceWrapper(self.interface), 0b111111), '/dev/mock?111111')
class GetIdsTestCase(unittest.TestCase):
def setUp(self):
self.interface = MockInterface()
patcher = patch('oec.device.logger', autospec=Logger)
self.logger = patcher.start()
patcher = patch('oec.device.time.sleep')
self.sleep = patcher.start()
self.addCleanup(patch.stopall)
def test_dft(self):
# Arrange
self.interface.mock_responses = [(None, ReadTerminalId, None, TerminalId(0b00000001))]
# Act
(terminal_id, extended_id) = get_ids(InterfaceWrapper(self.interface), None)
# Assert
self.assertEqual(terminal_id.type, TerminalType.DFT)
self.assertIsNone(extended_id)
self.interface.assert_command_not_executed(None, ReadExtendedId)
def test_no_extended_id(self):
# Arrange
self.interface.mock_responses = [(None, ReadTerminalId, None, TerminalId(0b11110100))]
# Act
(terminal_id, extended_id) = get_ids(InterfaceWrapper(self.interface), None)
# Assert
self.assertEqual(terminal_id.type, TerminalType.CUT)
self.assertEqual(terminal_id.model, 2)
self.assertEqual(terminal_id.keyboard, 15)
self.assertIsNone(extended_id)
self.interface.assert_command_executed(None, ReadExtendedId)
def test_extended_id(self):
# Arrange
self.interface.mock_responses = [
(None, ReadTerminalId, None, TerminalId(0b11110100)),
(None, ReadExtendedId, None, bytes.fromhex('01 02 03 04'))
]
# Act
(terminal_id, extended_id) = get_ids(InterfaceWrapper(self.interface), None)
# Assert
self.assertEqual(terminal_id.type, TerminalType.CUT)
self.assertEqual(terminal_id.model, 2)
self.assertEqual(terminal_id.keyboard, 15)
self.assertEqual(extended_id, '01020304')
self.interface.assert_command_executed(None, LoadSecondaryControl, lambda command: command.control.big == False)
self.interface.assert_command_executed(None, LoadAddressCounterLo, lambda command: command.address == 0)
def test_terminal_id_error(self):
# Arrange
self.interface.mock_responses = [
(None, ReadTerminalId, None, Mock(side_effect=ProtocolError))
]
# Act
(terminal_id, extended_id) = get_ids(InterfaceWrapper(self.interface), None)
# Assert
self.assertIsNone(terminal_id)
self.assertIsNone(extended_id)
self.logger.warning.assert_called()
def test_extended_id_error(self):
# Arrange
self.interface.mock_responses = [
(None, ReadTerminalId, None, TerminalId(0b11110100)),
(None, ReadExtendedId, None, Mock(side_effect=ProtocolError))
]
# Act
(terminal_id, extended_id) = get_ids(InterfaceWrapper(self.interface), None)
# Assert
self.assertEqual(terminal_id.type, TerminalType.CUT)
self.assertEqual(terminal_id.model, 2)
self.assertEqual(terminal_id.keyboard, 15)
self.assertIsNone(extended_id)
self.logger.warning.assert_called()
class GetFeaturesTestCase(unittest.TestCase):
def setUp(self):
self.interface = MockInterface()
def test_no_features(self):
# Act
features = get_features(InterfaceWrapper(self.interface), None)
# Assert
self.assertEqual(features, { })
def test_eab_feature(self):
# Arrange
self.interface.mock_responses = [(None, ReadFeatureId, lambda command: command.feature_address == 7, Feature.EAB.value)]
# Act
features = get_features(InterfaceWrapper(self.interface), None)
# Assert
self.assertEqual(features, { Feature.EAB: 7 })
class JumboWriteSplitDataTestCase(unittest.TestCase):
def test_no_split_strategy(self):
for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64)]:
with self.subTest(data=data):
result = _jumbo_write_split_data(data, None)
self.assertEqual(len(result), 1)
self.assertEqual(result[0], data)
def test_split_strategy_one_chunk(self):
for data in [bytes(range(0, 16)), (bytes.fromhex('00'), 16), bytes(range(0, 31)), (bytes.fromhex('00'), 31)]:
with self.subTest(data=data):
result = _jumbo_write_split_data(data, 32)
self.assertEqual(len(result), 1)
self.assertEqual(result[0], data)
def test_split_strategy_two_chunks(self):
for data in [bytes(range(0, 32)), (bytes.fromhex('00'), 32), bytes(range(0, 63)), (bytes.fromhex('00'), 63)]:
with self.subTest(data=data):
result = _jumbo_write_split_data(data, 32)
self.assertEqual(len(result), 2)
self.assertEqual(len(result[0]), 31)
def test_split_strategy_three_chunks(self):
for data in [bytes(range(0, 64)), (bytes.fromhex('00'), 64), bytes(range(0, 95)), (bytes.fromhex('00'), 95)]:
with self.subTest(data=data):
result = _jumbo_write_split_data(data, 32)
self.assertEqual(len(result), 3)
self.assertEqual(len(result[0]), 31)
self.assertEqual(len(result[1]), 32)