Merge branch 'master' into i2

This commit is contained in:
Andrew Kay
2020-03-28 10:49:33 -05:00
31 changed files with 728 additions and 1423 deletions

View File

@@ -1,8 +1,8 @@
# coax-interface
# coax
> Hello mainframe!
coax-interface is a set of tools for interfacing with [IBM 3270](https://en.wikipedia.org/wiki/IBM_3270) type terminals.
Tools for interfacing with [IBM 3270](https://en.wikipedia.org/wiki/IBM_3270) type terminals.
## Contents

1
interface1/firmware/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
.pio

View File

@@ -1,305 +0,0 @@
// Copyright (c) 2019, Andrew Kay
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#include <Arduino.h>
#include "CoaxTransceiver.h"
// Arduino Mega pins...
//
// Arduino Arduino Port Label DP8340N DP8341N
// Pin and Mask Pin Pin
// ---------|--------------|-----------------|---------|---------
// 7 | | EVEN/ODD PARITY | 18 |
// 6 | PH3 0x08 | PARITY CONTROL | 19 |
// 5 | | AUTO RESPONSE | 21 |
// 4 | PG5 0x20 | REGISTERS FULL | 22 |
// 3 | PE5 0x20 | REGISTER LOAD | 23 |
// ---------.--------------.-----------------.---------.---------
// 2 | | DATA CONTROL | | 5
// 14 | PJ1 0x02 | ERROR | | 8
// 15 | PJ0 0x01 | DATA AVAILABLE | | 10
// 16 | PH1 0x02 | REGISTER READ | | 9
// 17 | PH0 0x01 | OUTPUT CONTROL | | 11
// 18* | PD3 0x08 | RECEIVER ACTIVE | | 7
// 19 | PD2 0x04 | OUTPUT ENABLE | | 13
// ---------.--------------.-----------------.---------.---------
// 22 | PA0 | D11 | 1 | 23
// 23 | PA1 | D10 | 2 | 22
// 24 | PA2 | D9 | 3 | 21
// 25 | PA3 | D8 | 4 | 20
// 26 | PA4 | D7 | 5 | 19
// 27 | PA5 | D6 | 6 | 18
// 28 | PA6 | D5 | 7 | 17
// 29 | PA7 | D4 | 8 | 16
// 36 | PC1 | D2 | 10 | 14
// 37 | PC0 | D3 | 9 | 15
//
// * - Interrupt capable pin
#define TX_EVEN_ODD_PARITY_PIN 7
#define TX_PARITY_CONTROL_PIN 6
#define TX_AUTO_RESPONSE_PIN 5
#define TX_REGISTERS_FULL_PIN 4
#define TX_REGISTER_LOAD_PIN 3
#define RX_DATA_CONTROL_PIN 2
#define RX_ERROR_PIN 14
#define RX_DATA_AVAILABLE_PIN 15
#define RX_REGISTER_READ_PIN 16
#define RX_OUTPUT_CONTROL_PIN 17
#define RX_ACTIVE_PIN 18
#define RX_OUTPUT_ENABLE_PIN 19
#define RX_STATE_DISABLED 0
#define RX_STATE_WAITING 1
#define RX_STATE_RECEIVING 2
#define RX_STATE_RECEIVED 3
static volatile uint8_t CoaxTransceiver::rxState;
static volatile uint16_t *CoaxTransceiver::rxBuffer;
static volatile size_t CoaxTransceiver::rxBufferSize;
static volatile int /* ssize_t */ CoaxTransceiver::rxBufferCount;
#define NOP __asm__("nop\n\t")
static void CoaxTransceiver::setup() {
// Configure data bus.
dataBusSetup();
// Configure receiver (DP8341N).
rxSetup();
// Configure transmitter (DP8340N).
txSetup();
}
static int /* ssize_t */ CoaxTransceiver::transmitReceive(uint16_t commandWord, uint8_t *dataBuffer, size_t dataBufferCount, uint16_t *receiveBuffer, size_t receiveBufferSize, uint16_t timeout) {
int returnValue = transmit(commandWord, dataBuffer, dataBufferCount);
if (returnValue < 0) {
return returnValue;
}
return receive(receiveBuffer, receiveBufferSize, timeout);
}
static void CoaxTransceiver::dataBusSetup() {
DDRA = B00000000;
DDRC = B00000000;
}
static void CoaxTransceiver::rxSetup() {
// Data Control - Amplifier Inputs
pinMode(RX_DATA_CONTROL_PIN, OUTPUT);
digitalWrite(RX_DATA_CONTROL_PIN, HIGH);
// Register Read
pinMode(RX_REGISTER_READ_PIN, OUTPUT);
digitalWrite(RX_REGISTER_READ_PIN, HIGH);
// Output Control - Data
pinMode(RX_OUTPUT_CONTROL_PIN, OUTPUT);
digitalWrite(RX_OUTPUT_CONTROL_PIN, HIGH);
// Output Enable - Active
pinMode(RX_OUTPUT_ENABLE_PIN, OUTPUT);
digitalWrite(RX_OUTPUT_ENABLE_PIN, HIGH);
// Receiver Active
pinMode(RX_ACTIVE_PIN, INPUT);
attachInterrupt(digitalPinToInterrupt(RX_ACTIVE_PIN), rxActiveInterrupt, RISING);
// Data Available
pinMode(RX_DATA_AVAILABLE_PIN, INPUT);
// Error
pinMode(RX_ERROR_PIN, INPUT);
}
static void CoaxTransceiver::txSetup() {
// Register Load
pinMode(TX_REGISTER_LOAD_PIN, OUTPUT);
digitalWrite(TX_REGISTER_LOAD_PIN, HIGH);
// Auto Response - Data
pinMode(TX_AUTO_RESPONSE_PIN, OUTPUT);
digitalWrite(TX_AUTO_RESPONSE_PIN, HIGH);
// Even/Odd Parity - Even
pinMode(TX_EVEN_ODD_PARITY_PIN, OUTPUT);
digitalWrite(TX_EVEN_ODD_PARITY_PIN, HIGH);
// Parity Control - Data
pinMode(TX_PARITY_CONTROL_PIN, OUTPUT);
digitalWrite(TX_PARITY_CONTROL_PIN, HIGH);
// Registers Full
pinMode(TX_REGISTERS_FULL_PIN, INPUT);
}
static int /* ssize_t */ CoaxTransceiver::transmit(uint16_t commandWord, uint8_t *dataBuffer, size_t dataCount) {
// Ensure receiver is inactive.
if (rxState != RX_STATE_DISABLED) {
return ERROR_TX_RECEIVER_ACTIVE;
}
if (/* RECEIVER ACTIVE */ (PIND & 0x8) == 0x8) {
return ERROR_TX_RECEIVER_ACTIVE;
}
// Disable interrupts.
noInterrupts();
// Disable receiver output.
PORTD &= ~0x04; // RX Output Enable - Low (Disable)
// Configure data bus for output.
DDRA = B11111111;
DDRC = B00000011;
// Send command word - we make an assumption here that TX_REGISTERS_FULL is not set.
PORTC = (PINC & 0xfc) | ((commandWord >> 8) & 0x3);
PORTA = commandWord & 0xff;
PORTE &= ~0x20; // TX Register Load - Low (Load)
PORTE |= 0x20; // TX Register Load - High
// Send data - offload parity computation to DP8340.
if (dataCount > 0) {
// Enable transmitter parity calculation.
PORTH &= ~0x08; // TX Parity Control - Low
for (int index = 0; index < dataCount; index++) {
// Wait while TX Registers Full is high.
while ( (PING & 0x20) == 0x20) {
NOP;
}
uint8_t data = dataBuffer[index];
PORTC = (PINC & 0xfc) | ((data >> 6) & 0x3);
PORTA = (data << 2);
PORTE &= ~0x20; // TX Register Load - Low (Load)
PORTE |= 0x20; // TX Register Load - High
}
// Disable transmitter parity calculation.
PORTH |= 0x08; // TX Parity Control - High
}
// Configure data bus for input.
DDRA = B00000000;
DDRC = B00000000;
// Enable receiver output.
PORTD |= 0x04; // RX Output Enable - High (Enable)
// Enable interrupts.
interrupts();
return dataCount;
}
static int /* ssize_t */ CoaxTransceiver::receive(uint16_t *buffer, size_t bufferSize, uint16_t timeout) {
rxBuffer = buffer;
rxBufferSize = bufferSize;
rxState = RX_STATE_WAITING;
if (timeout > 0) {
unsigned long startTime = millis();
while (rxState == RX_STATE_WAITING) {
// https://www.forward.com.au/pfod/ArduinoProgramming/TimingDelaysInArduino.html#unsigned
if ((millis() - startTime) > timeout) {
rxState = RX_STATE_DISABLED;
return ERROR_RX_TIMEOUT;
}
}
}
while (rxState != RX_STATE_RECEIVED) {
NOP;
}
rxState = RX_STATE_DISABLED;
return rxBufferCount;
}
static void CoaxTransceiver::rxActiveInterrupt() {
uint16_t data;
uint8_t mask;
if (rxState == RX_STATE_DISABLED) {
return;
}
rxState = RX_STATE_RECEIVING;
rxBufferCount = 0;
do {
while (/* ERROR or DATA AVAILABLE */ (PINJ & 0x03) == 0) {
NOP;
}
if (/* ERROR */ (PINJ & 0x02) == 0x02) {
mask = 0x02;
PORTH &= ~0x01; // Output Control - Low (Error)
PORTH &= ~0x02; // Register Read - Low
// Read and mark as error.
data = (((PINC & 0x3) | 0x80) << 8) | PINA;
PORTH |= 0x02; // Register Read - High
PORTH |= 0x01; // Output Control - High (Data)
} else if (/* DATA AVAILABLE */ (PINJ & 0x01) == 0x01) {
mask = 0x01;
PORTH &= ~0x02; // Register Read - Low
// Read.
data = ((PINC & 0x3) << 8) | PINA;
PORTH |= 0x02; // Register Read - High
}
if (rxBufferCount >= rxBufferSize) {
rxBufferCount = ERROR_RX_OVERFLOW;
goto EXIT;
}
rxBuffer[rxBufferCount++] = data;
while ((PINJ & mask) == mask) {
NOP;
}
} while (/* RECEIVER ACTIVE */ (PIND & 0x8) == 0x8);
EXIT:
rxState = RX_STATE_RECEIVED;
}

View File

@@ -1,304 +0,0 @@
// Copyright (c) 2019, Andrew Kay
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#include <Arduino.h>
#include "CoaxTransceiver.h"
#define COMMAND_RESET 0x01
#define COMMAND_EXECUTE 0x02
#define COMMAND_EXECUTE_OFFLOAD 0x03
#define ERROR_INVALID_MESSAGE 1
#define ERROR_UNKNOWN_COMMAND 2
#define ERROR_UNKNOWN_OFFLOAD_COMMAND 3
#define UNPACK_DATA_WORD(w) (uint8_t) ((w >> 2) & 0xff)
void handleResetCommand(uint8_t *buffer, int bufferCount) {
uint8_t response[] = { 0x01, 0x00, 0x00, 0x01 };
sendMessage(response, 4);
}
void handleExecuteCommand(uint8_t *buffer, int bufferCount) {
if (bufferCount < 6) {
sendErrorMessage(ERROR_INVALID_MESSAGE);
return;
}
uint16_t commandWord = (buffer[0] << 8) | buffer[1];
uint16_t receiveCount = (buffer[2] << 8) | buffer[3];
uint16_t timeout = (buffer[4] << 8) | buffer[5];
uint8_t *dataBuffer = buffer + 6;
uint16_t dataBufferCount = bufferCount - 6;
uint16_t *receiveBuffer = (uint16_t *) (buffer + 2);
bufferCount = CoaxTransceiver::transmitReceive(commandWord, dataBuffer, dataBufferCount, receiveBuffer, receiveCount, timeout);
if (bufferCount < 0) {
sendErrorMessage(100 + ((-1) * bufferCount));
return;
}
// Send the response message.
buffer[1] = 0x01;
bufferCount = 1 + (bufferCount * 2);
sendMessage(buffer + 1, bufferCount);
}
void handleExecuteOffloadCommand(uint8_t *buffer, int bufferCount) {
if (bufferCount < 1) {
sendErrorMessage(ERROR_INVALID_MESSAGE);
return;
}
uint8_t command = buffer[0];
if (command == 0x01) {
handleOffloadLoadAddressCounter(buffer + 1, bufferCount - 1);
} else if (command == 0x02) {
handleOffloadWrite(buffer + 1, bufferCount - 1);
} else {
sendErrorMessage(ERROR_UNKNOWN_OFFLOAD_COMMAND);
}
}
void handleOffloadLoadAddressCounter(uint8_t *buffer, int bufferCount) {
uint16_t response;
if (bufferCount < 2) {
sendErrorMessage(ERROR_INVALID_MESSAGE);
return;
}
uint8_t hi = buffer[0];
uint8_t lo = buffer[1];
// TODO: error handling...
CoaxTransceiver::transmitReceive(/* LOAD_ADDRESS_COUNTER_HI */ 0x11, &hi, 1, &response, 1, 0);
CoaxTransceiver::transmitReceive(/* LOAD_ADDRESS_COUNTER_LO */ 0x51, &lo, 1, &response, 1, 0);
// Send the response message.
uint8_t message[] = { 0x01 };
sendMessage(message, 1);
}
void handleOffloadWrite(uint8_t *buffer, int bufferCount) {
uint16_t response;
if (bufferCount < 5) {
sendErrorMessage(ERROR_INVALID_MESSAGE);
return;
}
uint8_t addressHi = buffer[0];
uint8_t addressLo = buffer[1];
bool restoreOriginalAddress = buffer[2];
uint16_t repeatCount = (buffer[3] << 8) | buffer[4];
uint8_t *dataBuffer = buffer + 5;
uint16_t dataBufferCount = bufferCount - 5;
if (dataBufferCount < 1) {
sendErrorMessage(ERROR_INVALID_MESSAGE);
return;
}
// Repeat the provided data if applicable.
if (repeatCount > 0) {
uint16_t dataBufferIndex = dataBufferCount;
for (int repeatIndex = 0; repeatIndex < repeatCount; repeatIndex++) {
for (int index = 0; index < dataBufferCount; index++) {
dataBuffer[dataBufferIndex++] = dataBuffer[index];
}
}
dataBufferCount *= (repeatCount + 1);
}
// Store original address if applicable.
uint8_t originalAddressHi;
uint8_t originalAddressLo;
if (restoreOriginalAddress) {
CoaxTransceiver::transmitReceive(/* READ_ADDRESS_COUNTER_HI */ 0x15, NULL, 0, &response, 1, 0);
originalAddressHi = UNPACK_DATA_WORD(response);
CoaxTransceiver::transmitReceive(/* READ_ADDRESS_COUNTER_LO */ 0x55, NULL, 0, &response, 1, 0);
originalAddressLo = UNPACK_DATA_WORD(response);
}
// Move to start address if applicable.
if (!(addressHi == 0xff && addressLo == 0xff)) {
CoaxTransceiver::transmitReceive(/* LOAD_ADDRESS_COUNTER_HI */ 0x11, &addressHi, 1, &response, 1, 0);
CoaxTransceiver::transmitReceive(/* LOAD_ADDRESS_COUNTER_LO */ 0x51, &addressLo, 1, &response, 1, 0);
}
// Write buffer.
CoaxTransceiver::transmitReceive(/* WRITE_DATA */ 0x31, dataBuffer, dataBufferCount, &response, 1, 0);
// Restore original address if applicable.
if (restoreOriginalAddress) {
CoaxTransceiver::transmitReceive(/* LOAD_ADDRESS_COUNTER_HI */ 0x11, &originalAddressHi, 1, &response, 1, 0);
CoaxTransceiver::transmitReceive(/* LOAD_ADDRESS_COUNTER_LO */ 0x51, &originalAddressLo, 1, &response, 1, 0);
}
// Send the response message.
uint8_t message[] = { 0x01 };
sendMessage(message, 1);
}
void handleMessage(uint8_t *buffer, int bufferCount) {
if (bufferCount < 1) {
sendErrorMessage(ERROR_INVALID_MESSAGE);
return;
}
uint8_t command = buffer[0];
if (command == COMMAND_RESET) {
handleResetCommand(buffer + 1, bufferCount - 1);
} else if (command == COMMAND_EXECUTE) {
handleExecuteCommand(buffer + 1, bufferCount - 1);
} else if (command == COMMAND_EXECUTE_OFFLOAD) {
handleExecuteOffloadCommand(buffer + 1, bufferCount - 1);
} else {
sendErrorMessage(ERROR_UNKNOWN_COMMAND);
}
}
#define FRAME_END 0xc0
#define FRAME_ESCAPE 0xdb
#define FRAME_ESCAPE_END 0xdc
#define FRAME_ESCAPE_ESCAPE 0xdd
enum {
WAIT_START,
DATA,
ESCAPE
} frameState;
#define FRAME_BUFFER_SIZE (25 * 80) + 32
uint8_t frameBuffer[FRAME_BUFFER_SIZE];
int frameBufferCount = 0;
void handleFrame(uint8_t *buffer, int bufferCount) {
if (bufferCount < 4) {
sendErrorMessage(ERROR_INVALID_MESSAGE);
return;
}
int count = (buffer[0] << 8) | buffer[1];
if (bufferCount - 4 != count) {
sendErrorMessage(ERROR_INVALID_MESSAGE);
return;
}
handleMessage(buffer + 2, count);
}
void sendMessage(uint8_t *buffer, int bufferCount) {
Serial.write((char) FRAME_END);
// Write the length.
Serial.write((char) bufferCount >> 8);
Serial.write((char) bufferCount);
for (int index = 0; index < bufferCount; index++) {
if (buffer[index] == FRAME_END) {
Serial.write((char) FRAME_ESCAPE);
Serial.write((char) FRAME_ESCAPE_END);
} else if (buffer[index] == FRAME_ESCAPE) {
Serial.write((char) FRAME_ESCAPE);
Serial.write((char) FRAME_ESCAPE_ESCAPE);
} else {
Serial.write((char) buffer[index]);
}
}
// Write the placeholder for checksum.
Serial.write((char) 0x00);
Serial.write((char) 0x00);
Serial.write((char) FRAME_END);
Serial.flush();
}
void sendErrorMessage(uint8_t code) {
uint8_t message[] = { 0x02, code };
sendMessage(message, 2);
}
void setup() {
// Configure serial port and state machine.
Serial.begin(115200);
frameState = WAIT_START;
while (Serial.available() > 0) {
Serial.read();
}
// Configure the transceiver.
CoaxTransceiver::setup();
}
void loop() {
if (Serial.available() > 0) {
uint8_t byte = Serial.read();
if (frameState == WAIT_START) {
if (byte == FRAME_END) {
frameState = DATA;
}
} else if (frameState == DATA) {
if (byte == FRAME_END) {
if (frameBufferCount > 0) {
handleFrame(frameBuffer, frameBufferCount);
}
frameBufferCount = 0;
} else if (byte == FRAME_ESCAPE) {
frameState = ESCAPE;
} else {
// TODO: overflow...
frameBuffer[frameBufferCount++] = byte;
}
} else if (frameState == ESCAPE) {
if (byte == FRAME_ESCAPE_END) {
// TODO: overflow...
frameBuffer[frameBufferCount++] = FRAME_END;
} else if (byte == FRAME_ESCAPE_ESCAPE) {
// TODO: overflow...
frameBuffer[frameBufferCount++] = FRAME_ESCAPE;
}
frameState = DATA;
}
}
}

View File

@@ -14,22 +14,21 @@
#pragma once
#include <Arduino.h>
#define ERROR_TX_RECEIVER_ACTIVE -1
#define ERROR_RX_TIMEOUT -2
#define ERROR_RX_OVERFLOW -3
#define ERROR_RX_RECEIVER -4
class CoaxTransceiver {
public:
public:
static void setup();
static int /* ssize_t */ transmitReceive(uint16_t commandWord, uint8_t *dataBuffer, size_t dataBufferCount, uint16_t *receiveBuffer, size_t receiveBufferSize, uint16_t timeout);
private:
static int /* ssize_t */ transmitReceive(uint16_t *transmitBuffer, size_t transmitBufferCount, uint16_t *receiveBuffer, size_t receiveBufferSize, uint16_t receiveTimeout);
private:
static void dataBusSetup();
static void rxSetup();
static void txSetup();
static int /* ssize_t */ transmit(uint16_t commandWord, uint8_t *dataBuffer, size_t dataCount);
static int /* ssize_t */ transmit(uint16_t *buffer, size_t bufferCount);
static int /* ssize_t */ receive(uint16_t *buffer, size_t bufferSize, uint16_t timeout);
static void rxActiveInterrupt();

View File

@@ -0,0 +1,6 @@
[env]
framework = arduino
[env:megaatmega2560]
platform = atmelavr
board = megaatmega2560

View File

@@ -0,0 +1,311 @@
// Copyright (c) 2019, Andrew Kay
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#include <Arduino.h>
#include "CoaxTransceiver.h"
// Arduino Mega pins...
//
// Arduino Arduino Port Label DP8340N DP8341N
// Pin and Mask Pin Pin
// ---------|--------------|-----------------|---------|---------
// 7 | | EVEN/ODD PARITY | 18 |
// 6 | PH3 0x08 | PARITY CONTROL | 19 |
// 5 | | AUTO RESPONSE | 21 |
// 4 | PG5 0x20 | REGISTERS FULL | 22 |
// 3 | PE5 0x20 | REGISTER LOAD | 23 |
// ---------.--------------.-----------------.---------.---------
// 2 | | DATA CONTROL | | 5
// 14 | PJ1 0x02 | ERROR | | 8
// 15 | PJ0 0x01 | DATA AVAILABLE | | 10
// 16 | PH1 0x02 | REGISTER READ | | 9
// 17 | PH0 0x01 | OUTPUT CONTROL | | 11
// 18* | PD3 0x08 | RECEIVER ACTIVE | | 7
// 19 | PD2 0x04 | OUTPUT ENABLE | | 13
// ---------.--------------.-----------------.---------.---------
// 22 | PA0 | D11 | 1 | 23
// 23 | PA1 | D10 | 2 | 22
// 24 | PA2 | D9 | 3 | 21
// 25 | PA3 | D8 | 4 | 20
// 26 | PA4 | D7 | 5 | 19
// 27 | PA5 | D6 | 6 | 18
// 28 | PA6 | D5 | 7 | 17
// 29 | PA7 | D4 | 8 | 16
// 36 | PC1 | D2 | 10 | 14
// 37 | PC0 | D3 | 9 | 15
//
// * - Interrupt capable pin
#define TX_EVEN_ODD_PARITY_PIN 7
#define TX_PARITY_CONTROL_PIN 6
#define TX_AUTO_RESPONSE_PIN 5
#define TX_REGISTERS_FULL_PIN 4
#define TX_REGISTER_LOAD_PIN 3
#define RX_DATA_CONTROL_PIN 2
#define RX_ERROR_PIN 14
#define RX_DATA_AVAILABLE_PIN 15
#define RX_REGISTER_READ_PIN 16
#define RX_OUTPUT_CONTROL_PIN 17
#define RX_ACTIVE_PIN 18
#define RX_OUTPUT_ENABLE_PIN 19
#define RX_STATE_DISABLED 0
#define RX_STATE_WAITING 1
#define RX_STATE_RECEIVING 2
#define RX_STATE_RECEIVED 3
volatile uint8_t CoaxTransceiver::rxState;
volatile uint16_t *CoaxTransceiver::rxBuffer;
volatile size_t CoaxTransceiver::rxBufferSize;
volatile int /* ssize_t */ CoaxTransceiver::rxBufferCount;
#define NOP __asm__("nop\n\t")
void CoaxTransceiver::setup()
{
// Configure data bus.
dataBusSetup();
// Configure receiver (DP8341N).
rxSetup();
// Configure transmitter (DP8340N).
txSetup();
}
int /* ssize_t */ CoaxTransceiver::transmitReceive(uint16_t *transmitBuffer, size_t transmitBufferCount, uint16_t *receiveBuffer, size_t receiveBufferSize, uint16_t receiveTimeout)
{
int returnValue = transmit(transmitBuffer, transmitBufferCount);
if (returnValue < 0) {
return returnValue;
}
return receive(receiveBuffer, receiveBufferSize, receiveTimeout);
}
void CoaxTransceiver::dataBusSetup()
{
DDRA = B00000000;
DDRC = B00000000;
}
void CoaxTransceiver::rxSetup()
{
// Data Control - Amplifier Inputs
pinMode(RX_DATA_CONTROL_PIN, OUTPUT);
digitalWrite(RX_DATA_CONTROL_PIN, HIGH);
// Register Read
pinMode(RX_REGISTER_READ_PIN, OUTPUT);
digitalWrite(RX_REGISTER_READ_PIN, HIGH);
// Output Control - Data
pinMode(RX_OUTPUT_CONTROL_PIN, OUTPUT);
digitalWrite(RX_OUTPUT_CONTROL_PIN, HIGH);
// Output Enable - Active
pinMode(RX_OUTPUT_ENABLE_PIN, OUTPUT);
digitalWrite(RX_OUTPUT_ENABLE_PIN, HIGH);
// Receiver Active
pinMode(RX_ACTIVE_PIN, INPUT);
attachInterrupt(digitalPinToInterrupt(RX_ACTIVE_PIN), rxActiveInterrupt, RISING);
// Data Available
pinMode(RX_DATA_AVAILABLE_PIN, INPUT);
// Error
pinMode(RX_ERROR_PIN, INPUT);
}
void CoaxTransceiver::txSetup()
{
// Register Load
pinMode(TX_REGISTER_LOAD_PIN, OUTPUT);
digitalWrite(TX_REGISTER_LOAD_PIN, HIGH);
// Auto Response - Data
pinMode(TX_AUTO_RESPONSE_PIN, OUTPUT);
digitalWrite(TX_AUTO_RESPONSE_PIN, HIGH);
// Even/Odd Parity - Even
pinMode(TX_EVEN_ODD_PARITY_PIN, OUTPUT);
digitalWrite(TX_EVEN_ODD_PARITY_PIN, HIGH);
// Parity Control - Data
pinMode(TX_PARITY_CONTROL_PIN, OUTPUT);
digitalWrite(TX_PARITY_CONTROL_PIN, HIGH);
// Registers Full
pinMode(TX_REGISTERS_FULL_PIN, INPUT);
}
int /* ssize_t */ CoaxTransceiver::transmit(uint16_t *buffer, size_t bufferCount)
{
// Ensure receiver is inactive.
if (rxState != RX_STATE_DISABLED) {
return ERROR_TX_RECEIVER_ACTIVE;
}
if (/* RECEIVER ACTIVE */ (PIND & 0x8) == 0x8) {
return ERROR_TX_RECEIVER_ACTIVE;
}
// Disable interrupts.
noInterrupts();
// Disable receiver output.
PORTD &= ~0x04; // RX Output Enable - Low (Disable)
// Configure data bus for output.
DDRA = B11111111;
DDRC = B00000011;
// Transmit.
for (int index = 0; index < bufferCount; index++) {
uint16_t data = buffer[index];
// Wait while TX Registers Full is high.
while ((PING & 0x20) == 0x20) {
NOP;
}
PORTC = (PINC & 0xfc) | ((data >> 8) & 0x3);
PORTA = data & 0xff;
PORTE &= ~0x20; // TX Register Load - Low (Load)
PORTE |= 0x20; // TX Register Load - High
}
// Configure data bus for input.
DDRA = B00000000;
DDRC = B00000000;
// Enable receiver output.
PORTD |= 0x04; // RX Output Enable - High (Enable)
// Enable interrupts.
interrupts();
return bufferCount;
}
int /* ssize_t */ CoaxTransceiver::receive(uint16_t *buffer, size_t bufferSize, uint16_t timeout)
{
rxBuffer = buffer;
rxBufferSize = bufferSize;
rxState = RX_STATE_WAITING;
if (timeout > 0) {
unsigned long startTime = millis();
while (rxState == RX_STATE_WAITING) {
// https://www.forward.com.au/pfod/ArduinoProgramming/TimingDelaysInArduino.html#unsigned
if ((millis() - startTime) > timeout) {
rxState = RX_STATE_DISABLED;
return ERROR_RX_TIMEOUT;
}
}
}
while (rxState != RX_STATE_RECEIVED) {
NOP;
}
uint16_t count = rxBufferCount;
rxState = RX_STATE_DISABLED;
if (count < 0) {
return count;
}
// Check for receiver errors.
for (int index = 0; index < count; index++) {
if (buffer[index] & 0x8000) {
return ERROR_RX_RECEIVER;
}
}
return count;
}
void CoaxTransceiver::rxActiveInterrupt()
{
uint16_t data;
uint8_t mask;
if (rxState == RX_STATE_DISABLED) {
return;
}
rxState = RX_STATE_RECEIVING;
rxBufferCount = 0;
do {
while (/* ERROR or DATA AVAILABLE */ (PINJ & 0x03) == 0) {
NOP;
}
if (/* ERROR */ (PINJ & 0x02) == 0x02) {
mask = 0x02;
PORTH &= ~0x01; // Output Control - Low (Error)
PORTH &= ~0x02; // Register Read - Low
// Read and mark as error.
data = (((PINC & 0x3) | 0x80) << 8) | PINA;
PORTH |= 0x02; // Register Read - High
PORTH |= 0x01; // Output Control - High (Data)
} else if (/* DATA AVAILABLE */ (PINJ & 0x01) == 0x01) {
mask = 0x01;
PORTH &= ~0x02; // Register Read - Low
// Read.
data = ((PINC & 0x3) << 8) | PINA;
PORTH |= 0x02; // Register Read - High
}
if (rxBufferCount >= rxBufferSize) {
rxBufferCount = ERROR_RX_OVERFLOW;
goto EXIT;
}
rxBuffer[rxBufferCount++] = data;
while ((PINJ & mask) == mask) {
NOP;
}
} while (/* RECEIVER ACTIVE */ (PIND & 0x8) == 0x8);
EXIT:
rxState = RX_STATE_RECEIVED;
}

View File

@@ -0,0 +1,226 @@
// Copyright (c) 2019, Andrew Kay
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#include <Arduino.h>
#include "CoaxTransceiver.h"
#define FRAME_END 0xc0
#define FRAME_ESCAPE 0xdb
#define FRAME_ESCAPE_END 0xdc
#define FRAME_ESCAPE_ESCAPE 0xdd
enum {
WAIT_START,
DATA,
ESCAPE
} frameState;
#define FRAME_BUFFER_SIZE ((25 * 80) * 2) + 32
uint8_t frameBuffer[FRAME_BUFFER_SIZE];
int frameBufferCount = 0;
#define ERROR_INVALID_MESSAGE 1
#define ERROR_UNKNOWN_COMMAND 2
void sendMessage(uint8_t *buffer, int bufferCount)
{
Serial.write((char) FRAME_END);
// Write the length.
Serial.write((char) bufferCount >> 8);
Serial.write((char) bufferCount);
for (int index = 0; index < bufferCount; index++) {
if (buffer[index] == FRAME_END) {
Serial.write((char) FRAME_ESCAPE);
Serial.write((char) FRAME_ESCAPE_END);
} else if (buffer[index] == FRAME_ESCAPE) {
Serial.write((char) FRAME_ESCAPE);
Serial.write((char) FRAME_ESCAPE_ESCAPE);
} else {
Serial.write((char) buffer[index]);
}
}
// Write the placeholder for checksum.
Serial.write((char) 0x00);
Serial.write((char) 0x00);
Serial.write((char) FRAME_END);
Serial.flush();
}
void sendErrorMessage(uint8_t code)
{
uint8_t message[] = { 0x02, code };
sendMessage(message, 2);
}
#define COMMAND_RESET 0x01
#define COMMAND_TRANSMIT 0x02
#define COMMAND_RECEIVE 0x04
#define COMMAND_TRANSMIT_RECEIVE 0x06
void handleResetCommand(uint8_t *buffer, int bufferCount)
{
uint8_t response[] = { 0x01, 0x00, 0x00, 0x01 };
sendMessage(response, 4);
}
void handleTransmitReceiveCommand(uint8_t *buffer, int bufferCount)
{
if (bufferCount < 6) {
sendErrorMessage(ERROR_INVALID_MESSAGE);
return;
}
uint16_t *transmitBuffer = (uint16_t *) (buffer + 2);
uint16_t transmitBufferCount = (bufferCount - 6) / 2;
uint16_t transmitRepeatCount = ((buffer[0] << 8) | buffer[1]) & 0x7fff;
uint16_t transmitRepeatOffset = buffer[0] >> 7;
uint16_t receiveBufferSize = (buffer[bufferCount - 4] << 8) | buffer[bufferCount - 3];
uint16_t receiveTimeout = (buffer[bufferCount - 2] << 8) | buffer[bufferCount - 1];
if (transmitBufferCount < 1) {
sendErrorMessage(ERROR_INVALID_MESSAGE);
return;
}
// Expand the provided data if applicable.
if (transmitRepeatCount > 1) {
uint8_t *source = ((uint8_t *) transmitBuffer) + (transmitRepeatOffset * 2);
uint8_t *destination = ((uint8_t *) transmitBuffer) + (transmitBufferCount * 2);
uint16_t sourceCount = transmitBufferCount - transmitRepeatOffset;
size_t length = sourceCount * 2;
for (int index = 1; index < transmitRepeatCount; index++) {
memcpy(destination, source, length);
transmitBufferCount += sourceCount;
destination += length;
}
}
uint16_t *receiveBuffer = (uint16_t *) (buffer + 2);
bufferCount = CoaxTransceiver::transmitReceive(transmitBuffer, transmitBufferCount, receiveBuffer, receiveBufferSize, receiveTimeout);
if (bufferCount < 0) {
sendErrorMessage(100 + ((-1) * bufferCount));
return;
}
// Send the response message.
buffer[1] = 0x01;
bufferCount = 1 + (bufferCount * 2);
sendMessage(buffer + 1, bufferCount);
}
void handleMessage(uint8_t *buffer, int bufferCount)
{
if (bufferCount < 1) {
sendErrorMessage(ERROR_INVALID_MESSAGE);
return;
}
uint8_t command = buffer[0];
if (command == COMMAND_RESET) {
handleResetCommand(buffer + 1, bufferCount - 1);
} else if (command == COMMAND_TRANSMIT_RECEIVE) {
handleTransmitReceiveCommand(buffer + 1, bufferCount - 1);
} else {
sendErrorMessage(ERROR_UNKNOWN_COMMAND);
}
}
void handleFrame(uint8_t *buffer, int bufferCount)
{
if (bufferCount < 4) {
sendErrorMessage(ERROR_INVALID_MESSAGE);
return;
}
int count = (buffer[0] << 8) | buffer[1];
if (bufferCount - 4 != count) {
sendErrorMessage(ERROR_INVALID_MESSAGE);
return;
}
handleMessage(buffer + 2, count);
}
void setup()
{
// Configure serial port and state machine.
Serial.begin(115200);
frameState = WAIT_START;
while (Serial.available() > 0) {
Serial.read();
}
// Configure the transceiver.
CoaxTransceiver::setup();
}
void loop()
{
if (Serial.available() > 0) {
uint8_t byte = Serial.read();
if (frameState == WAIT_START) {
if (byte == FRAME_END) {
frameState = DATA;
}
} else if (frameState == DATA) {
if (byte == FRAME_END) {
if (frameBufferCount > 0) {
handleFrame(frameBuffer, frameBufferCount);
}
frameBufferCount = 0;
} else if (byte == FRAME_ESCAPE) {
frameState = ESCAPE;
} else {
// TODO: overflow...
frameBuffer[frameBufferCount++] = byte;
}
} else if (frameState == ESCAPE) {
if (byte == FRAME_ESCAPE_END) {
// TODO: overflow...
frameBuffer[frameBufferCount++] = FRAME_END;
} else if (byte == FRAME_ESCAPE_ESCAPE) {
// TODO: overflow...
frameBuffer[frameBufferCount++] = FRAME_ESCAPE;
}
frameState = DATA;
}
}
}

View File

@@ -1 +1 @@
__version__ = '0.2.0'
__version__ = '0.3.1'

View File

@@ -1,6 +1,6 @@
from .__about__ import __version__
from .interface1 import Interface1
from .serial_interface import SerialInterface
from .serial_interface import SerialInterface

View File

@@ -1,218 +0,0 @@
"""
coax.interface1
~~~~~~~~~~~~~~~
"""
from enum import Flag
import itertools
import struct
from sliplib import SlipWrapper, ProtocolError
from .exceptions import InterfaceError, InterfaceTimeout, ReceiveError, ReceiveTimeout
class Interface1:
"""A serial attached Arduino interface using the National Semiconductor
DP8340 and DP8341.
"""
def __init__(self, serial):
if serial is None:
raise ValueError('Serial port is required')
self.serial = serial
self.slip_serial = SlipSerial(self.serial)
def reset(self):
"""Reset the interface."""
original_serial_timeout = self.serial.timeout
self.serial.reset_input_buffer()
self._write_message(b'\x01')
self.serial.timeout = 5
try:
message = self._read_message()
finally:
self.serial.timeout = original_serial_timeout
if message[0] != 0x01:
raise _convert_error(message)
if len(message) != 4:
raise InterfaceError('Invalid reset response')
(major, minor, patch) = struct.unpack('BBB', message[1:])
return '{}.{}.{}'.format(major, minor, patch)
def execute(self, command_word, data=None, response_length=1, timeout=None):
"""Executes a command.
:param command_word: the command to execute
:param data: optional bytearray containing command data
:param response_length: the expected response length
:param timeout: optional timeout in seconds
"""
timeout_milliseconds = 0
if timeout:
if self.serial.timeout and timeout > self.serial.timeout:
raise ValueError('Timeout cannot be greater than serial timeout')
timeout_milliseconds = int(timeout * 1000)
message = struct.pack(">BHHH", 0x02, command_word, response_length,
timeout_milliseconds)
if data is not None:
message += data
self._write_message(message)
message = self._read_message()
if message[0] != 0x01:
raise _convert_error(message)
response_bytes = message[1:]
response_words = [(hi << 8) | lo for (lo, hi) in zip(response_bytes[::2],
response_bytes[1::2])]
# Handle any receiver (DP8341) errors that are included in the response words.
error_words = [word for word in response_words if (word & 0x8000) == 0x8000]
if error_words:
raise _convert_receiver_errors(error_words)
return response_words
def offload_load_address_counter(self, address):
"""Executes a combined LO and HI address counter load.
:param address: the address
"""
parameters = struct.pack(">H", address)
self._execute_offload(0x01, parameters)
def offload_write(self, data, address=None, restore_original_address=False, repeat=0):
"""Executes a complex write operation.
:param data: the data
:param address: optional address to load before WRITE_DATA command
:param restore_original_address: restore the original data after write
:param repeat: repeat the data
"""
parameters = struct.pack(">HBH", 0xffff if address is None else address,
0x01 if restore_original_address else 0x00,
repeat) + data
self._execute_offload(0x02, parameters)
def _execute_offload(self, command, parameters=None):
"""Executes an offloaded command."""
message = struct.pack("BB", 0x03, command)
if parameters:
message += parameters
self._write_message(message)
message = self._read_message()
if message[0] != 0x01:
raise _convert_error(message)
def _read_message(self):
try:
message = self.slip_serial.recv_msg()
except ProtocolError:
raise InterfaceError('SLIP protocol error')
if len(message) < 4:
raise InterfaceError('Invalid response message')
(length,) = struct.unpack(">H", message[:2])
if length != len(message) - 4:
raise InterfaceError('Response message length mismatch')
if length < 1:
raise InterfaceError('Empty response message')
return message[2:-2]
def _write_message(self, message):
self.slip_serial.send_msg(struct.pack(">H", len(message)) + message + struct.pack(">H", 0))
ERROR_MAP = {
1: InterfaceError('Invalid request message'),
2: InterfaceError('Unknown command'),
3: InterfaceError('Unknown offload command'),
101: InterfaceError('Receiver active'),
102: ReceiveTimeout(),
103: ReceiveError('Receiver buffer overflow')
}
def _convert_error(message):
if message[0] != 0x02:
return InterfaceError('Invalid response')
if len(message) < 2:
return InterfaceError('Invalid error response')
if message[1] in ERROR_MAP:
return ERROR_MAP[message[1]]
return InterfaceError('Unknown error')
class ReceiverErrorCode(Flag):
"""Receiver (DP8341) error code."""
DATA_OVERFLOW = 0x01
PARITY = 0x02
TRANSMIT_CHECK_CONDITIONS = 0x04
INVALID_ENDING_SEQUENCE = 0x08
MID_BID_TRANSITION = 0x10
STARTING_SEQUENCE = 0x20
RECEIVER_DISABLED = 0x40
def _parse_receiver_error(word):
return [code for code in ReceiverErrorCode if code & ReceiverErrorCode(word & 0x7f)]
def _convert_receiver_errors(words):
codes = set(itertools.chain.from_iterable([_parse_receiver_error(word) for word
in words]))
message = 'Receiver ' + ', '.join([code.name for code in codes]) + ' error'
raise ReceiveError(message)
class SlipSerial(SlipWrapper):
"""sliplib wrapper for pySerial."""
def send_bytes(self, packet):
"""Sends a packet over the serial port."""
self.stream.write(packet)
self.stream.flush()
def recv_bytes(self):
"""Receive data from the serial port."""
if self.stream.closed:
return b''
count = self.stream.in_waiting
if count:
return self.stream.read(count)
byte = self.stream.read(1)
if byte == b'':
raise InterfaceTimeout()
return byte

View File

@@ -365,7 +365,8 @@ def _execute_read_command(interface, command_word, response_length=1,
validate_response_length=True, allow_trta_response=False,
trta_value=None, unpack=True, **kwargs):
"""Execute a standard read command."""
response = interface.execute(command_word, response_length=response_length, **kwargs)
response = interface.transmit_receive([command_word], receive_length=response_length,
**kwargs)
if allow_trta_response and len(response) == 1 and response[0] == 0:
return trta_value
@@ -379,7 +380,18 @@ def _execute_read_command(interface, command_word, response_length=1,
def _execute_write_command(interface, command_word, data=None, **kwargs):
"""Execute a standard write command."""
response = interface.execute(command_word, data, **kwargs)
data_words = []
transmit_repeat_count = None
if isinstance(data, tuple):
data_words = pack_data_words(data[0])
transmit_repeat_count = data[1]
elif data is not None:
data_words = pack_data_words(data)
response = interface.transmit_receive([command_word, *data_words],
transmit_repeat_count,
receive_length=1, **kwargs)
if len(response) != 1:
command = unpack_command_word(command_word)

View File

@@ -45,7 +45,7 @@ class SerialInterface(Interface):
def transmit(self, words, repeat_count=None, repeat_offset=1):
message = bytes([0x02])
message += _pack_transmit_header(len(words), repeat_count, repeat_offset)
message += _pack_transmit_header(repeat_count, repeat_offset)
message += _pack_transmit_data(words)
self._write_message(message)
@@ -69,7 +69,7 @@ class SerialInterface(Interface):
if message[0] != 0x01:
raise _convert_error(message)
return _unpack_receive_response(message[1:])
return _unpack_receive_data(message[1:])
def transmit_receive(self, transmit_words, transmit_repeat_count=None,
transmit_repeat_offset=1, receive_length=None,
@@ -78,8 +78,7 @@ class SerialInterface(Interface):
message = bytes([0x06])
message += _pack_transmit_header(len(transmit_words), transmit_repeat_count,
transmit_repeat_offset)
message += _pack_transmit_header(transmit_repeat_count, transmit_repeat_offset)
message += _pack_transmit_data(transmit_words)
message += _pack_receive_header(receive_length, timeout_milliseconds)
@@ -90,7 +89,7 @@ class SerialInterface(Interface):
if message[0] != 0x01:
raise _convert_error(message)
return _unpack_receive_response(message[1:])
return _unpack_receive_data(message[1:])
def _calculate_timeout_milliseconds(self, timeout):
milliseconds = 0
@@ -112,7 +111,7 @@ class SerialInterface(Interface):
if len(message) < 4:
raise InterfaceError('Invalid response message')
(length,) = struct.unpack(">H", message[:2])
(length,) = struct.unpack('>H', message[:2])
if length != len(message) - 4:
raise InterfaceError('Response message length mismatch')
@@ -123,30 +122,47 @@ class SerialInterface(Interface):
return message[2:-2]
def _write_message(self, message):
self.slip_serial.send_msg(struct.pack(">H", len(message)) + message +
struct.pack(">H", 0))
self.slip_serial.send_msg(struct.pack('>H', len(message)) + message +
struct.pack('>H', 0))
def _pack_transmit_header(length, repeat_count, repeat_offset):
def _pack_transmit_header(repeat_count, repeat_offset):
repeat = ((repeat_offset << 15) | repeat_count) if repeat_count else 0
return struct.pack(">HH", length, repeat)
return struct.pack('>H', repeat)
def _pack_transmit_data(words):
bytes_ = bytearray()
for word in words:
bytes_ += struct.pack(">H", word)
bytes_ += struct.pack('<H', word)
return bytes_
def _pack_receive_header(length, timeout_milliseconds):
return struct.pack(">HH", length or 0, timeout_milliseconds)
return struct.pack('>HH', length or 0, timeout_milliseconds)
def _unpack_receive_response(message):
pass
def _unpack_receive_data(bytes_):
return [(hi << 8) | lo for (lo, hi) in zip(bytes_[::2], bytes_[1::2])]
ERROR_MAP = {
1: InterfaceError('Invalid request message'),
2: InterfaceError('Unknown command'),
101: InterfaceError('Receiver active'),
102: ReceiveTimeout(),
103: ReceiveError('Receiver buffer overflow'),
104: ReceiveError('Receiver error')
}
def _convert_error(message):
# TODO
if message[0] != 0x02:
return InterfaceError('Invalid response')
if len(message) < 2:
return InterfaceError('Invalid error response')
if message[1] in ERROR_MAP:
return ERROR_MAP[message[1]]
return InterfaceError('Unknown error')

View File

@@ -1,21 +1,9 @@
#!/usr/bin/env python
import sys
import time
from serial import Serial
from common import create_serial, create_interface
sys.path.append('..')
from coax import Interface1, poll, poll_ack
print('Opening serial port...')
with Serial('/dev/ttyUSB0', 115200) as serial:
print('Sleeping to allow interface time to wake up...')
time.sleep(3)
interface = Interface1(serial)
with create_serial() as serial:
interface = create_interface(serial, reset=False, poll_flush=False)
print('Resetting interface...')

View File

@@ -1,31 +1,15 @@
#!/usr/bin/env python
import sys
import time
from serial import Serial
from common import create_serial, create_interface
sys.path.append('..')
from coax import poll, poll_ack
from coax import Interface1, poll, poll_ack
print('Opening serial port...')
with Serial('/dev/ttyUSB0', 115200) as serial:
print('Sleeping to allow interface time to wake up...')
time.sleep(3)
interface = Interface1(serial)
print('Resetting interface...')
version = interface.reset()
print(f'Firmware version is {version}')
with create_serial() as serial:
interface = create_interface(serial, poll_flush=False)
print('POLL...')
poll_response = poll(interface, timeout=5)
poll_response = poll(interface, receive_timeout=5)
print(poll_response)

View File

@@ -1,27 +1,11 @@
#!/usr/bin/env python
import sys
import time
from serial import Serial
from common import create_serial, create_interface
sys.path.append('..')
from coax import read_terminal_id
from coax import Interface1, read_terminal_id
print('Opening serial port...')
with Serial('/dev/ttyUSB0', 115200) as serial:
print('Sleeping to allow interface time to wake up...')
time.sleep(3)
interface = Interface1(serial)
print('Resetting interface...')
version = interface.reset()
print(f'Firmware version is {version}')
with create_serial() as serial:
interface = create_interface(serial)
print('READ_TERMINAL_ID...')

View File

@@ -1,27 +1,11 @@
#!/usr/bin/env python
import sys
import time
from serial import Serial
from common import create_serial, create_interface
sys.path.append('..')
from coax import read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data
from coax import Interface1, read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data
print('Opening serial port...')
with Serial('/dev/ttyUSB0', 115200) as serial:
print('Sleeping to allow interface time to wake up...')
time.sleep(3)
interface = Interface1(serial)
print('Resetting interface...')
version = interface.reset()
print(f'Firmware version is {version}')
with create_serial() as serial:
interface = create_interface(serial)
print('LOAD_ADDRESS_COUNTER_HI...')
@@ -44,3 +28,7 @@ with Serial('/dev/ttyUSB0', 115200) as serial:
lo = read_address_counter_lo(interface)
print(f'hi = {hi:02x}, lo = {lo:02x}')
print('WRITE_DATA (repeat twice)...')
write_data(interface, (bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'), 2))

View File

@@ -1,31 +0,0 @@
#!/usr/bin/env python
import sys
import time
from serial import Serial
sys.path.append('..')
from coax import Interface1
print('Opening serial port...')
with Serial('/dev/ttyUSB0', 115200) as serial:
print('Sleeping to allow interface time to wake up...')
time.sleep(3)
interface = Interface1(serial)
print('Resetting interface...')
version = interface.reset()
print(f'Firmware version is {version}')
for n in range(10):
print(f'Writing line {n + 1}...')
address = ((7 + n) * 80) + 80
interface.offload_write(b'\x80', address=address, repeat=n)

View File

@@ -1,27 +1,11 @@
#!/usr/bin/env python
import sys
import time
from serial import Serial
from common import create_serial, create_interface
sys.path.append('..')
from coax import read_status
from coax import Interface1, read_status
print('Opening serial port...')
with Serial('/dev/ttyUSB0', 115200) as serial:
print('Sleeping to allow interface time to wake up...')
time.sleep(3)
interface = Interface1(serial)
print('Resetting interface...')
version = interface.reset()
print(f'Firmware version is {version}')
with create_serial() as serial:
interface = create_interface(serial)
print('READ_STATUS...')

View File

@@ -1,30 +1,15 @@
#!/usr/bin/env python
import sys
import time
from serial import Serial
from common import create_serial, create_interface
sys.path.append('..')
from coax import Control, poll, load_address_counter_hi, load_address_counter_lo, write_data, load_control_register
from coax import Interface1, Control, poll, load_address_counter_hi, load_address_counter_lo, write_data, load_control_register
print('Opening serial port...')
with Serial('/dev/ttyUSB0', 115200) as serial:
print('Sleeping to allow interface time to wake up...')
time.sleep(3)
interface = Interface1(serial)
print('Resetting interface...')
version = interface.reset()
print(f'Firmware version is {version}')
with create_serial() as serial:
interface = create_interface(serial)
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)
write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'))
input('Press ENTER...')

View File

@@ -1,30 +1,15 @@
#!/usr/bin/env python
import sys
import time
from serial import Serial
from common import create_serial, create_interface
sys.path.append('..')
from coax import read_address_counter_hi, read_address_counter_lo, read_data, load_address_counter_hi, load_address_counter_lo, write_data
from coax import Interface1, read_address_counter_hi, read_address_counter_lo, read_data, load_address_counter_hi, load_address_counter_lo, write_data
print('Opening serial port...')
with Serial('/dev/ttyUSB0', 115200) as serial:
print('Sleeping to allow interface time to wake up...')
time.sleep(3)
interface = Interface1(serial)
print('Resetting interface...')
version = interface.reset()
print(f'Firmware version is {version}')
with create_serial() as serial:
interface = create_interface(serial)
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)
write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'))
load_address_counter_hi(interface, 0)

View File

@@ -1,30 +1,15 @@
#!/usr/bin/env python
import sys
import time
from serial import Serial
from common import create_serial, create_interface
sys.path.append('..')
from coax import SecondaryControl, read_address_counter_hi, read_address_counter_lo, read_multiple, load_address_counter_hi, load_address_counter_lo, load_secondary_control, write_data
from coax import Interface1, SecondaryControl, read_address_counter_hi, read_address_counter_lo, read_multiple, load_address_counter_hi, load_address_counter_lo, load_secondary_control, write_data
print('Opening serial port...')
with Serial('/dev/ttyUSB0', 115200) as serial:
print('Sleeping to allow interface time to wake up...')
time.sleep(3)
interface = Interface1(serial)
print('Resetting interface...')
version = interface.reset()
print(f'Firmware version is {version}')
with create_serial() as serial:
interface = create_interface(serial)
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)
write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'))
load_address_counter_hi(interface, 0)

View File

@@ -1,27 +1,11 @@
#!/usr/bin/env python
import sys
import time
from serial import Serial
from common import create_serial, create_interface
sys.path.append('..')
from coax import reset
from coax import Interface1, reset
print('Opening serial port...')
with Serial('/dev/ttyUSB0', 115200) as serial:
print('Sleeping to allow interface time to wake up...')
time.sleep(3)
interface = Interface1(serial)
print('Resetting interface...')
version = interface.reset()
print(f'Firmware version is {version}')
with create_serial() as serial:
interface = create_interface(serial)
print('RESET...')

View File

@@ -1,30 +1,15 @@
#!/usr/bin/env python
import sys
import time
from serial import Serial
from common import create_serial, create_interface
sys.path.append('..')
from coax import read_address_counter_hi, read_address_counter_lo, read_status, load_address_counter_hi, load_address_counter_lo, write_data, load_mask, search_forward, search_backward
from coax import Interface1, read_address_counter_hi, read_address_counter_lo, read_status, load_address_counter_hi, load_address_counter_lo, write_data, load_mask, search_forward, search_backward
print('Opening serial port...')
with Serial('/dev/ttyACM0', 115200) as serial:
print('Sleeping to allow interface time to wake up...')
time.sleep(3)
interface = Interface1(serial)
print('Resetting interface...')
version = interface.reset()
print(f'Firmware version is {version}')
with create_serial() as serial:
interface = create_interface(serial)
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)
write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'))
load_address_counter_hi(interface, 0)

View File

@@ -1,30 +1,15 @@
#!/usr/bin/env python
import sys
import time
from serial import Serial
from common import create_serial, create_interface
sys.path.append('..')
from coax import read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data, insert_byte
from coax import Interface1, read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data, insert_byte
print('Opening serial port...')
with Serial('/dev/ttyUSB0', 115200) as serial:
print('Sleeping to allow interface time to wake up...')
time.sleep(3)
interface = Interface1(serial)
print('Resetting interface...')
version = interface.reset()
print(f'Firmware version is {version}')
with create_serial() as serial:
interface = create_interface(serial)
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)
write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'))
input('Press ENTER...')

View File

@@ -1,30 +1,15 @@
#!/usr/bin/env python
import sys
import time
from serial import Serial
from common import create_serial, create_interface
sys.path.append('..')
from coax import read_address_counter_hi, read_address_counter_lo, read_status, load_address_counter_hi, load_address_counter_lo, write_data, load_mask, clear
from coax import Interface1, read_address_counter_hi, read_address_counter_lo, read_status, load_address_counter_hi, load_address_counter_lo, write_data, load_mask, clear
print('Opening serial port...')
with Serial('/dev/ttyUSB0', 115200) as serial:
print('Sleeping to allow interface time to wake up...')
time.sleep(3)
interface = Interface1(serial)
print('Resetting interface...')
version = interface.reset()
print(f'Firmware version is {version}')
with create_serial() as serial:
interface = create_interface(serial)
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)
write_data(interface, bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'))
input('Press ENTER...')

View File

@@ -1,29 +1,13 @@
#!/usr/bin/env python
import sys
import time
from serial import Serial
from common import create_serial, create_interface
sys.path.append('..')
from coax import Interface1, read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data
from coax import read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data
DIGIT_MAP = [0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x80, 0x81, 0x82, 0x83, 0x84, 0x85]
print('Opening serial port...')
with Serial('/dev/ttyUSB0', 115200) as serial:
print('Sleeping to allow interface time to wake up...')
time.sleep(3)
interface = Interface1(serial)
print('Resetting interface...')
version = interface.reset()
print(f'Firmware version is {version}')
with create_serial() as serial:
interface = create_interface(serial)
print('LOAD_ADDRESS_COUNTER_HI...')

50
pycoax/examples/common.py Normal file
View File

@@ -0,0 +1,50 @@
import sys
import time
from serial import Serial
sys.path.append('..')
from coax import SerialInterface, poll, poll_ack
SERIAL_PORT = '/dev/ttyACM0'
def create_serial():
port = SERIAL_PORT
print(f'Opening {port}...')
serial = Serial(port, 115200)
return serial
def create_interface(serial, reset=True, poll_flush=True):
print('Sleeping to allow interface time to wake up...')
time.sleep(3)
interface = SerialInterface(serial)
if reset:
print('Resetting interface...')
version = interface.reset()
print(f'Firmware version is {version}')
if poll_flush:
print('POLLing...')
count = 0
poll_response = poll(interface, receive_timeout=1)
while poll_response:
poll_ack(interface)
count += 1
poll_response = poll(interface, receive_timeout=1)
print(f'ACK\'d {count} POLL responses')
return interface

View File

@@ -10,14 +10,14 @@ LONG_DESCRIPTION = """# pycoax
Python IBM 3270 coaxial interface library.
See [GitHub](https://github.com/lowobservable/coax-interface#readme) for more information.
See [GitHub](https://github.com/lowobservable/coax#readme) for more information.
"""
setup(
name='pycoax',
version=ABOUT['__version__'],
description='IBM 3270 coaxial interface',
url='https://github.com/lowobservable/coax-interface',
url='https://github.com/lowobservable/coax',
author='Andrew Kay',
author_email='projects@ajk.me',
packages=['coax'],

View File

@@ -1,264 +0,0 @@
import unittest
from unittest.mock import Mock
import sliplib
import context
from coax import Interface1, InterfaceError, ReceiveError, ReceiveTimeout
class Interface1ResetTestCase(unittest.TestCase):
def setUp(self):
self.serial = Mock()
self.serial.timeout = None
self.interface = Interface1(self.serial)
self.interface._write_message = Mock()
self.interface._read_message = Mock(return_value=bytes.fromhex('01 01 02 03'))
def test_message_is_sent(self):
# Act
self.interface.reset()
# Assert
self.interface._write_message.assert_called_with(bytes.fromhex('01'))
def test_version_is_formatted_correctly(self):
self.assertEqual(self.interface.reset(), '1.2.3')
def test_timeout_is_restored_after_reset(self):
# Arrange
self.serial.timeout = 123
# Act
self.interface.reset()
# Assert
self.assertEqual(self.serial.timeout, 123)
def test_invalid_message_length_is_handled_correctly(self):
# Arrange
self.interface._read_message = Mock(return_value=bytes.fromhex('01 01'))
# Act and assert
with self.assertRaisesRegex(InterfaceError, 'Invalid reset response'):
self.interface.reset()
def test_error_is_handled_correctly(self):
# Arrange
self.interface._read_message = Mock(return_value=bytes.fromhex('02 01'))
# Act and assert
with self.assertRaisesRegex(InterfaceError, 'Invalid request message'):
self.interface.reset()
class Interface1ExecuteTestCase(unittest.TestCase):
def setUp(self):
self.serial = Mock()
self.serial.timeout = None
self.interface = Interface1(self.serial)
self.interface._write_message = Mock()
self.interface._read_message = Mock(return_value=bytes.fromhex('01 00'))
def test_message_is_sent_without_data(self):
# Act
self.interface.execute(0b0000001001)
# Assert
self.interface._write_message.assert_called_with(bytes.fromhex('02 00 09 00 01 00 00'))
def test_message_is_sent_with_data(self):
# Act
self.interface.execute(0b0000110001, data=bytes.fromhex('de ad be ef'))
# Assert
self.interface._write_message.assert_called_with(bytes.fromhex('02 00 31 00 01 00 00 de ad be ef'))
def test_message_is_sent_with_response_length(self):
# Act
self.interface.execute(0b0000011101, response_length=4)
# Assert
self.interface._write_message.assert_called_with(bytes.fromhex('02 00 1d 00 04 00 00'))
def test_timeout_cannot_exceed_serial_timeout(self):
# Arrange
self.serial.timeout = 2.0
# Act and assert
with self.assertRaisesRegex(ValueError, 'Timeout cannot be greater than serial timeout'):
self.interface.execute(0b0000000101, timeout=3)
def test_message_is_sent_with_timeout(self):
# Act
self.interface.execute(0b0000000101, timeout=3)
# Assert
self.interface._write_message.assert_called_with(bytes.fromhex('02 00 05 00 01 0b b8'))
# TODO... interface timeout...
def test_receive_timeout_is_handled_correctly(self):
# Arrange
self.interface._read_message = Mock(return_value=bytes.fromhex('02 66'))
# Act and assert
with self.assertRaises(ReceiveTimeout):
self.interface.execute(0b0000000101)
def test_receive_error_is_handled_correctly(self):
# Arrange
self.interface._read_message = Mock(return_value=bytes.fromhex('02 67'))
# Act and assert
with self.assertRaisesRegex(ReceiveError, 'Receiver buffer overflow'):
self.interface.execute(0b0000000101)
def test_interface_error_is_handled_correctly(self):
# Arrange
self.interface._read_message = Mock(return_value=bytes.fromhex('03'))
# Act and assert
with self.assertRaisesRegex(InterfaceError, 'Invalid response'):
self.interface.execute(0b0000000101)
def test_response_words_are_unpacked(self):
# Arrange
self.interface._read_message = Mock(return_value=bytes.fromhex('01 01 02 03 04'))
# Act
response_words = self.interface.execute(0b0000011101, response_length=4)
# Assert
self.assertEqual(response_words, [0x0201, 0x0403])
def test_receiver_error_is_handled_correctly(self):
# Arrange
self.interface._read_message = Mock(return_value=bytes.fromhex('01 a0 80'))
# Act and assert
with self.assertRaisesRegex(ReceiveError, 'Receiver STARTING_SEQUENCE error'):
self.interface.execute(0b0000011101, response_length=4)
class Interface1OffloadLoadAddressCounterTestCase(unittest.TestCase):
def setUp(self):
self.serial = Mock()
self.interface = Interface1(self.serial)
self.interface._execute_offload = Mock()
def test(self):
# Act
self.interface.offload_load_address_counter(960)
# Assert
self.interface._execute_offload.assert_called_with(0x01, bytes.fromhex('03 c0'))
class Interface1OffloadWriteTestCase(unittest.TestCase):
def setUp(self):
self.serial = Mock()
self.interface = Interface1(self.serial)
self.interface._execute_offload = Mock()
def test_message_is_sent_with_data(self):
# Act
self.interface.offload_write(bytes.fromhex('de ad be ef'))
# Assert
self.interface._execute_offload.assert_called_with(0x02, bytes.fromhex('ff ff 00 00 00 de ad be ef'))
def test_message_is_sent_with_address(self):
# Act
self.interface.offload_write(bytes.fromhex('de ad be ef'), address=960)
# Assert
self.interface._execute_offload.assert_called_with(0x02, bytes.fromhex('03 c0 00 00 00 de ad be ef'))
def test_message_is_sent_with_restore_original_address(self):
# Act
self.interface.offload_write(bytes.fromhex('de ad be ef'), restore_original_address=True)
# Assert
self.interface._execute_offload.assert_called_with(0x02, bytes.fromhex('ff ff 01 00 00 de ad be ef'))
def test_message_is_sent_with_repeat(self):
# Act
self.interface.offload_write(bytes.fromhex('de ad be ef'), repeat=1)
# Assert
self.interface._execute_offload.assert_called_with(0x02, bytes.fromhex('ff ff 00 00 01 de ad be ef'))
class Interface1ReadMessageTestCase(unittest.TestCase):
def setUp(self):
self.serial = Mock()
self.interface = Interface1(self.serial)
self.interface.slip_serial = Mock()
def test(self):
# Arrange
self.interface.slip_serial.recv_msg = Mock(return_value=bytes.fromhex('00 04 01 02 03 04 00 00'))
# Act
message = self.interface._read_message()
# Assert
self.assertEqual(message, bytes.fromhex('01 02 03 04'))
def test_protocol_error_is_handled_correctly(self):
# Arrange
self.interface.slip_serial.recv_msg = Mock(side_effect=sliplib.ProtocolError)
# Act and assert
with self.assertRaisesRegex(InterfaceError, 'SLIP protocol error'):
self.interface._read_message()
def test_invalid_message_length_is_handled_correctly(self):
# Arrange
self.interface.slip_serial.recv_msg = Mock(return_value=bytes.fromhex('00'))
# Act and assert
with self.assertRaisesRegex(InterfaceError, 'Invalid response message'):
self.interface._read_message()
def test_message_length_mismatch_is_handled_correctly(self):
# Arrange
self.interface.slip_serial.recv_msg = Mock(return_value=bytes.fromhex('00 05 01 02 03 04 00 00'))
# Act and assert
with self.assertRaisesRegex(InterfaceError, 'Response message length mismatch'):
self.interface._read_message()
def test_empty_message_is_handled_correctly(self):
# Arrange
self.interface.slip_serial.recv_msg = Mock(return_value=bytes.fromhex('00 00 00 00'))
# Act and assert
with self.assertRaisesRegex(InterfaceError, 'Empty response message'):
self.interface._read_message()
class Interface1WriteMessageTestCase(unittest.TestCase):
def setUp(self):
self.serial = Mock()
self.interface = Interface1(self.serial)
self.interface.slip_serial = Mock()
def test(self):
# Act
self.interface._write_message(bytes.fromhex('01 02 03 04'))
# Assert
self.interface.slip_serial.send_msg.assert_called_with(bytes.fromhex('00 04 01 02 03 04 00 00'))
if __name__ == '__main__':
unittest.main()

View File

@@ -151,7 +151,7 @@ class ExecuteReadCommandTestCase(unittest.TestCase):
# Arrange
command_word = pack_command_word(Command.READ_TERMINAL_ID)
self.interface.execute = Mock(return_value=[0b0000000010])
self.interface.transmit_receive = Mock(return_value=[0b0000000010])
# Act and assert
self.assertEqual(_execute_read_command(self.interface, command_word), bytes.fromhex('00'))
@@ -160,7 +160,7 @@ class ExecuteReadCommandTestCase(unittest.TestCase):
# Arrange
command_word = pack_command_word(Command.POLL)
self.interface.execute = Mock(return_value=[0b0000000000])
self.interface.transmit_receive = Mock(return_value=[0b0000000000])
# Act and assert
self.assertEqual(_execute_read_command(self.interface, command_word, allow_trta_response=True, trta_value='TRTA'), 'TRTA')
@@ -169,7 +169,7 @@ class ExecuteReadCommandTestCase(unittest.TestCase):
# Arrange
command_word = pack_command_word(Command.POLL)
self.interface.execute = Mock(return_value=[0b1111111110])
self.interface.transmit_receive = Mock(return_value=[0b1111111110])
# Act and assert
self.assertEqual(_execute_read_command(self.interface, command_word, unpack=False), [0b1111111110])
@@ -178,23 +178,23 @@ class ExecuteReadCommandTestCase(unittest.TestCase):
# Arrange
command_word = pack_command_word(Command.READ_TERMINAL_ID)
self.interface.execute = Mock(return_value=[])
self.interface.transmit_receive = Mock(return_value=[])
# Act and assert
with self.assertRaisesRegex(ProtocolError, 'Expected 1 word READ_TERMINAL_ID response'):
_execute_read_command(self.interface, command_word)
def test_timeout_is_passed_to_interface(self):
def test_receive_timeout_is_passed_to_interface(self):
# Arrange
command_word = pack_command_word(Command.READ_TERMINAL_ID)
self.interface.execute = Mock(return_value=[0b0000000010])
self.interface.transmit_receive = Mock(return_value=[0b0000000010])
# Act
_execute_read_command(self.interface, command_word, timeout=10)
_execute_read_command(self.interface, command_word, receive_timeout=10)
# Assert
self.assertEqual(self.interface.execute.call_args[1].get('timeout'), 10)
self.assertEqual(self.interface.transmit_receive.call_args[1].get('receive_timeout'), 10)
class ExecuteWriteCommandTestCase(unittest.TestCase):
def setUp(self):
@@ -204,7 +204,7 @@ class ExecuteWriteCommandTestCase(unittest.TestCase):
# Arrange
command_word = pack_command_word(Command.WRITE_DATA)
self.interface.execute = Mock(return_value=[0b0000000000])
self.interface.transmit_receive = Mock(return_value=[0b0000000000])
# Act and assert
_execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef'))
@@ -213,7 +213,7 @@ class ExecuteWriteCommandTestCase(unittest.TestCase):
# Arrange
command_word = pack_command_word(Command.WRITE_DATA)
self.interface.execute = Mock(return_value=[])
self.interface.transmit_receive = Mock(return_value=[])
# Act and assert
with self.assertRaisesRegex(ProtocolError, 'Expected 1 word WRITE_DATA response'):
@@ -223,23 +223,23 @@ class ExecuteWriteCommandTestCase(unittest.TestCase):
# Arrange
command_word = pack_command_word(Command.WRITE_DATA)
self.interface.execute = Mock(return_value=[0b0000000010])
self.interface.transmit_receive = Mock(return_value=[0b0000000010])
# Act and assert
with self.assertRaisesRegex(ProtocolError, 'Expected TR/TA response'):
_execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef'))
def test_timeout_is_passed_to_interface(self):
def test_receive_timeout_is_passed_to_interface(self):
# Arrange
command_word = pack_command_word(Command.WRITE_DATA)
self.interface.execute = Mock(return_value=[0b0000000000])
self.interface.transmit_receive = Mock(return_value=[0b0000000000])
# Assert
_execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef'), timeout=10)
_execute_write_command(self.interface, command_word, bytes.fromhex('de ad be ef'), receive_timeout=10)
# Assert
self.assertEqual(self.interface.execute.call_args[1].get('timeout'), 10)
self.assertEqual(self.interface.transmit_receive.call_args[1].get('receive_timeout'), 10)
if __name__ == '__main__':
unittest.main()