Add open_serial_interface function

This commit is contained in:
Andrew Kay 2021-06-24 21:41:11 -05:00
parent 86c6b1f8e9
commit a855732f0c
19 changed files with 79 additions and 108 deletions

View File

@ -16,19 +16,10 @@ Assuming your interface is connected to `/dev/ttyACM0` and you have a CUT type t
```
import time
from serial import Serial
from coax import SerialInterface, poll, poll_ack, load_address_counter_hi, \
from coax import open_serial_interface, poll, poll_ack, load_address_counter_hi, \
load_address_counter_lo, write_data, ReceiveTimeout
with Serial('/dev/ttyACM0', 115200) as serial:
# Give the interface time to wake up...
time.sleep(3)
# Initialize and reset the interface.
interface = SerialInterface(serial)
interface.reset()
with open_serial_interface('/dev/ttyACM0') as interface:
# Wait for a terminal to attach...
poll_response = None
attached = False

View File

@ -1,6 +1,6 @@
from .__about__ import __version__
from .serial_interface import SerialInterface
from .serial_interface import SerialInterface, open_serial_interface
from .protocol import (
PollAction,

View File

@ -3,8 +3,12 @@ coax.serial_interface
~~~~~~~~~~~~~~~~~~~~~
"""
import time
import os
import struct
from copy import copy
from contextlib import contextmanager
from serial import Serial
from sliplib import SlipWrapper, ProtocolError
from .interface import Interface
@ -106,6 +110,24 @@ class SerialInterface(Interface):
self.slip_serial.send_msg(struct.pack('>H', len(message)) + message +
struct.pack('>H', 0))
@contextmanager
def open_serial_interface(serial_port, reset=True):
with Serial(serial_port, 115200) as serial:
serial.reset_input_buffer()
serial.reset_output_buffer()
# Allow the interface firmware time to start, this is only required for the
# original Arduino Mega based interface.
if 'COAX_FAST_START' not in os.environ:
time.sleep(3)
interface = SerialInterface(serial)
if reset:
interface.reset()
yield interface
def _pack_transmit_header(repeat_count, repeat_offset):
repeat = ((repeat_offset << 15) | repeat_count) if repeat_count else 0

View File

@ -1,10 +1,8 @@
#!/usr/bin/env python
from common import create_serial, create_interface
with create_serial() as serial:
interface = create_interface(serial, reset=False, poll_flush=False)
from common import open_example_serial_interface
with open_example_serial_interface(reset=False, poll_flush=False) as interface:
print('Resetting interface...')
interface.reset()

View File

@ -1,12 +1,10 @@
#!/usr/bin/env python
from common import create_serial, create_interface
from common import open_example_serial_interface
from coax import poll, poll_ack
with create_serial() as serial:
interface = create_interface(serial, poll_flush=False)
with open_example_serial_interface(poll_flush=False) as interface:
print('POLL...')
poll_response = poll(interface, receive_timeout=5)

View File

@ -1,12 +1,10 @@
#!/usr/bin/env python
from common import create_serial, create_interface
from common import open_example_serial_interface
from coax import read_terminal_id
with create_serial() as serial:
interface = create_interface(serial)
with open_example_serial_interface() as interface:
print('READ_TERMINAL_ID...')
terminal_id = read_terminal_id(interface)

View File

@ -1,12 +1,10 @@
#!/usr/bin/env python
from common import create_serial, create_interface
from common import open_example_serial_interface
from coax import read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data
with create_serial() as serial:
interface = create_interface(serial)
with open_example_serial_interface() as interface:
print('LOAD_ADDRESS_COUNTER_HI...')
load_address_counter_hi(interface, 0)

View File

@ -1,12 +1,10 @@
#!/usr/bin/env python
from common import create_serial, create_interface
from common import open_example_serial_interface
from coax import read_status
with create_serial() as serial:
interface = create_interface(serial)
with open_example_serial_interface() as interface:
print('READ_STATUS...')
status = read_status(interface)

View File

@ -1,12 +1,10 @@
#!/usr/bin/env python
from common import create_serial, create_interface
from common import open_example_serial_interface
from coax import Control, poll, load_address_counter_hi, load_address_counter_lo, write_data, load_control_register
with create_serial() as serial:
interface = create_interface(serial)
with open_example_serial_interface() as interface:
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)

View File

@ -1,12 +1,10 @@
#!/usr/bin/env python
from common import create_serial, create_interface
from common import open_example_serial_interface
from coax import read_address_counter_hi, read_address_counter_lo, read_data, load_address_counter_hi, load_address_counter_lo, write_data
with create_serial() as serial:
interface = create_interface(serial)
with open_example_serial_interface() as interface:
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)

View File

@ -1,12 +1,10 @@
#!/usr/bin/env python
from common import create_serial, create_interface
from common import open_example_serial_interface
from coax import SecondaryControl, read_address_counter_hi, read_address_counter_lo, read_multiple, load_address_counter_hi, load_address_counter_lo, load_secondary_control, write_data
with create_serial() as serial:
interface = create_interface(serial)
with open_example_serial_interface() as interface:
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)

View File

@ -1,12 +1,10 @@
#!/usr/bin/env python
from common import create_serial, create_interface
from common import open_example_serial_interface
from coax import reset
with create_serial() as serial:
interface = create_interface(serial)
with open_example_serial_interface() as interface:
print('RESET...')
reset(interface)

View File

@ -1,12 +1,10 @@
#!/usr/bin/env python
from common import create_serial, create_interface
from common import open_example_serial_interface
from coax import read_address_counter_hi, read_address_counter_lo, read_status, load_address_counter_hi, load_address_counter_lo, write_data, load_mask, search_forward, search_backward
with create_serial() as serial:
interface = create_interface(serial)
with open_example_serial_interface() as interface:
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)

View File

@ -1,12 +1,10 @@
#!/usr/bin/env python
from common import create_serial, create_interface
from common import open_example_serial_interface
from coax import read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data, insert_byte
with create_serial() as serial:
interface = create_interface(serial)
with open_example_serial_interface() as interface:
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)

View File

@ -1,12 +1,10 @@
#!/usr/bin/env python
from common import create_serial, create_interface
from common import open_example_serial_interface
from coax import read_address_counter_hi, read_address_counter_lo, read_status, load_address_counter_hi, load_address_counter_lo, write_data, load_mask, clear
with create_serial() as serial:
interface = create_interface(serial)
with open_example_serial_interface() as interface:
load_address_counter_hi(interface, 0)
load_address_counter_lo(interface, 80)

View File

@ -1,14 +1,12 @@
#!/usr/bin/env python
from common import create_serial, create_interface
from common import open_example_serial_interface
from coax import Control, read_address_counter_hi, read_address_counter_lo, load_address_counter_hi, load_address_counter_lo, write_data, load_control_register
DIGIT_MAP = [0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x80, 0x81, 0x82, 0x83, 0x84, 0x85]
with create_serial() as serial:
interface = create_interface(serial)
with open_example_serial_interface() as interface:
load_control_register(interface, Control(cursor_inhibit=True))
load_address_counter_hi(interface, 0)

View File

@ -1,14 +1,10 @@
#!/usr/bin/env python
import sys
from common import create_serial, create_interface
from common import open_example_serial_interface
from coax import get_features
with create_serial() as serial:
interface = create_interface(serial)
with open_example_serial_interface() as interface:
features = get_features(interface)
print(features)

View File

@ -3,16 +3,14 @@
import sys
from itertools import chain
from common import create_serial, create_interface
from common import open_example_serial_interface
from coax import Feature, get_features, load_address_counter_hi, load_address_counter_lo, write_data, eab_write_alternate, eab_load_mask
def eab_alternate_zip(regen_buffer, eab_buffer):
return bytes(chain(*zip(regen_buffer, eab_buffer)))
with create_serial() as serial:
interface = create_interface(serial)
with open_example_serial_interface() as interface:
features = get_features(interface)
if Feature.EAB not in features:

View File

@ -1,53 +1,43 @@
import sys
import time
import os
from serial import Serial
from contextlib import contextmanager
sys.path.append('..')
from coax import SerialInterface, poll, poll_ack
from coax import open_serial_interface, poll, poll_ack
DEFAULT_SERIAL_PORT = '/dev/ttyACM0'
def create_serial():
port = os.environ.get('COAX_PORT', DEFAULT_SERIAL_PORT)
@contextmanager
def open_example_serial_interface(reset=True, poll_flush=True):
serial_port = os.environ.get('COAX_PORT', DEFAULT_SERIAL_PORT)
print(f'Opening {port}...')
print(f'Opening {serial_port}...')
serial = Serial(port, 115200)
with open_serial_interface(serial_port, reset=False) as interface:
if reset:
print('Resetting interface...')
return serial
interface.reset()
def create_interface(serial, reset=True, poll_flush=True):
if 'COAX_FAST_START' not in os.environ:
print('Sleeping to allow interface time to wake up...')
if interface.legacy_firmware_detected:
print(f'Firmware version is {interface.legacy_firmware_version}')
time.sleep(3)
if poll_flush:
print('POLLing...')
interface = SerialInterface(serial)
if reset:
print('Resetting interface...')
interface.reset()
if interface.legacy_firmware_detected:
print(f'Firmware version is {interface.legacy_firmware_version}')
if poll_flush:
print('POLLing...')
count = 0
poll_response = poll(interface, receive_timeout=1)
while poll_response:
poll_ack(interface)
count += 1
count = 0
poll_response = poll(interface, receive_timeout=1)
print(f'ACK\'d {count} POLL responses')
while poll_response:
poll_ack(interface)
return interface
count += 1
poll_response = poll(interface, receive_timeout=1)
print(f'ACK\'d {count} POLL responses')
yield interface