From f89f9b16d7870ac82d2010d6654ee88fd1541139 Mon Sep 17 00:00:00 2001 From: Andrew Kay Date: Mon, 23 Mar 2020 19:35:21 -0500 Subject: [PATCH] New serial protocol --- interface1/firmware/CoaxTransceiver.cpp | 47 ++-- interface1/firmware/CoaxTransceiver.h | 4 +- interface1/firmware/firmware.ino | 161 +++-------- pycoax/coax/__init__.py | 2 +- pycoax/coax/interface.py | 19 ++ pycoax/coax/interface1.py | 218 --------------- pycoax/coax/protocol.py | 16 +- pycoax/coax/serial_interface.py | 191 +++++++++++++ pycoax/examples/01_reset.py | 18 +- pycoax/examples/02_poll.py | 26 +- pycoax/examples/03_read_terminal_id.py | 24 +- .../{04_hello_world.py => 04_write_data.py} | 28 +- pycoax/examples/05_offload_write.py | 31 -- pycoax/examples/07_read_status.py | 24 +- pycoax/examples/08_load_control_register.py | 25 +- pycoax/examples/09_read_data.py | 25 +- pycoax/examples/10_read_multiple.py | 25 +- pycoax/examples/11_reset.py | 24 +- pycoax/examples/12_search.py | 25 +- pycoax/examples/13_insert_byte.py | 25 +- pycoax/examples/14_clear.py | 25 +- .../{06_char_map.py => 20_char_map.py} | 24 +- pycoax/examples/common.py | 50 ++++ pycoax/tests/test_interface1.py | 264 ------------------ pycoax/tests/test_protocol.py | 30 +- pycoax/tests/test_serial_interface.py | 124 ++++++++ 26 files changed, 530 insertions(+), 945 deletions(-) create mode 100644 pycoax/coax/interface.py delete mode 100644 pycoax/coax/interface1.py create mode 100644 pycoax/coax/serial_interface.py rename pycoax/examples/{04_hello_world.py => 04_write_data.py} (50%) delete mode 100755 pycoax/examples/05_offload_write.py rename pycoax/examples/{06_char_map.py => 20_char_map.py} (59%) create mode 100644 pycoax/examples/common.py delete mode 100644 pycoax/tests/test_interface1.py create mode 100644 pycoax/tests/test_serial_interface.py diff --git a/interface1/firmware/CoaxTransceiver.cpp b/interface1/firmware/CoaxTransceiver.cpp index a5bd3e4..0da3616 100644 --- a/interface1/firmware/CoaxTransceiver.cpp +++ b/interface1/firmware/CoaxTransceiver.cpp @@ -85,14 +85,14 @@ static void CoaxTransceiver::setup() { txSetup(); } -static int /* ssize_t */ CoaxTransceiver::transmitReceive(uint16_t commandWord, uint8_t *dataBuffer, size_t dataBufferCount, uint16_t *receiveBuffer, size_t receiveBufferSize, uint16_t timeout) { - int returnValue = transmit(commandWord, dataBuffer, dataBufferCount); +static int /* ssize_t */ CoaxTransceiver::transmitReceive(uint16_t *transmitBuffer, size_t transmitBufferCount, uint16_t *receiveBuffer, size_t receiveBufferSize, uint16_t receiveTimeout) { + int returnValue = transmit(transmitBuffer, transmitBufferCount); if (returnValue < 0) { return returnValue; } - return receive(receiveBuffer, receiveBufferSize, timeout); + return receive(receiveBuffer, receiveBufferSize, receiveTimeout); } static void CoaxTransceiver::dataBusSetup() { @@ -158,7 +158,7 @@ static void CoaxTransceiver::txSetup() { pinMode(TX_REGISTERS_FULL_PIN, INPUT); } -static int /* ssize_t */ CoaxTransceiver::transmit(uint16_t commandWord, uint8_t *dataBuffer, size_t dataCount) { +static int /* ssize_t */ CoaxTransceiver::transmit(uint16_t *buffer, size_t bufferCount) { // Ensure receiver is inactive. if (rxState != RX_STATE_DISABLED) { return ERROR_TX_RECEIVER_ACTIVE; @@ -178,35 +178,20 @@ static int /* ssize_t */ CoaxTransceiver::transmit(uint16_t commandWord, uint8_t DDRA = B11111111; DDRC = B00000011; - // Send command word - we make an assumption here that TX_REGISTERS_FULL is not set. - PORTC = (PINC & 0xfc) | ((commandWord >> 8) & 0x3); - PORTA = commandWord & 0xff; - - PORTE &= ~0x20; // TX Register Load - Low (Load) - PORTE |= 0x20; // TX Register Load - High - - // Send data - offload parity computation to DP8340. - if (dataCount > 0) { - // Enable transmitter parity calculation. - PORTH &= ~0x08; // TX Parity Control - Low + // Transmit. + for (int index = 0; index < bufferCount; index++) { + uint16_t data = buffer[index]; - for (int index = 0; index < dataCount; index++) { - // Wait while TX Registers Full is high. - while ( (PING & 0x20) == 0x20) { - NOP; - } - - uint8_t data = dataBuffer[index]; - - PORTC = (PINC & 0xfc) | ((data >> 6) & 0x3); - PORTA = (data << 2); - - PORTE &= ~0x20; // TX Register Load - Low (Load) - PORTE |= 0x20; // TX Register Load - High + // Wait while TX Registers Full is high. + while ((PING & 0x20) == 0x20) { + NOP; } - // Disable transmitter parity calculation. - PORTH |= 0x08; // TX Parity Control - High + PORTC = (PINC & 0xfc) | ((data >> 8) & 0x3); + PORTA = data & 0xff; + + PORTE &= ~0x20; // TX Register Load - Low (Load) + PORTE |= 0x20; // TX Register Load - High } // Configure data bus for input. @@ -219,7 +204,7 @@ static int /* ssize_t */ CoaxTransceiver::transmit(uint16_t commandWord, uint8_t // Enable interrupts. interrupts(); - return dataCount; + return bufferCount; } static int /* ssize_t */ CoaxTransceiver::receive(uint16_t *buffer, size_t bufferSize, uint16_t timeout) { diff --git a/interface1/firmware/CoaxTransceiver.h b/interface1/firmware/CoaxTransceiver.h index 241587b..e3c8c92 100644 --- a/interface1/firmware/CoaxTransceiver.h +++ b/interface1/firmware/CoaxTransceiver.h @@ -23,13 +23,13 @@ class CoaxTransceiver { public: static void setup(); - static int /* ssize_t */ transmitReceive(uint16_t commandWord, uint8_t *dataBuffer, size_t dataBufferCount, uint16_t *receiveBuffer, size_t receiveBufferSize, uint16_t timeout); + static int /* ssize_t */ transmitReceive(uint16_t *transmitBuffer, size_t transmitBufferCount, uint16_t *receiveBuffer, size_t receiveBufferSize, uint16_t receiveTimeout); private: static void dataBusSetup(); static void rxSetup(); static void txSetup(); - static int /* ssize_t */ transmit(uint16_t commandWord, uint8_t *dataBuffer, size_t dataCount); + static int /* ssize_t */ transmit(uint16_t *buffer, size_t bufferCount); static int /* ssize_t */ receive(uint16_t *buffer, size_t bufferSize, uint16_t timeout); static void rxActiveInterrupt(); diff --git a/interface1/firmware/firmware.ino b/interface1/firmware/firmware.ino index a5a7f8c..6c89883 100644 --- a/interface1/firmware/firmware.ino +++ b/interface1/firmware/firmware.ino @@ -17,14 +17,12 @@ #include "CoaxTransceiver.h" #define COMMAND_RESET 0x01 -#define COMMAND_EXECUTE 0x02 -#define COMMAND_EXECUTE_OFFLOAD 0x03 +#define COMMAND_TRANSMIT 0x02 +#define COMMAND_RECEIVE 0x04 +#define COMMAND_TRANSMIT_RECEIVE 0x06 #define ERROR_INVALID_MESSAGE 1 #define ERROR_UNKNOWN_COMMAND 2 -#define ERROR_UNKNOWN_OFFLOAD_COMMAND 3 - -#define UNPACK_DATA_WORD(w) (uint8_t) ((w >> 2) & 0xff) void handleResetCommand(uint8_t *buffer, int bufferCount) { uint8_t response[] = { 0x01, 0x00, 0x00, 0x01 }; @@ -32,22 +30,44 @@ void handleResetCommand(uint8_t *buffer, int bufferCount) { sendMessage(response, 4); } -void handleExecuteCommand(uint8_t *buffer, int bufferCount) { +void handleTransmitReceiveCommand(uint8_t *buffer, int bufferCount) { if (bufferCount < 6) { sendErrorMessage(ERROR_INVALID_MESSAGE); return; } - uint16_t commandWord = (buffer[0] << 8) | buffer[1]; - uint16_t receiveCount = (buffer[2] << 8) | buffer[3]; - uint16_t timeout = (buffer[4] << 8) | buffer[5]; + uint16_t transmitRepeatCount = ((buffer[0] << 8) | buffer[1]) & 0x7fff; + uint16_t transmitRepeatOffset = buffer[0] >> 7; - uint8_t *dataBuffer = buffer + 6; - uint16_t dataBufferCount = bufferCount - 6; + uint16_t *transmitBuffer = (uint16_t *) (buffer + 2); + uint16_t transmitBufferCount = (bufferCount - 6) / 2; + + uint16_t receiveBufferSize = (buffer[bufferCount - 4] << 8) | buffer[bufferCount - 3]; + uint16_t receiveTimeout = (buffer[bufferCount - 2] << 8) | buffer[bufferCount - 1]; + + if (transmitBufferCount < 1) { + sendErrorMessage(ERROR_INVALID_MESSAGE); + return; + } + + // Expand the provided data if applicable. + if (transmitRepeatCount > 1) { + uint8_t *source = ((uint8_t *) transmitBuffer) + (transmitRepeatOffset * 2); + uint8_t *destination = ((uint8_t *) transmitBuffer) + (transmitBufferCount * 2); + size_t length = (transmitBufferCount - transmitRepeatOffset) * 2; + + for (int index = 1; index < transmitRepeatCount; index++) { + memcpy(destination, source, length); + + transmitBufferCount += (transmitBufferCount - transmitRepeatOffset); + + destination += length; + } + } uint16_t *receiveBuffer = (uint16_t *) (buffer + 2); - - bufferCount = CoaxTransceiver::transmitReceive(commandWord, dataBuffer, dataBufferCount, receiveBuffer, receiveCount, timeout); + + bufferCount = CoaxTransceiver::transmitReceive(transmitBuffer, transmitBufferCount, receiveBuffer, receiveBufferSize, receiveTimeout); if (bufferCount < 0) { sendErrorMessage(100 + ((-1) * bufferCount)); @@ -62,113 +82,6 @@ void handleExecuteCommand(uint8_t *buffer, int bufferCount) { sendMessage(buffer + 1, bufferCount); } -void handleExecuteOffloadCommand(uint8_t *buffer, int bufferCount) { - if (bufferCount < 1) { - sendErrorMessage(ERROR_INVALID_MESSAGE); - return; - } - - uint8_t command = buffer[0]; - - if (command == 0x01) { - handleOffloadLoadAddressCounter(buffer + 1, bufferCount - 1); - } else if (command == 0x02) { - handleOffloadWrite(buffer + 1, bufferCount - 1); - } else { - sendErrorMessage(ERROR_UNKNOWN_OFFLOAD_COMMAND); - } -} - -void handleOffloadLoadAddressCounter(uint8_t *buffer, int bufferCount) { - uint16_t response; - - if (bufferCount < 2) { - sendErrorMessage(ERROR_INVALID_MESSAGE); - return; - } - - uint8_t hi = buffer[0]; - uint8_t lo = buffer[1]; - - // TODO: error handling... - CoaxTransceiver::transmitReceive(/* LOAD_ADDRESS_COUNTER_HI */ 0x11, &hi, 1, &response, 1, 0); - CoaxTransceiver::transmitReceive(/* LOAD_ADDRESS_COUNTER_LO */ 0x51, &lo, 1, &response, 1, 0); - - // Send the response message. - uint8_t message[] = { 0x01 }; - - sendMessage(message, 1); -} - -void handleOffloadWrite(uint8_t *buffer, int bufferCount) { - uint16_t response; - - if (bufferCount < 5) { - sendErrorMessage(ERROR_INVALID_MESSAGE); - return; - } - - uint8_t addressHi = buffer[0]; - uint8_t addressLo = buffer[1]; - bool restoreOriginalAddress = buffer[2]; - uint16_t repeatCount = (buffer[3] << 8) | buffer[4]; - - uint8_t *dataBuffer = buffer + 5; - uint16_t dataBufferCount = bufferCount - 5; - - if (dataBufferCount < 1) { - sendErrorMessage(ERROR_INVALID_MESSAGE); - return; - } - - // Repeat the provided data if applicable. - if (repeatCount > 0) { - uint16_t dataBufferIndex = dataBufferCount; - - for (int repeatIndex = 0; repeatIndex < repeatCount; repeatIndex++) { - for (int index = 0; index < dataBufferCount; index++) { - dataBuffer[dataBufferIndex++] = dataBuffer[index]; - } - } - - dataBufferCount *= (repeatCount + 1); - } - - // Store original address if applicable. - uint8_t originalAddressHi; - uint8_t originalAddressLo; - - if (restoreOriginalAddress) { - CoaxTransceiver::transmitReceive(/* READ_ADDRESS_COUNTER_HI */ 0x15, NULL, 0, &response, 1, 0); - - originalAddressHi = UNPACK_DATA_WORD(response); - - CoaxTransceiver::transmitReceive(/* READ_ADDRESS_COUNTER_LO */ 0x55, NULL, 0, &response, 1, 0); - - originalAddressLo = UNPACK_DATA_WORD(response); - } - - // Move to start address if applicable. - if (!(addressHi == 0xff && addressLo == 0xff)) { - CoaxTransceiver::transmitReceive(/* LOAD_ADDRESS_COUNTER_HI */ 0x11, &addressHi, 1, &response, 1, 0); - CoaxTransceiver::transmitReceive(/* LOAD_ADDRESS_COUNTER_LO */ 0x51, &addressLo, 1, &response, 1, 0); - } - - // Write buffer. - CoaxTransceiver::transmitReceive(/* WRITE_DATA */ 0x31, dataBuffer, dataBufferCount, &response, 1, 0); - - // Restore original address if applicable. - if (restoreOriginalAddress) { - CoaxTransceiver::transmitReceive(/* LOAD_ADDRESS_COUNTER_HI */ 0x11, &originalAddressHi, 1, &response, 1, 0); - CoaxTransceiver::transmitReceive(/* LOAD_ADDRESS_COUNTER_LO */ 0x51, &originalAddressLo, 1, &response, 1, 0); - } - - // Send the response message. - uint8_t message[] = { 0x01 }; - - sendMessage(message, 1); -} - void handleMessage(uint8_t *buffer, int bufferCount) { if (bufferCount < 1) { sendErrorMessage(ERROR_INVALID_MESSAGE); @@ -179,10 +92,8 @@ void handleMessage(uint8_t *buffer, int bufferCount) { if (command == COMMAND_RESET) { handleResetCommand(buffer + 1, bufferCount - 1); - } else if (command == COMMAND_EXECUTE) { - handleExecuteCommand(buffer + 1, bufferCount - 1); - } else if (command == COMMAND_EXECUTE_OFFLOAD) { - handleExecuteOffloadCommand(buffer + 1, bufferCount - 1); + } else if (command == COMMAND_TRANSMIT_RECEIVE) { + handleTransmitReceiveCommand(buffer + 1, bufferCount - 1); } else { sendErrorMessage(ERROR_UNKNOWN_COMMAND); } @@ -199,7 +110,7 @@ enum { ESCAPE } frameState; -#define FRAME_BUFFER_SIZE (25 * 80) + 32 +#define FRAME_BUFFER_SIZE ((25 * 80) * 2) + 32 uint8_t frameBuffer[FRAME_BUFFER_SIZE]; int frameBufferCount = 0; diff --git a/pycoax/coax/__init__.py b/pycoax/coax/__init__.py index 31433d0..efc9602 100644 --- a/pycoax/coax/__init__.py +++ b/pycoax/coax/__init__.py @@ -1,6 +1,6 @@ from .__about__ import __version__ -from .interface1 import Interface1 +from .serial_interface import SerialInterface from .protocol import ( PollAction, diff --git a/pycoax/coax/interface.py b/pycoax/coax/interface.py new file mode 100644 index 0000000..77ad984 --- /dev/null +++ b/pycoax/coax/interface.py @@ -0,0 +1,19 @@ +""" +coax.interface +~~~~~~~~~~~~~~ +""" + +class Interface: + def reset(self): + raise NotImplementedError + + def transmit(self, words, repeat_count=None, repeat_offset=1): + raise NotImplementedError + + def receive(self, length=None, timeout=None): + raise NotImplementedError + + def transmit_receive(self, transmit_words, transmit_repeat_count=None, + transmit_repeat_offset=1, receive_length=None, + receive_timeout=None): + raise NotImplementedError diff --git a/pycoax/coax/interface1.py b/pycoax/coax/interface1.py deleted file mode 100644 index d3e98cf..0000000 --- a/pycoax/coax/interface1.py +++ /dev/null @@ -1,218 +0,0 @@ -""" -coax.interface1 -~~~~~~~~~~~~~~~ -""" - -from enum import Flag -import itertools -import struct -from sliplib import SlipWrapper, ProtocolError - -from .exceptions import InterfaceError, InterfaceTimeout, ReceiveError, ReceiveTimeout - -class Interface1: - """A serial attached Arduino interface using the National Semiconductor - DP8340 and DP8341. - """ - - def __init__(self, serial): - if serial is None: - raise ValueError('Serial port is required') - - self.serial = serial - - self.slip_serial = SlipSerial(self.serial) - - def reset(self): - """Reset the interface.""" - original_serial_timeout = self.serial.timeout - - self.serial.reset_input_buffer() - - self._write_message(b'\x01') - - self.serial.timeout = 5 - - try: - message = self._read_message() - finally: - self.serial.timeout = original_serial_timeout - - if message[0] != 0x01: - raise _convert_error(message) - - if len(message) != 4: - raise InterfaceError('Invalid reset response') - - (major, minor, patch) = struct.unpack('BBB', message[1:]) - - return '{}.{}.{}'.format(major, minor, patch) - - def execute(self, command_word, data=None, response_length=1, timeout=None): - """Executes a command. - - :param command_word: the command to execute - :param data: optional bytearray containing command data - :param response_length: the expected response length - :param timeout: optional timeout in seconds - """ - timeout_milliseconds = 0 - - if timeout: - if self.serial.timeout and timeout > self.serial.timeout: - raise ValueError('Timeout cannot be greater than serial timeout') - - timeout_milliseconds = int(timeout * 1000) - - message = struct.pack(">BHHH", 0x02, command_word, response_length, - timeout_milliseconds) - - if data is not None: - message += data - - self._write_message(message) - - message = self._read_message() - - if message[0] != 0x01: - raise _convert_error(message) - - response_bytes = message[1:] - - response_words = [(hi << 8) | lo for (lo, hi) in zip(response_bytes[::2], - response_bytes[1::2])] - - # Handle any receiver (DP8341) errors that are included in the response words. - error_words = [word for word in response_words if (word & 0x8000) == 0x8000] - - if error_words: - raise _convert_receiver_errors(error_words) - - return response_words - - def offload_load_address_counter(self, address): - """Executes a combined LO and HI address counter load. - - :param address: the address - """ - parameters = struct.pack(">H", address) - - self._execute_offload(0x01, parameters) - - def offload_write(self, data, address=None, restore_original_address=False, repeat=0): - """Executes a complex write operation. - - :param data: the data - :param address: optional address to load before WRITE_DATA command - :param restore_original_address: restore the original data after write - :param repeat: repeat the data - """ - parameters = struct.pack(">HBH", 0xffff if address is None else address, - 0x01 if restore_original_address else 0x00, - repeat) + data - - self._execute_offload(0x02, parameters) - - def _execute_offload(self, command, parameters=None): - """Executes an offloaded command.""" - message = struct.pack("BB", 0x03, command) - - if parameters: - message += parameters - - self._write_message(message) - - message = self._read_message() - - if message[0] != 0x01: - raise _convert_error(message) - - def _read_message(self): - try: - message = self.slip_serial.recv_msg() - except ProtocolError: - raise InterfaceError('SLIP protocol error') - - if len(message) < 4: - raise InterfaceError('Invalid response message') - - (length,) = struct.unpack(">H", message[:2]) - - if length != len(message) - 4: - raise InterfaceError('Response message length mismatch') - - if length < 1: - raise InterfaceError('Empty response message') - - return message[2:-2] - - def _write_message(self, message): - self.slip_serial.send_msg(struct.pack(">H", len(message)) + message + struct.pack(">H", 0)) - -ERROR_MAP = { - 1: InterfaceError('Invalid request message'), - 2: InterfaceError('Unknown command'), - 3: InterfaceError('Unknown offload command'), - - 101: InterfaceError('Receiver active'), - 102: ReceiveTimeout(), - 103: ReceiveError('Receiver buffer overflow') -} - -def _convert_error(message): - if message[0] != 0x02: - return InterfaceError('Invalid response') - - if len(message) < 2: - return InterfaceError('Invalid error response') - - if message[1] in ERROR_MAP: - return ERROR_MAP[message[1]] - - return InterfaceError('Unknown error') - -class ReceiverErrorCode(Flag): - """Receiver (DP8341) error code.""" - DATA_OVERFLOW = 0x01 - PARITY = 0x02 - TRANSMIT_CHECK_CONDITIONS = 0x04 - INVALID_ENDING_SEQUENCE = 0x08 - MID_BID_TRANSITION = 0x10 - STARTING_SEQUENCE = 0x20 - RECEIVER_DISABLED = 0x40 - -def _parse_receiver_error(word): - return [code for code in ReceiverErrorCode if code & ReceiverErrorCode(word & 0x7f)] - -def _convert_receiver_errors(words): - codes = set(itertools.chain.from_iterable([_parse_receiver_error(word) for word - in words])) - - message = 'Receiver ' + ', '.join([code.name for code in codes]) + ' error' - - raise ReceiveError(message) - -class SlipSerial(SlipWrapper): - """sliplib wrapper for pySerial.""" - - def send_bytes(self, packet): - """Sends a packet over the serial port.""" - self.stream.write(packet) - self.stream.flush() - - def recv_bytes(self): - """Receive data from the serial port.""" - if self.stream.closed: - return b'' - - count = self.stream.in_waiting - - if count: - return self.stream.read(count) - - byte = self.stream.read(1) - - if byte == b'': - raise InterfaceTimeout() - - return byte diff --git a/pycoax/coax/protocol.py b/pycoax/coax/protocol.py index fd6b211..f9e1317 100644 --- a/pycoax/coax/protocol.py +++ b/pycoax/coax/protocol.py @@ -365,7 +365,8 @@ def _execute_read_command(interface, command_word, response_length=1, validate_response_length=True, allow_trta_response=False, trta_value=None, unpack=True, **kwargs): """Execute a standard read command.""" - response = interface.execute(command_word, response_length=response_length, **kwargs) + response = interface.transmit_receive([command_word], receive_length=response_length, + **kwargs) if allow_trta_response and len(response) == 1 and response[0] == 0: return trta_value @@ -379,7 +380,18 @@ def _execute_read_command(interface, command_word, response_length=1, def _execute_write_command(interface, command_word, data=None, **kwargs): """Execute a standard write command.""" - response = interface.execute(command_word, data, **kwargs) + data_words = [] + transmit_repeat_count = None + + if isinstance(data, tuple): + data_words = pack_data_words(data[0]) + transmit_repeat_count = data[1] + elif data is not None: + data_words = pack_data_words(data) + + response = interface.transmit_receive([command_word, *data_words], + transmit_repeat_count, + receive_length=1, **kwargs) if len(response) != 1: command = unpack_command_word(command_word) diff --git a/pycoax/coax/serial_interface.py b/pycoax/coax/serial_interface.py new file mode 100644 index 0000000..2fa7ba0 --- /dev/null +++ b/pycoax/coax/serial_interface.py @@ -0,0 +1,191 @@ +""" +coax.serial_interface +~~~~~~~~~~~~~~~~~~~~~ +""" + +import struct +from sliplib import SlipWrapper, ProtocolError + +from .interface import Interface +from .exceptions import InterfaceError, InterfaceTimeout, ReceiveError, ReceiveTimeout + +class SerialInterface(Interface): + def __init__(self, serial): + if serial is None: + raise ValueError('Serial port is required') + + self.serial = serial + + self.slip_serial = SlipSerial(self.serial) + + def reset(self): + original_serial_timeout = self.serial.timeout + + self.serial.reset_input_buffer() + + self._write_message(bytes([0x01])) + + self.serial.timeout = 5 + + try: + message = self._read_message() + finally: + self.serial.timeout = original_serial_timeout + + if message[0] != 0x01: + raise _convert_error(message) + + if len(message) != 4: + raise InterfaceError('Invalid reset response') + + (major, minor, patch) = struct.unpack('BBB', message[1:]) + + return '{}.{}.{}'.format(major, minor, patch) + + def transmit(self, words, repeat_count=None, repeat_offset=1): + message = bytes([0x02]) + + message += _pack_transmit_header(repeat_count, repeat_offset) + message += _pack_transmit_data(words) + + self._write_message(message) + + message = self._read_message() + + if message[0] != 0x01: + raise _convert_error(message) + + def receive(self, length=None, timeout=None): + timeout_milliseconds = self._calculate_timeout_milliseconds(timeout) + + message = bytes([0x04]) + + message += _pack_receive_header(length, timeout_milliseconds) + + self._write_message(message) + + message = self._read_message() + + if message[0] != 0x01: + raise _convert_error(message) + + return _unpack_receive_data(message[1:]) + + def transmit_receive(self, transmit_words, transmit_repeat_count=None, + transmit_repeat_offset=1, receive_length=None, + receive_timeout=None): + timeout_milliseconds = self._calculate_timeout_milliseconds(receive_timeout) + + message = bytes([0x06]) + + message += _pack_transmit_header(transmit_repeat_count, transmit_repeat_offset) + message += _pack_transmit_data(transmit_words) + message += _pack_receive_header(receive_length, timeout_milliseconds) + + self._write_message(message) + + message = self._read_message() + + if message[0] != 0x01: + raise _convert_error(message) + + return _unpack_receive_data(message[1:]) + + def _calculate_timeout_milliseconds(self, timeout): + milliseconds = 0 + + if timeout: + if self.serial.timeout and timeout > self.serial.timeout: + raise ValueError('Timeout cannot be greater than serial timeout') + + milliseconds = int(timeout * 1000) + + return milliseconds + + def _read_message(self): + try: + message = self.slip_serial.recv_msg() + except ProtocolError: + raise InterfaceError('SLIP protocol error') + + if len(message) < 4: + raise InterfaceError('Invalid response message') + + (length,) = struct.unpack('>H', message[:2]) + + if length != len(message) - 4: + raise InterfaceError('Response message length mismatch') + + if length < 1: + raise InterfaceError('Empty response message') + + return message[2:-2] + + def _write_message(self, message): + self.slip_serial.send_msg(struct.pack('>H', len(message)) + message + + struct.pack('>H', 0)) + +def _pack_transmit_header(repeat_count, repeat_offset): + repeat = ((repeat_offset << 15) | repeat_count) if repeat_count else 0 + + return struct.pack('>H', repeat) + +def _pack_transmit_data(words): + bytes_ = bytearray() + + for word in words: + bytes_ += struct.pack('HH', length or 0, timeout_milliseconds) + +def _unpack_receive_data(bytes_): + return [(hi << 8) | lo for (lo, hi) in zip(bytes_[::2], bytes_[1::2])] + +ERROR_MAP = { + 1: InterfaceError('Invalid request message'), + 2: InterfaceError('Unknown command'), + + 101: InterfaceError('Receiver active'), + 102: ReceiveTimeout(), + 103: ReceiveError('Receiver buffer overflow') +} + +def _convert_error(message): + if message[0] != 0x02: + return InterfaceError('Invalid response') + + if len(message) < 2: + return InterfaceError('Invalid error response') + + if message[1] in ERROR_MAP: + return ERROR_MAP[message[1]] + + return InterfaceError('Unknown error') + +class SlipSerial(SlipWrapper): + """sliplib wrapper for pySerial.""" + + def send_bytes(self, packet): + """Sends a packet over the serial port.""" + self.stream.write(packet) + self.stream.flush() + + def recv_bytes(self): + """Receive data from the serial port.""" + if self.stream.closed: + return b'' + + count = self.stream.in_waiting + + if count: + return self.stream.read(count) + + byte = self.stream.read(1) + + if byte == b'': + raise InterfaceTimeout() + + return byte diff --git a/pycoax/examples/01_reset.py b/pycoax/examples/01_reset.py index a608bdf..45933f4 100755 --- a/pycoax/examples/01_reset.py +++ b/pycoax/examples/01_reset.py @@ -1,21 +1,9 @@ #!/usr/bin/env python -import sys -import time -from serial import Serial +from common import create_serial, create_interface -sys.path.append('..') - -from coax import Interface1, poll, poll_ack - -print('Opening serial port...') - -with Serial('/dev/ttyUSB0', 115200) as serial: - print('Sleeping to allow interface time to wake up...') - - time.sleep(3) - - interface = Interface1(serial) +with create_serial() as serial: + interface = create_interface(serial, reset=False, poll_flush=False) print('Resetting interface...') diff --git a/pycoax/examples/02_poll.py b/pycoax/examples/02_poll.py index 802ed25..c840bf4 100755 --- a/pycoax/examples/02_poll.py +++ b/pycoax/examples/02_poll.py @@ -1,31 +1,15 @@ #!/usr/bin/env python -import sys -import time -from serial import Serial +from common import create_serial, create_interface -sys.path.append('..') +from coax import poll, poll_ack -from coax import Interface1, poll, poll_ack - -print('Opening serial port...') - -with Serial('/dev/ttyUSB0', 115200) as serial: - print('Sleeping to allow interface time to wake up...') - - time.sleep(3) - - interface = Interface1(serial) - - print('Resetting interface...') - - version = interface.reset() - - print(f'Firmware version is {version}') +with create_serial() as serial: + interface = create_interface(serial, poll_flush=False) print('POLL...') - poll_response = poll(interface, timeout=5) + poll_response = poll(interface, receive_timeout=5) print(poll_response) diff --git a/pycoax/examples/03_read_terminal_id.py b/pycoax/examples/03_read_terminal_id.py index 7c1e4b4..53ed4b8 100755 --- a/pycoax/examples/03_read_terminal_id.py +++ b/pycoax/examples/03_read_terminal_id.py @@ -1,27 +1,11 @@ #!/usr/bin/env python -import sys -import time -from serial import Serial +from common import create_serial, create_interface -sys.path.append('..') +from coax import read_terminal_id -from coax import Interface1, read_terminal_id - -print('Opening serial port...') - -with Serial('/dev/ttyUSB0', 115200) as serial: - print('Sleeping to allow interface time to wake up...') - - time.sleep(3) - - interface = Interface1(serial) - - print('Resetting interface...') - - version = interface.reset() - - print(f'Firmware version is {version}') +with create_serial() as serial: + interface = create_interface(serial) print('READ_TERMINAL_ID...') diff --git a/pycoax/examples/04_hello_world.py b/pycoax/examples/04_write_data.py similarity index 50% rename from pycoax/examples/04_hello_world.py rename to pycoax/examples/04_write_data.py index 0bca84f..28ce94f 100755 --- a/pycoax/examples/04_hello_world.py +++ b/pycoax/examples/04_write_data.py @@ -1,27 +1,11 @@ #!/usr/bin/env python -import sys -import time -from serial import Serial +from common import create_serial, create_interface -sys.path.append('..') +from coax import read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data -from coax import Interface1, read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data - -print('Opening serial port...') - -with Serial('/dev/ttyUSB0', 115200) as serial: - print('Sleeping to allow interface time to wake up...') - - time.sleep(3) - - interface = Interface1(serial) - - print('Resetting interface...') - - version = interface.reset() - - print(f'Firmware version is {version}') +with create_serial() as serial: + interface = create_interface(serial) print('LOAD_ADDRESS_COUNTER_HI...') @@ -44,3 +28,7 @@ with Serial('/dev/ttyUSB0', 115200) as serial: lo = read_address_counter_lo(interface) print(f'hi = {hi:02x}, lo = {lo:02x}') + + print('WRITE_DATA (repeat twice)...') + + write_data(interface, (bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'), 2)) diff --git a/pycoax/examples/05_offload_write.py b/pycoax/examples/05_offload_write.py deleted file mode 100755 index 082d6d9..0000000 --- a/pycoax/examples/05_offload_write.py +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/env python - -import sys -import time -from serial import Serial - -sys.path.append('..') - -from coax import Interface1 - -print('Opening serial port...') - -with Serial('/dev/ttyUSB0', 115200) as serial: - print('Sleeping to allow interface time to wake up...') - - time.sleep(3) - - interface = Interface1(serial) - - print('Resetting interface...') - - version = interface.reset() - - print(f'Firmware version is {version}') - - for n in range(10): - print(f'Writing line {n + 1}...') - - address = ((7 + n) * 80) + 80 - - interface.offload_write(b'\x80', address=address, repeat=n) diff --git a/pycoax/examples/07_read_status.py b/pycoax/examples/07_read_status.py index 9c0784e..0132482 100755 --- a/pycoax/examples/07_read_status.py +++ b/pycoax/examples/07_read_status.py @@ -1,27 +1,11 @@ #!/usr/bin/env python -import sys -import time -from serial import Serial +from common import create_serial, create_interface -sys.path.append('..') +from coax import read_status -from coax import Interface1, read_status - -print('Opening serial port...') - -with Serial('/dev/ttyUSB0', 115200) as serial: - print('Sleeping to allow interface time to wake up...') - - time.sleep(3) - - interface = Interface1(serial) - - print('Resetting interface...') - - version = interface.reset() - - print(f'Firmware version is {version}') +with create_serial() as serial: + interface = create_interface(serial) print('READ_STATUS...') diff --git a/pycoax/examples/08_load_control_register.py b/pycoax/examples/08_load_control_register.py index e34f313..dd5deff 100755 --- a/pycoax/examples/08_load_control_register.py +++ b/pycoax/examples/08_load_control_register.py @@ -1,30 +1,15 @@ #!/usr/bin/env python -import sys -import time -from serial import Serial +from common import create_serial, create_interface -sys.path.append('..') +from coax import Control, poll, load_address_counter_hi, load_address_counter_lo, write_data, load_control_register -from coax import Interface1, Control, poll, load_address_counter_hi, load_address_counter_lo, write_data, load_control_register - -print('Opening serial port...') - -with Serial('/dev/ttyUSB0', 115200) as serial: - print('Sleeping to allow interface time to wake up...') - - time.sleep(3) - - interface = Interface1(serial) - - print('Resetting interface...') - - version = interface.reset() - - print(f'Firmware version is {version}') +with create_serial() as serial: + interface = create_interface(serial) load_address_counter_hi(interface, 0) load_address_counter_lo(interface, 80) + write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19')) input('Press ENTER...') diff --git a/pycoax/examples/09_read_data.py b/pycoax/examples/09_read_data.py index e77f8d5..954fa9a 100755 --- a/pycoax/examples/09_read_data.py +++ b/pycoax/examples/09_read_data.py @@ -1,30 +1,15 @@ #!/usr/bin/env python -import sys -import time -from serial import Serial +from common import create_serial, create_interface -sys.path.append('..') +from coax import read_address_counter_hi, read_address_counter_lo, read_data, load_address_counter_hi, load_address_counter_lo, write_data -from coax import Interface1, read_address_counter_hi, read_address_counter_lo, read_data, load_address_counter_hi, load_address_counter_lo, write_data - -print('Opening serial port...') - -with Serial('/dev/ttyUSB0', 115200) as serial: - print('Sleeping to allow interface time to wake up...') - - time.sleep(3) - - interface = Interface1(serial) - - print('Resetting interface...') - - version = interface.reset() - - print(f'Firmware version is {version}') +with create_serial() as serial: + interface = create_interface(serial) load_address_counter_hi(interface, 0) load_address_counter_lo(interface, 80) + write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19')) load_address_counter_hi(interface, 0) diff --git a/pycoax/examples/10_read_multiple.py b/pycoax/examples/10_read_multiple.py index 61f82f9..230661a 100755 --- a/pycoax/examples/10_read_multiple.py +++ b/pycoax/examples/10_read_multiple.py @@ -1,30 +1,15 @@ #!/usr/bin/env python -import sys -import time -from serial import Serial +from common import create_serial, create_interface -sys.path.append('..') +from coax import SecondaryControl, read_address_counter_hi, read_address_counter_lo, read_multiple, load_address_counter_hi, load_address_counter_lo, load_secondary_control, write_data -from coax import Interface1, SecondaryControl, read_address_counter_hi, read_address_counter_lo, read_multiple, load_address_counter_hi, load_address_counter_lo, load_secondary_control, write_data - -print('Opening serial port...') - -with Serial('/dev/ttyUSB0', 115200) as serial: - print('Sleeping to allow interface time to wake up...') - - time.sleep(3) - - interface = Interface1(serial) - - print('Resetting interface...') - - version = interface.reset() - - print(f'Firmware version is {version}') +with create_serial() as serial: + interface = create_interface(serial) load_address_counter_hi(interface, 0) load_address_counter_lo(interface, 80) + write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19')) load_address_counter_hi(interface, 0) diff --git a/pycoax/examples/11_reset.py b/pycoax/examples/11_reset.py index 82a5e7c..13536f7 100755 --- a/pycoax/examples/11_reset.py +++ b/pycoax/examples/11_reset.py @@ -1,27 +1,11 @@ #!/usr/bin/env python -import sys -import time -from serial import Serial +from common import create_serial, create_interface -sys.path.append('..') +from coax import reset -from coax import Interface1, reset - -print('Opening serial port...') - -with Serial('/dev/ttyUSB0', 115200) as serial: - print('Sleeping to allow interface time to wake up...') - - time.sleep(3) - - interface = Interface1(serial) - - print('Resetting interface...') - - version = interface.reset() - - print(f'Firmware version is {version}') +with create_serial() as serial: + interface = create_interface(serial) print('RESET...') diff --git a/pycoax/examples/12_search.py b/pycoax/examples/12_search.py index 4c0211b..d2828e8 100755 --- a/pycoax/examples/12_search.py +++ b/pycoax/examples/12_search.py @@ -1,30 +1,15 @@ #!/usr/bin/env python -import sys -import time -from serial import Serial +from common import create_serial, create_interface -sys.path.append('..') +from coax import read_address_counter_hi, read_address_counter_lo, read_status, load_address_counter_hi, load_address_counter_lo, write_data, load_mask, search_forward, search_backward -from coax import Interface1, read_address_counter_hi, read_address_counter_lo, read_status, load_address_counter_hi, load_address_counter_lo, write_data, load_mask, search_forward, search_backward - -print('Opening serial port...') - -with Serial('/dev/ttyACM0', 115200) as serial: - print('Sleeping to allow interface time to wake up...') - - time.sleep(3) - - interface = Interface1(serial) - - print('Resetting interface...') - - version = interface.reset() - - print(f'Firmware version is {version}') +with create_serial() as serial: + interface = create_interface(serial) load_address_counter_hi(interface, 0) load_address_counter_lo(interface, 80) + write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19')) load_address_counter_hi(interface, 0) diff --git a/pycoax/examples/13_insert_byte.py b/pycoax/examples/13_insert_byte.py index 4444b1a..ad0ec08 100755 --- a/pycoax/examples/13_insert_byte.py +++ b/pycoax/examples/13_insert_byte.py @@ -1,30 +1,15 @@ #!/usr/bin/env python -import sys -import time -from serial import Serial +from common import create_serial, create_interface -sys.path.append('..') +from coax import read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data, insert_byte -from coax import Interface1, read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data, insert_byte - -print('Opening serial port...') - -with Serial('/dev/ttyUSB0', 115200) as serial: - print('Sleeping to allow interface time to wake up...') - - time.sleep(3) - - interface = Interface1(serial) - - print('Resetting interface...') - - version = interface.reset() - - print(f'Firmware version is {version}') +with create_serial() as serial: + interface = create_interface(serial) load_address_counter_hi(interface, 0) load_address_counter_lo(interface, 80) + write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19')) input('Press ENTER...') diff --git a/pycoax/examples/14_clear.py b/pycoax/examples/14_clear.py index 9c3dcfa..74e66d4 100755 --- a/pycoax/examples/14_clear.py +++ b/pycoax/examples/14_clear.py @@ -1,30 +1,15 @@ #!/usr/bin/env python -import sys -import time -from serial import Serial +from common import create_serial, create_interface -sys.path.append('..') +from coax import read_address_counter_hi, read_address_counter_lo, read_status, load_address_counter_hi, load_address_counter_lo, write_data, load_mask, clear -from coax import Interface1, read_address_counter_hi, read_address_counter_lo, read_status, load_address_counter_hi, load_address_counter_lo, write_data, load_mask, clear - -print('Opening serial port...') - -with Serial('/dev/ttyUSB0', 115200) as serial: - print('Sleeping to allow interface time to wake up...') - - time.sleep(3) - - interface = Interface1(serial) - - print('Resetting interface...') - - version = interface.reset() - - print(f'Firmware version is {version}') +with create_serial() as serial: + interface = create_interface(serial) load_address_counter_hi(interface, 0) load_address_counter_lo(interface, 80) + write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19')) input('Press ENTER...') diff --git a/pycoax/examples/06_char_map.py b/pycoax/examples/20_char_map.py similarity index 59% rename from pycoax/examples/06_char_map.py rename to pycoax/examples/20_char_map.py index e82f0c9..83bde17 100755 --- a/pycoax/examples/06_char_map.py +++ b/pycoax/examples/20_char_map.py @@ -1,29 +1,13 @@ #!/usr/bin/env python -import sys -import time -from serial import Serial +from common import create_serial, create_interface -sys.path.append('..') - -from coax import Interface1, read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data +from coax import read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data DIGIT_MAP = [0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x80, 0x81, 0x82, 0x83, 0x84, 0x85] -print('Opening serial port...') - -with Serial('/dev/ttyUSB0', 115200) as serial: - print('Sleeping to allow interface time to wake up...') - - time.sleep(3) - - interface = Interface1(serial) - - print('Resetting interface...') - - version = interface.reset() - - print(f'Firmware version is {version}') +with create_serial() as serial: + interface = create_interface(serial) print('LOAD_ADDRESS_COUNTER_HI...') diff --git a/pycoax/examples/common.py b/pycoax/examples/common.py new file mode 100644 index 0000000..c4b6f40 --- /dev/null +++ b/pycoax/examples/common.py @@ -0,0 +1,50 @@ +import sys +import time +from serial import Serial + +sys.path.append('..') + +from coax import SerialInterface, poll, poll_ack + +SERIAL_PORT = '/dev/ttyACM0' + +def create_serial(): + port = SERIAL_PORT + + print(f'Opening {port}...') + + serial = Serial(port, 115200) + + return serial + +def create_interface(serial, reset=True, poll_flush=True): + print('Sleeping to allow interface time to wake up...') + + time.sleep(3) + + interface = SerialInterface(serial) + + if reset: + print('Resetting interface...') + + version = interface.reset() + + print(f'Firmware version is {version}') + + if poll_flush: + print('POLLing...') + + count = 0 + + poll_response = poll(interface, receive_timeout=1) + + while poll_response: + poll_ack(interface) + + count += 1 + + poll_response = poll(interface, receive_timeout=1) + + print(f'ACK\'d {count} POLL responses') + + return interface diff --git a/pycoax/tests/test_interface1.py b/pycoax/tests/test_interface1.py deleted file mode 100644 index 8d50864..0000000 --- a/pycoax/tests/test_interface1.py +++ /dev/null @@ -1,264 +0,0 @@ -import unittest -from unittest.mock import Mock -import sliplib - -import context - -from coax import Interface1, InterfaceError, ReceiveError, ReceiveTimeout - -class Interface1ResetTestCase(unittest.TestCase): - def setUp(self): - self.serial = Mock() - - self.serial.timeout = None - - self.interface = Interface1(self.serial) - - self.interface._write_message = Mock() - self.interface._read_message = Mock(return_value=bytes.fromhex('01 01 02 03')) - - def test_message_is_sent(self): - # Act - self.interface.reset() - - # Assert - self.interface._write_message.assert_called_with(bytes.fromhex('01')) - - def test_version_is_formatted_correctly(self): - self.assertEqual(self.interface.reset(), '1.2.3') - - def test_timeout_is_restored_after_reset(self): - # Arrange - self.serial.timeout = 123 - - # Act - self.interface.reset() - - # Assert - self.assertEqual(self.serial.timeout, 123) - - def test_invalid_message_length_is_handled_correctly(self): - # Arrange - self.interface._read_message = Mock(return_value=bytes.fromhex('01 01')) - - # Act and assert - with self.assertRaisesRegex(InterfaceError, 'Invalid reset response'): - self.interface.reset() - - def test_error_is_handled_correctly(self): - # Arrange - self.interface._read_message = Mock(return_value=bytes.fromhex('02 01')) - - # Act and assert - with self.assertRaisesRegex(InterfaceError, 'Invalid request message'): - self.interface.reset() - -class Interface1ExecuteTestCase(unittest.TestCase): - def setUp(self): - self.serial = Mock() - - self.serial.timeout = None - - self.interface = Interface1(self.serial) - - self.interface._write_message = Mock() - self.interface._read_message = Mock(return_value=bytes.fromhex('01 00')) - - def test_message_is_sent_without_data(self): - # Act - self.interface.execute(0b0000001001) - - # Assert - self.interface._write_message.assert_called_with(bytes.fromhex('02 00 09 00 01 00 00')) - - def test_message_is_sent_with_data(self): - # Act - self.interface.execute(0b0000110001, data=bytes.fromhex('de ad be ef')) - - # Assert - self.interface._write_message.assert_called_with(bytes.fromhex('02 00 31 00 01 00 00 de ad be ef')) - - def test_message_is_sent_with_response_length(self): - # Act - self.interface.execute(0b0000011101, response_length=4) - - # Assert - self.interface._write_message.assert_called_with(bytes.fromhex('02 00 1d 00 04 00 00')) - - def test_timeout_cannot_exceed_serial_timeout(self): - # Arrange - self.serial.timeout = 2.0 - - # Act and assert - with self.assertRaisesRegex(ValueError, 'Timeout cannot be greater than serial timeout'): - self.interface.execute(0b0000000101, timeout=3) - - def test_message_is_sent_with_timeout(self): - # Act - self.interface.execute(0b0000000101, timeout=3) - - # Assert - self.interface._write_message.assert_called_with(bytes.fromhex('02 00 05 00 01 0b b8')) - - # TODO... interface timeout... - - def test_receive_timeout_is_handled_correctly(self): - # Arrange - self.interface._read_message = Mock(return_value=bytes.fromhex('02 66')) - - # Act and assert - with self.assertRaises(ReceiveTimeout): - self.interface.execute(0b0000000101) - - def test_receive_error_is_handled_correctly(self): - # Arrange - self.interface._read_message = Mock(return_value=bytes.fromhex('02 67')) - - # Act and assert - with self.assertRaisesRegex(ReceiveError, 'Receiver buffer overflow'): - self.interface.execute(0b0000000101) - - def test_interface_error_is_handled_correctly(self): - # Arrange - self.interface._read_message = Mock(return_value=bytes.fromhex('03')) - - # Act and assert - with self.assertRaisesRegex(InterfaceError, 'Invalid response'): - self.interface.execute(0b0000000101) - - def test_response_words_are_unpacked(self): - # Arrange - self.interface._read_message = Mock(return_value=bytes.fromhex('01 01 02 03 04')) - - # Act - response_words = self.interface.execute(0b0000011101, response_length=4) - - # Assert - self.assertEqual(response_words, [0x0201, 0x0403]) - - def test_receiver_error_is_handled_correctly(self): - # Arrange - self.interface._read_message = Mock(return_value=bytes.fromhex('01 a0 80')) - - # Act and assert - with self.assertRaisesRegex(ReceiveError, 'Receiver STARTING_SEQUENCE error'): - self.interface.execute(0b0000011101, response_length=4) - -class Interface1OffloadLoadAddressCounterTestCase(unittest.TestCase): - def setUp(self): - self.serial = Mock() - - self.interface = Interface1(self.serial) - - self.interface._execute_offload = Mock() - - def test(self): - # Act - self.interface.offload_load_address_counter(960) - - # Assert - self.interface._execute_offload.assert_called_with(0x01, bytes.fromhex('03 c0')) - -class Interface1OffloadWriteTestCase(unittest.TestCase): - def setUp(self): - self.serial = Mock() - - self.interface = Interface1(self.serial) - - self.interface._execute_offload = Mock() - - def test_message_is_sent_with_data(self): - # Act - self.interface.offload_write(bytes.fromhex('de ad be ef')) - - # Assert - self.interface._execute_offload.assert_called_with(0x02, bytes.fromhex('ff ff 00 00 00 de ad be ef')) - - def test_message_is_sent_with_address(self): - # Act - self.interface.offload_write(bytes.fromhex('de ad be ef'), address=960) - - # Assert - self.interface._execute_offload.assert_called_with(0x02, bytes.fromhex('03 c0 00 00 00 de ad be ef')) - - def test_message_is_sent_with_restore_original_address(self): - # Act - self.interface.offload_write(bytes.fromhex('de ad be ef'), restore_original_address=True) - - # Assert - self.interface._execute_offload.assert_called_with(0x02, bytes.fromhex('ff ff 01 00 00 de ad be ef')) - - def test_message_is_sent_with_repeat(self): - # Act - self.interface.offload_write(bytes.fromhex('de ad be ef'), repeat=1) - - # Assert - self.interface._execute_offload.assert_called_with(0x02, bytes.fromhex('ff ff 00 00 01 de ad be ef')) - -class Interface1ReadMessageTestCase(unittest.TestCase): - def setUp(self): - self.serial = Mock() - - self.interface = Interface1(self.serial) - - self.interface.slip_serial = Mock() - - def test(self): - # Arrange - self.interface.slip_serial.recv_msg = Mock(return_value=bytes.fromhex('00 04 01 02 03 04 00 00')) - - # Act - message = self.interface._read_message() - - # Assert - self.assertEqual(message, bytes.fromhex('01 02 03 04')) - - def test_protocol_error_is_handled_correctly(self): - # Arrange - self.interface.slip_serial.recv_msg = Mock(side_effect=sliplib.ProtocolError) - - # Act and assert - with self.assertRaisesRegex(InterfaceError, 'SLIP protocol error'): - self.interface._read_message() - - def test_invalid_message_length_is_handled_correctly(self): - # Arrange - self.interface.slip_serial.recv_msg = Mock(return_value=bytes.fromhex('00')) - - # Act and assert - with self.assertRaisesRegex(InterfaceError, 'Invalid response message'): - self.interface._read_message() - - def test_message_length_mismatch_is_handled_correctly(self): - # Arrange - self.interface.slip_serial.recv_msg = Mock(return_value=bytes.fromhex('00 05 01 02 03 04 00 00')) - - # Act and assert - with self.assertRaisesRegex(InterfaceError, 'Response message length mismatch'): - self.interface._read_message() - - def test_empty_message_is_handled_correctly(self): - # Arrange - self.interface.slip_serial.recv_msg = Mock(return_value=bytes.fromhex('00 00 00 00')) - - # Act and assert - with self.assertRaisesRegex(InterfaceError, 'Empty response message'): - self.interface._read_message() - -class Interface1WriteMessageTestCase(unittest.TestCase): - def setUp(self): - self.serial = Mock() - - self.interface = Interface1(self.serial) - - self.interface.slip_serial = Mock() - - def test(self): - # Act - self.interface._write_message(bytes.fromhex('01 02 03 04')) - - # Assert - self.interface.slip_serial.send_msg.assert_called_with(bytes.fromhex('00 04 01 02 03 04 00 00')) - -if __name__ == '__main__': - unittest.main() diff --git a/pycoax/tests/test_protocol.py b/pycoax/tests/test_protocol.py index fdd0c37..ee2088a 100644 --- a/pycoax/tests/test_protocol.py +++ b/pycoax/tests/test_protocol.py @@ -151,7 +151,7 @@ class ExecuteReadCommandTestCase(unittest.TestCase): # Arrange command_word = pack_command_word(Command.READ_TERMINAL_ID) - self.interface.execute = Mock(return_value=[0b0000000010]) + self.interface.transmit_receive = Mock(return_value=[0b0000000010]) # Act and assert self.assertEqual(_execute_read_command(self.interface, command_word), bytes.fromhex('00')) @@ -160,7 +160,7 @@ class ExecuteReadCommandTestCase(unittest.TestCase): # Arrange command_word = pack_command_word(Command.POLL) - self.interface.execute = Mock(return_value=[0b0000000000]) + self.interface.transmit_receive = Mock(return_value=[0b0000000000]) # Act and assert self.assertEqual(_execute_read_command(self.interface, command_word, allow_trta_response=True, trta_value='TRTA'), 'TRTA') @@ -169,7 +169,7 @@ class ExecuteReadCommandTestCase(unittest.TestCase): # Arrange command_word = pack_command_word(Command.POLL) - self.interface.execute = Mock(return_value=[0b1111111110]) + self.interface.transmit_receive = Mock(return_value=[0b1111111110]) # Act and assert self.assertEqual(_execute_read_command(self.interface, command_word, unpack=False), [0b1111111110]) @@ -178,23 +178,23 @@ class ExecuteReadCommandTestCase(unittest.TestCase): # Arrange command_word = pack_command_word(Command.READ_TERMINAL_ID) - self.interface.execute = Mock(return_value=[]) + self.interface.transmit_receive = Mock(return_value=[]) # Act and assert with self.assertRaisesRegex(ProtocolError, 'Expected 1 word READ_TERMINAL_ID response'): _execute_read_command(self.interface, command_word) - def test_timeout_is_passed_to_interface(self): + def test_receive_timeout_is_passed_to_interface(self): # Arrange command_word = pack_command_word(Command.READ_TERMINAL_ID) - self.interface.execute = Mock(return_value=[0b0000000010]) + self.interface.transmit_receive = Mock(return_value=[0b0000000010]) # Act - _execute_read_command(self.interface, command_word, timeout=10) + _execute_read_command(self.interface, command_word, receive_timeout=10) # Assert - self.assertEqual(self.interface.execute.call_args[1].get('timeout'), 10) + self.assertEqual(self.interface.transmit_receive.call_args[1].get('receive_timeout'), 10) class ExecuteWriteCommandTestCase(unittest.TestCase): def setUp(self): @@ -204,7 +204,7 @@ class ExecuteWriteCommandTestCase(unittest.TestCase): # Arrange command_word = pack_command_word(Command.WRITE_DATA) - self.interface.execute = Mock(return_value=[0b0000000000]) + self.interface.transmit_receive = Mock(return_value=[0b0000000000]) # Act and assert _execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef')) @@ -213,7 +213,7 @@ class ExecuteWriteCommandTestCase(unittest.TestCase): # Arrange command_word = pack_command_word(Command.WRITE_DATA) - self.interface.execute = Mock(return_value=[]) + self.interface.transmit_receive = Mock(return_value=[]) # Act and assert with self.assertRaisesRegex(ProtocolError, 'Expected 1 word WRITE_DATA response'): @@ -223,23 +223,23 @@ class ExecuteWriteCommandTestCase(unittest.TestCase): # Arrange command_word = pack_command_word(Command.WRITE_DATA) - self.interface.execute = Mock(return_value=[0b0000000010]) + self.interface.transmit_receive = Mock(return_value=[0b0000000010]) # Act and assert with self.assertRaisesRegex(ProtocolError, 'Expected TR/TA response'): _execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef')) - def test_timeout_is_passed_to_interface(self): + def test_receive_timeout_is_passed_to_interface(self): # Arrange command_word = pack_command_word(Command.WRITE_DATA) - self.interface.execute = Mock(return_value=[0b0000000000]) + self.interface.transmit_receive = Mock(return_value=[0b0000000000]) # Assert - _execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef'), timeout=10) + _execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef'), receive_timeout=10) # Assert - self.assertEqual(self.interface.execute.call_args[1].get('timeout'), 10) + self.assertEqual(self.interface.transmit_receive.call_args[1].get('receive_timeout'), 10) if __name__ == '__main__': unittest.main() diff --git a/pycoax/tests/test_serial_interface.py b/pycoax/tests/test_serial_interface.py new file mode 100644 index 0000000..56ded69 --- /dev/null +++ b/pycoax/tests/test_serial_interface.py @@ -0,0 +1,124 @@ +import unittest +from unittest.mock import Mock +import sliplib + +import context + +from coax import SerialInterface, InterfaceError, ReceiveError, ReceiveTimeout + +class SerialInterfaceResetTestCase(unittest.TestCase): + def setUp(self): + self.serial = Mock() + + self.serial.timeout = None + + self.interface = SerialInterface(self.serial) + + self.interface._write_message = Mock() + self.interface._read_message = Mock(return_value=bytes.fromhex('01 01 02 03')) + + def test_message_is_sent(self): + # Act + self.interface.reset() + + # Assert + self.interface._write_message.assert_called_with(bytes.fromhex('01')) + + def test_version_is_formatted_correctly(self): + self.assertEqual(self.interface.reset(), '1.2.3') + + def test_timeout_is_restored_after_reset(self): + # Arrange + self.serial.timeout = 123 + + # Act + self.interface.reset() + + # Assert + self.assertEqual(self.serial.timeout, 123) + + def test_invalid_message_length_is_handled_correctly(self): + # Arrange + self.interface._read_message = Mock(return_value=bytes.fromhex('01 01')) + + # Act and assert + with self.assertRaisesRegex(InterfaceError, 'Invalid reset response'): + self.interface.reset() + + def test_error_is_handled_correctly(self): + # Arrange + self.interface._read_message = Mock(return_value=bytes.fromhex('02 01')) + + # Act and assert + with self.assertRaisesRegex(InterfaceError, 'Invalid request message'): + self.interface.reset() + +# TODO... + +class SerialInterfaceReadMessageTestCase(unittest.TestCase): + def setUp(self): + self.serial = Mock() + + self.interface = SerialInterface(self.serial) + + self.interface.slip_serial = Mock() + + def test(self): + # Arrange + self.interface.slip_serial.recv_msg = Mock(return_value=bytes.fromhex('00 04 01 02 03 04 00 00')) + + # Act + message = self.interface._read_message() + + # Assert + self.assertEqual(message, bytes.fromhex('01 02 03 04')) + + def test_protocol_error_is_handled_correctly(self): + # Arrange + self.interface.slip_serial.recv_msg = Mock(side_effect=sliplib.ProtocolError) + + # Act and assert + with self.assertRaisesRegex(InterfaceError, 'SLIP protocol error'): + self.interface._read_message() + + def test_invalid_message_length_is_handled_correctly(self): + # Arrange + self.interface.slip_serial.recv_msg = Mock(return_value=bytes.fromhex('00')) + + # Act and assert + with self.assertRaisesRegex(InterfaceError, 'Invalid response message'): + self.interface._read_message() + + def test_message_length_mismatch_is_handled_correctly(self): + # Arrange + self.interface.slip_serial.recv_msg = Mock(return_value=bytes.fromhex('00 05 01 02 03 04 00 00')) + + # Act and assert + with self.assertRaisesRegex(InterfaceError, 'Response message length mismatch'): + self.interface._read_message() + + def test_empty_message_is_handled_correctly(self): + # Arrange + self.interface.slip_serial.recv_msg = Mock(return_value=bytes.fromhex('00 00 00 00')) + + # Act and assert + with self.assertRaisesRegex(InterfaceError, 'Empty response message'): + self.interface._read_message() + +class SerialInterfaceWriteMessageTestCase(unittest.TestCase): + def setUp(self): + self.serial = Mock() + + self.interface = SerialInterface(self.serial) + + self.interface.slip_serial = Mock() + + def test(self): + # Act + self.interface._write_message(bytes.fromhex('01 02 03 04')) + + # Assert + self.interface.slip_serial.send_msg.assert_called_with(bytes.fromhex('00 04 01 02 03 04 00 00')) + +if __name__ == '__main__': + unittest.main()