From a855732f0cff5ab350c0b98b8dde82c60ab24ffa Mon Sep 17 00:00:00 2001 From: Andrew Kay Date: Thu, 24 Jun 2021 21:41:11 -0500 Subject: [PATCH] Add open_serial_interface function --- pycoax/README.md | 13 +---- pycoax/coax/__init__.py | 2 +- pycoax/coax/serial_interface.py | 22 ++++++++ pycoax/examples/01_reset.py | 6 +-- pycoax/examples/02_poll.py | 6 +-- pycoax/examples/03_read_terminal_id.py | 6 +-- pycoax/examples/04_write_data.py | 6 +-- pycoax/examples/07_read_status.py | 6 +-- pycoax/examples/08_load_control_register.py | 6 +-- pycoax/examples/09_read_data.py | 6 +-- pycoax/examples/10_read_multiple.py | 6 +-- pycoax/examples/11_reset.py | 6 +-- pycoax/examples/12_search.py | 6 +-- pycoax/examples/13_insert_byte.py | 6 +-- pycoax/examples/14_clear.py | 6 +-- pycoax/examples/20_char_map.py | 6 +-- pycoax/examples/30_get_features.py | 8 +-- pycoax/examples/40_eab.py | 6 +-- pycoax/examples/common.py | 58 +++++++++------------ 19 files changed, 79 insertions(+), 108 deletions(-) diff --git a/pycoax/README.md b/pycoax/README.md index 3d177a2..0981c47 100644 --- a/pycoax/README.md +++ b/pycoax/README.md @@ -16,19 +16,10 @@ Assuming your interface is connected to `/dev/ttyACM0` and you have a CUT type t ``` import time -from serial import Serial -from coax import SerialInterface, poll, poll_ack, load_address_counter_hi, \ +from coax import open_serial_interface, poll, poll_ack, load_address_counter_hi, \ load_address_counter_lo, write_data, ReceiveTimeout -with Serial('/dev/ttyACM0', 115200) as serial: - # Give the interface time to wake up... - time.sleep(3) - - # Initialize and reset the interface. - interface = SerialInterface(serial) - - interface.reset() - +with open_serial_interface('/dev/ttyACM0') as interface: # Wait for a terminal to attach... poll_response = None attached = False diff --git a/pycoax/coax/__init__.py b/pycoax/coax/__init__.py index 9012455..ebe18ae 100644 --- a/pycoax/coax/__init__.py +++ b/pycoax/coax/__init__.py @@ -1,6 +1,6 @@ from .__about__ import __version__ -from .serial_interface import SerialInterface +from .serial_interface import SerialInterface, open_serial_interface from .protocol import ( PollAction, diff --git a/pycoax/coax/serial_interface.py b/pycoax/coax/serial_interface.py index f828ae8..494dd54 100644 --- a/pycoax/coax/serial_interface.py +++ b/pycoax/coax/serial_interface.py @@ -3,8 +3,12 @@ coax.serial_interface ~~~~~~~~~~~~~~~~~~~~~ """ +import time +import os import struct from copy import copy +from contextlib import contextmanager +from serial import Serial from sliplib import SlipWrapper, ProtocolError from .interface import Interface @@ -106,6 +110,24 @@ class SerialInterface(Interface): self.slip_serial.send_msg(struct.pack('>H', len(message)) + message + struct.pack('>H', 0)) +@contextmanager +def open_serial_interface(serial_port, reset=True): + with Serial(serial_port, 115200) as serial: + serial.reset_input_buffer() + serial.reset_output_buffer() + + # Allow the interface firmware time to start, this is only required for the + # original Arduino Mega based interface. + if 'COAX_FAST_START' not in os.environ: + time.sleep(3) + + interface = SerialInterface(serial) + + if reset: + interface.reset() + + yield interface + def _pack_transmit_header(repeat_count, repeat_offset): repeat = ((repeat_offset << 15) | repeat_count) if repeat_count else 0 diff --git a/pycoax/examples/01_reset.py b/pycoax/examples/01_reset.py index 0d023b1..249fbdb 100755 --- a/pycoax/examples/01_reset.py +++ b/pycoax/examples/01_reset.py @@ -1,10 +1,8 @@ #!/usr/bin/env python -from common import create_serial, create_interface - -with create_serial() as serial: - interface = create_interface(serial, reset=False, poll_flush=False) +from common import open_example_serial_interface +with open_example_serial_interface(reset=False, poll_flush=False) as interface: print('Resetting interface...') interface.reset() diff --git a/pycoax/examples/02_poll.py b/pycoax/examples/02_poll.py index c840bf4..d67406d 100755 --- a/pycoax/examples/02_poll.py +++ b/pycoax/examples/02_poll.py @@ -1,12 +1,10 @@ #!/usr/bin/env python -from common import create_serial, create_interface +from common import open_example_serial_interface from coax import poll, poll_ack -with create_serial() as serial: - interface = create_interface(serial, poll_flush=False) - +with open_example_serial_interface(poll_flush=False) as interface: print('POLL...') poll_response = poll(interface, receive_timeout=5) diff --git a/pycoax/examples/03_read_terminal_id.py b/pycoax/examples/03_read_terminal_id.py index 53ed4b8..51fd640 100755 --- a/pycoax/examples/03_read_terminal_id.py +++ b/pycoax/examples/03_read_terminal_id.py @@ -1,12 +1,10 @@ #!/usr/bin/env python -from common import create_serial, create_interface +from common import open_example_serial_interface from coax import read_terminal_id -with create_serial() as serial: - interface = create_interface(serial) - +with open_example_serial_interface() as interface: print('READ_TERMINAL_ID...') terminal_id = read_terminal_id(interface) diff --git a/pycoax/examples/04_write_data.py b/pycoax/examples/04_write_data.py index 28ce94f..39eb318 100755 --- a/pycoax/examples/04_write_data.py +++ b/pycoax/examples/04_write_data.py @@ -1,12 +1,10 @@ #!/usr/bin/env python -from common import create_serial, create_interface +from common import open_example_serial_interface from coax import read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data -with create_serial() as serial: - interface = create_interface(serial) - +with open_example_serial_interface() as interface: print('LOAD_ADDRESS_COUNTER_HI...') load_address_counter_hi(interface, 0) diff --git a/pycoax/examples/07_read_status.py b/pycoax/examples/07_read_status.py index 0132482..4ef1454 100755 --- a/pycoax/examples/07_read_status.py +++ b/pycoax/examples/07_read_status.py @@ -1,12 +1,10 @@ #!/usr/bin/env python -from common import create_serial, create_interface +from common import open_example_serial_interface from coax import read_status -with create_serial() as serial: - interface = create_interface(serial) - +with open_example_serial_interface() as interface: print('READ_STATUS...') status = read_status(interface) diff --git a/pycoax/examples/08_load_control_register.py b/pycoax/examples/08_load_control_register.py index dd5deff..3c0bd4d 100755 --- a/pycoax/examples/08_load_control_register.py +++ b/pycoax/examples/08_load_control_register.py @@ -1,12 +1,10 @@ #!/usr/bin/env python -from common import create_serial, create_interface +from common import open_example_serial_interface from coax import Control, poll, load_address_counter_hi, load_address_counter_lo, write_data, load_control_register -with create_serial() as serial: - interface = create_interface(serial) - +with open_example_serial_interface() as interface: load_address_counter_hi(interface, 0) load_address_counter_lo(interface, 80) diff --git a/pycoax/examples/09_read_data.py b/pycoax/examples/09_read_data.py index 954fa9a..9f14aa5 100755 --- a/pycoax/examples/09_read_data.py +++ b/pycoax/examples/09_read_data.py @@ -1,12 +1,10 @@ #!/usr/bin/env python -from common import create_serial, create_interface +from common import open_example_serial_interface from coax import read_address_counter_hi, read_address_counter_lo, read_data, load_address_counter_hi, load_address_counter_lo, write_data -with create_serial() as serial: - interface = create_interface(serial) - +with open_example_serial_interface() as interface: load_address_counter_hi(interface, 0) load_address_counter_lo(interface, 80) diff --git a/pycoax/examples/10_read_multiple.py b/pycoax/examples/10_read_multiple.py index 230661a..9508ce8 100755 --- a/pycoax/examples/10_read_multiple.py +++ b/pycoax/examples/10_read_multiple.py @@ -1,12 +1,10 @@ #!/usr/bin/env python -from common import create_serial, create_interface +from common import open_example_serial_interface from coax import SecondaryControl, read_address_counter_hi, read_address_counter_lo, read_multiple, load_address_counter_hi, load_address_counter_lo, load_secondary_control, write_data -with create_serial() as serial: - interface = create_interface(serial) - +with open_example_serial_interface() as interface: load_address_counter_hi(interface, 0) load_address_counter_lo(interface, 80) diff --git a/pycoax/examples/11_reset.py b/pycoax/examples/11_reset.py index 13536f7..64120c2 100755 --- a/pycoax/examples/11_reset.py +++ b/pycoax/examples/11_reset.py @@ -1,12 +1,10 @@ #!/usr/bin/env python -from common import create_serial, create_interface +from common import open_example_serial_interface from coax import reset -with create_serial() as serial: - interface = create_interface(serial) - +with open_example_serial_interface() as interface: print('RESET...') reset(interface) diff --git a/pycoax/examples/12_search.py b/pycoax/examples/12_search.py index d2828e8..c11c0a5 100755 --- a/pycoax/examples/12_search.py +++ b/pycoax/examples/12_search.py @@ -1,12 +1,10 @@ #!/usr/bin/env python -from common import create_serial, create_interface +from common import open_example_serial_interface from coax import read_address_counter_hi, read_address_counter_lo, read_status, load_address_counter_hi, load_address_counter_lo, write_data, load_mask, search_forward, search_backward -with create_serial() as serial: - interface = create_interface(serial) - +with open_example_serial_interface() as interface: load_address_counter_hi(interface, 0) load_address_counter_lo(interface, 80) diff --git a/pycoax/examples/13_insert_byte.py b/pycoax/examples/13_insert_byte.py index ad0ec08..1d3441a 100755 --- a/pycoax/examples/13_insert_byte.py +++ b/pycoax/examples/13_insert_byte.py @@ -1,12 +1,10 @@ #!/usr/bin/env python -from common import create_serial, create_interface +from common import open_example_serial_interface from coax import read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data, insert_byte -with create_serial() as serial: - interface = create_interface(serial) - +with open_example_serial_interface() as interface: load_address_counter_hi(interface, 0) load_address_counter_lo(interface, 80) diff --git a/pycoax/examples/14_clear.py b/pycoax/examples/14_clear.py index 74e66d4..ed32fc4 100755 --- a/pycoax/examples/14_clear.py +++ b/pycoax/examples/14_clear.py @@ -1,12 +1,10 @@ #!/usr/bin/env python -from common import create_serial, create_interface +from common import open_example_serial_interface from coax import read_address_counter_hi, read_address_counter_lo, read_status, load_address_counter_hi, load_address_counter_lo, write_data, load_mask, clear -with create_serial() as serial: - interface = create_interface(serial) - +with open_example_serial_interface() as interface: load_address_counter_hi(interface, 0) load_address_counter_lo(interface, 80) diff --git a/pycoax/examples/20_char_map.py b/pycoax/examples/20_char_map.py index cf435ca..691ad8c 100755 --- a/pycoax/examples/20_char_map.py +++ b/pycoax/examples/20_char_map.py @@ -1,14 +1,12 @@ #!/usr/bin/env python -from common import create_serial, create_interface +from common import open_example_serial_interface from coax import Control, read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data, load_control_register DIGIT_MAP = [0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x80, 0x81, 0x82, 0x83, 0x84, 0x85] -with create_serial() as serial: - interface = create_interface(serial) - +with open_example_serial_interface() as interface: load_control_register(interface, Control(cursor_inhibit=True)) load_address_counter_hi(interface, 0) diff --git a/pycoax/examples/30_get_features.py b/pycoax/examples/30_get_features.py index fac1555..0f0ddc4 100755 --- a/pycoax/examples/30_get_features.py +++ b/pycoax/examples/30_get_features.py @@ -1,14 +1,10 @@ #!/usr/bin/env python -import sys - -from common import create_serial, create_interface +from common import open_example_serial_interface from coax import get_features -with create_serial() as serial: - interface = create_interface(serial) - +with open_example_serial_interface() as interface: features = get_features(interface) print(features) diff --git a/pycoax/examples/40_eab.py b/pycoax/examples/40_eab.py index d0ee785..ea40f9d 100755 --- a/pycoax/examples/40_eab.py +++ b/pycoax/examples/40_eab.py @@ -3,16 +3,14 @@ import sys from itertools import chain -from common import create_serial, create_interface +from common import open_example_serial_interface from coax import Feature, get_features, load_address_counter_hi, load_address_counter_lo, write_data, eab_write_alternate, eab_load_mask def eab_alternate_zip(regen_buffer, eab_buffer): return bytes(chain(*zip(regen_buffer, eab_buffer))) -with create_serial() as serial: - interface = create_interface(serial) - +with open_example_serial_interface() as interface: features = get_features(interface) if Feature.EAB not in features: diff --git a/pycoax/examples/common.py b/pycoax/examples/common.py index b516272..e6332d3 100644 --- a/pycoax/examples/common.py +++ b/pycoax/examples/common.py @@ -1,53 +1,43 @@ import sys import time import os -from serial import Serial +from contextlib import contextmanager sys.path.append('..') -from coax import SerialInterface, poll, poll_ack +from coax import open_serial_interface, poll, poll_ack DEFAULT_SERIAL_PORT = '/dev/ttyACM0' -def create_serial(): - port = os.environ.get('COAX_PORT', DEFAULT_SERIAL_PORT) +@contextmanager +def open_example_serial_interface(reset=True, poll_flush=True): + serial_port = os.environ.get('COAX_PORT', DEFAULT_SERIAL_PORT) - print(f'Opening {port}...') + print(f'Opening {serial_port}...') - serial = Serial(port, 115200) + with open_serial_interface(serial_port, reset=False) as interface: + if reset: + print('Resetting interface...') - return serial + interface.reset() -def create_interface(serial, reset=True, poll_flush=True): - if 'COAX_FAST_START' not in os.environ: - print('Sleeping to allow interface time to wake up...') + if interface.legacy_firmware_detected: + print(f'Firmware version is {interface.legacy_firmware_version}') - time.sleep(3) + if poll_flush: + print('POLLing...') - interface = SerialInterface(serial) - - if reset: - print('Resetting interface...') - - interface.reset() - - if interface.legacy_firmware_detected: - print(f'Firmware version is {interface.legacy_firmware_version}') - - if poll_flush: - print('POLLing...') - - count = 0 - - poll_response = poll(interface, receive_timeout=1) - - while poll_response: - poll_ack(interface) - - count += 1 + count = 0 poll_response = poll(interface, receive_timeout=1) - print(f'ACK\'d {count} POLL responses') + while poll_response: + poll_ack(interface) - return interface + count += 1 + + poll_response = poll(interface, receive_timeout=1) + + print(f'ACK\'d {count} POLL responses') + + yield interface