mirror of
https://github.com/lowobservable/oec.git
synced 2026-01-11 23:53:04 +00:00
Add TN3270 character encoding option, changed default to IBM037
This commit is contained in:
parent
f12625732c
commit
8c14b100c2
@ -1,5 +1,6 @@
|
||||
import os
|
||||
import signal
|
||||
import codecs
|
||||
import logging
|
||||
import argparse
|
||||
from coax import open_serial_interface, TerminalType
|
||||
@ -38,6 +39,14 @@ def _get_keymap(terminal_id, extended_id):
|
||||
|
||||
return keymap
|
||||
|
||||
def _get_character_encoding(encoding):
|
||||
try:
|
||||
codecs.lookup(encoding)
|
||||
except LookupError:
|
||||
raise argparse.ArgumentTypeError(f'invalid encoding: {encoding}')
|
||||
|
||||
return encoding
|
||||
|
||||
def _create_device(args, interface, device_address, poll_response):
|
||||
# Read the terminal identifiers.
|
||||
(terminal_id, extended_id) = get_ids(interface, device_address)
|
||||
@ -64,7 +73,7 @@ def _create_device(args, interface, device_address, poll_response):
|
||||
|
||||
def _create_session(args, device):
|
||||
if args.emulator == 'tn3270':
|
||||
return TN3270Session(device, args.host, args.port)
|
||||
return TN3270Session(device, args.host, args.port, args.character_encoding)
|
||||
|
||||
if args.emulator == 'vt100' and IS_VT100_AVAILABLE:
|
||||
host_command = [args.command, *args.command_args]
|
||||
@ -102,6 +111,9 @@ def main():
|
||||
tn3270_parser.add_argument('host', help='Hostname')
|
||||
tn3270_parser.add_argument('port', nargs='?', default=23, type=int)
|
||||
|
||||
tn3270_parser.add_argument('--codepage', metavar='encoding', default='ibm037',
|
||||
dest='character_encoding', type=_get_character_encoding)
|
||||
|
||||
if IS_VT100_AVAILABLE:
|
||||
vt100_parser = subparsers.add_parser('vt100', description='VT100 emulator',
|
||||
help='VT100 emulator')
|
||||
|
||||
@ -317,7 +317,7 @@ class BufferedDisplay(Display):
|
||||
# TODO: Implement multiple ranges with optimization.
|
||||
return [(self.dirty[0], self.dirty[-1])]
|
||||
|
||||
_ASCII_CHAR_MAP = {
|
||||
CHAR_MAP = {
|
||||
'>': 0x08,
|
||||
'<': 0x09,
|
||||
'[': 0x0a,
|
||||
@ -447,27 +447,10 @@ _ASCII_CHAR_MAP = {
|
||||
'*': 0xbf
|
||||
}
|
||||
|
||||
_EBCDIC_CHAR_MAP = {ascii_character.encode('cp500')[0]: byte for ascii_character, byte in _ASCII_CHAR_MAP.items()}
|
||||
def encode_character(character):
|
||||
"""Map a character to a terminal display character."""
|
||||
return CHAR_MAP.get(character, 0x00)
|
||||
|
||||
ASCII_CHAR_MAP = [_ASCII_CHAR_MAP.get(character, 0x00) for character in map(chr, range(256))]
|
||||
|
||||
EBCDIC_CHAR_MAP = [_EBCDIC_CHAR_MAP.get(character, 0x00) for character in range(256)]
|
||||
|
||||
def encode_ascii_character(character):
|
||||
"""Map an ASCII character to a terminal display character."""
|
||||
if character > 255:
|
||||
return 0x00
|
||||
|
||||
return ASCII_CHAR_MAP[character]
|
||||
|
||||
def encode_ebcdic_character(character):
|
||||
"""Map an EBCDIC character to a terminal display character."""
|
||||
if character > 255:
|
||||
return 0x00
|
||||
|
||||
return EBCDIC_CHAR_MAP[character]
|
||||
|
||||
def encode_string(string, errors='replace'):
|
||||
def encode_string(string):
|
||||
"""Map a string to terminal display characters."""
|
||||
return bytes([encode_ascii_character(character) for character
|
||||
in string.encode('ascii', errors)])
|
||||
return bytes(map(encode_character, string))
|
||||
|
||||
@ -365,8 +365,8 @@ class Keyboard:
|
||||
|
||||
return (False, None)
|
||||
|
||||
def get_ascii_character_for_key(key):
|
||||
"""Map a key to ASCII character."""
|
||||
def get_character_for_key(key):
|
||||
"""Map a key to a character."""
|
||||
if not key:
|
||||
return None
|
||||
|
||||
@ -376,12 +376,3 @@ def get_ascii_character_for_key(key):
|
||||
return None
|
||||
|
||||
return chr(value)
|
||||
|
||||
def get_ebcdic_character_for_key(key):
|
||||
"""Map a key to EBCDIC character."""
|
||||
ascii_character = get_ascii_character_for_key(key)
|
||||
|
||||
if not ascii_character:
|
||||
return None
|
||||
|
||||
return ascii_character.encode('cp500')[0]
|
||||
|
||||
@ -9,8 +9,8 @@ from tn3270 import Telnet, Emulator, AttributeCell, CharacterCell, AID, Color, H
|
||||
from tn3270.ebcdic import DUP, FM
|
||||
|
||||
from .session import Session, SessionDisconnectedError
|
||||
from .display import encode_ascii_character, encode_ebcdic_character, encode_string
|
||||
from .keyboard import Key, get_ebcdic_character_for_key
|
||||
from .display import encode_character, encode_string
|
||||
from .keyboard import Key, get_character_for_key
|
||||
|
||||
AID_KEY_MAP = {
|
||||
Key.CLEAR: AID.CLEAR,
|
||||
@ -47,13 +47,14 @@ AID_KEY_MAP = {
|
||||
class TN3270Session(Session):
|
||||
"""TN3270 session."""
|
||||
|
||||
def __init__(self, terminal, host, port):
|
||||
def __init__(self, terminal, host, port, character_encoding):
|
||||
super().__init__(terminal)
|
||||
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.character_encoding = character_encoding
|
||||
|
||||
self.telnet = None
|
||||
self.emulator = None
|
||||
@ -144,9 +145,11 @@ class TN3270Session(Session):
|
||||
elif key == Key.FIELD_MARK:
|
||||
self.emulator.field_mark()
|
||||
else:
|
||||
byte = get_ebcdic_character_for_key(key)
|
||||
character = get_character_for_key(key)
|
||||
|
||||
if character:
|
||||
byte = character.encode(self.character_encoding)[0]
|
||||
|
||||
if byte:
|
||||
self.emulator.input(byte, self.keyboard_insert)
|
||||
except OperatorError as error:
|
||||
self.operator_error = error
|
||||
@ -193,7 +196,7 @@ class TN3270Session(Session):
|
||||
for address in self.emulator.dirty:
|
||||
cell = self.emulator.cells[address]
|
||||
|
||||
(regen_byte, eab_byte) = _map_cell(cell, has_eab)
|
||||
(regen_byte, eab_byte) = _map_cell(cell, self.character_encoding, has_eab)
|
||||
|
||||
self.terminal.display.buffered_write_byte(regen_byte, eab_byte, index=address)
|
||||
|
||||
@ -234,7 +237,7 @@ class TN3270Session(Session):
|
||||
|
||||
return message_area.ljust(9, b'\x00')
|
||||
|
||||
def _map_cell(cell, has_eab):
|
||||
def _map_cell(cell, character_encoding, has_eab):
|
||||
regen_byte = 0x00
|
||||
|
||||
if isinstance(cell, AttributeCell):
|
||||
@ -245,13 +248,15 @@ def _map_cell(cell, has_eab):
|
||||
|
||||
if cell.character_set is not None:
|
||||
# TODO: Temporary workaround until character set support is added.
|
||||
regen_byte = encode_ascii_character(ord('ß'))
|
||||
regen_byte = encode_character('ß')
|
||||
elif byte == DUP:
|
||||
regen_byte = encode_ascii_character(ord('*'))
|
||||
regen_byte = encode_character('*')
|
||||
elif byte == FM:
|
||||
regen_byte = encode_ascii_character(ord(';'))
|
||||
regen_byte = encode_character(';')
|
||||
else:
|
||||
regen_byte = encode_ebcdic_character(byte)
|
||||
character = bytes([byte]).decode(character_encoding)
|
||||
|
||||
regen_byte = encode_character(character)
|
||||
|
||||
if not has_eab:
|
||||
return (regen_byte, None)
|
||||
|
||||
@ -9,8 +9,8 @@ from ptyprocess import PtyProcess
|
||||
import pyte
|
||||
|
||||
from .session import Session, SessionDisconnectedError
|
||||
from .display import encode_ascii_character
|
||||
from .keyboard import Key, get_ascii_character_for_key, MODIFIER_KEYS
|
||||
from .display import encode_character
|
||||
from .keyboard import Key, get_character_for_key, MODIFIER_KEYS
|
||||
|
||||
VT100_KEY_MAP = {
|
||||
Key.NOT: b'^',
|
||||
@ -157,7 +157,7 @@ class VT100Session(Session):
|
||||
if bytes_ is not None:
|
||||
return bytes_
|
||||
|
||||
character = get_ascii_character_for_key(key)
|
||||
character = get_character_for_key(key)
|
||||
|
||||
if character and character.isprintable():
|
||||
return character.encode()
|
||||
@ -193,7 +193,7 @@ class VT100Session(Session):
|
||||
character = row_buffer[column]
|
||||
|
||||
# TODO: Investigate multi-byte or zero-byte cases further.
|
||||
regen_byte = encode_ascii_character(ord(character.data)) if len(character.data) == 1 else 0x00
|
||||
regen_byte = encode_character(character.data) if len(character.data) == 1 else 0x00
|
||||
eab_byte = 0x00 if has_eab else None
|
||||
|
||||
self.terminal.display.buffered_write_byte(regen_byte, eab_byte, row=row, column=column)
|
||||
|
||||
@ -8,7 +8,7 @@ import context
|
||||
|
||||
from oec.interface import InterfaceWrapper
|
||||
from oec.terminal import Terminal
|
||||
from oec.display import Display, Dimensions, StatusLine, BufferedDisplay, encode_ascii_character, encode_ebcdic_character, encode_string
|
||||
from oec.display import Display, Dimensions, StatusLine, BufferedDisplay, encode_character, encode_string
|
||||
from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2
|
||||
|
||||
from mock_interface import MockInterface
|
||||
@ -812,32 +812,22 @@ class BufferedDisplayWriteTestCase(unittest.TestCase):
|
||||
|
||||
self.assertSequenceEqual(self.buffered_display.dirty, [80])
|
||||
|
||||
class EncodeAsciiCharacterTestCase(unittest.TestCase):
|
||||
class EncodeCharacterTestCase(unittest.TestCase):
|
||||
def test_mapped_character(self):
|
||||
self.assertEqual(encode_ascii_character(ord('a')), 0x80)
|
||||
self.assertEqual(encode_character('a'), 0x80)
|
||||
|
||||
def test_unmapped_character(self):
|
||||
self.assertEqual(encode_ascii_character(ord('`')), 0x00)
|
||||
self.assertEqual(encode_character('`'), 0x00)
|
||||
|
||||
def test_out_of_range(self):
|
||||
self.assertEqual(encode_ascii_character(ord('✓')), 0x00)
|
||||
|
||||
class EncodeEbcdicCharacterTestCase(unittest.TestCase):
|
||||
def test_mapped_character(self):
|
||||
self.assertEqual(encode_ebcdic_character(129), 0x80)
|
||||
|
||||
def test_unmapped_character(self):
|
||||
self.assertEqual(encode_ebcdic_character(185), 0x00)
|
||||
|
||||
def test_out_of_range(self):
|
||||
self.assertEqual(encode_ebcdic_character(256), 0x00)
|
||||
self.assertEqual(encode_character('✓'), 0x00)
|
||||
|
||||
class EncodeStringTestCase(unittest.TestCase):
|
||||
def test_mapped_characters(self):
|
||||
self.assertEqual(encode_string('Hello, world!'), bytes.fromhex('a7 84 8b 8b 8e 33 00 96 8e 91 8b 83 19'))
|
||||
|
||||
def test_unmapped_characters(self):
|
||||
self.assertEqual(encode_string('Everything ✓'), bytes.fromhex('a4 95 84 91 98 93 87 88 8d 86 00 18'))
|
||||
self.assertEqual(encode_string('Everything ✓'), bytes.fromhex('a4 95 84 91 98 93 87 88 8d 86 00 00'))
|
||||
|
||||
def _create_display(interface):
|
||||
terminal_id = TerminalId(0b11110100)
|
||||
|
||||
@ -2,7 +2,7 @@ import unittest
|
||||
|
||||
import context
|
||||
|
||||
from oec.keyboard import KeyboardModifiers, Key, Keymap, Keyboard, get_ascii_character_for_key, get_ebcdic_character_for_key
|
||||
from oec.keyboard import KeyboardModifiers, Key, Keymap, Keyboard, get_character_for_key
|
||||
from oec.keymap_3278_2 import KEYMAP as KEYMAP_3278_2
|
||||
from oec.keymap_3483 import KEYMAP as KEYMAP_3483
|
||||
|
||||
@ -200,22 +200,12 @@ class KeyboardGetKeyMultipleModifierReleaseTestCase(unittest.TestCase):
|
||||
def _assert_get_key(self, scan_code, key, modifiers, modifiers_changed):
|
||||
self.assertEqual(self.keyboard.get_key(scan_code), (key, modifiers, modifiers_changed))
|
||||
|
||||
class GetAsciiCharacterForKeyTestCase(unittest.TestCase):
|
||||
class GetCharacterForKeyTestCase(unittest.TestCase):
|
||||
def test_none(self):
|
||||
self.assertIsNone(get_ascii_character_for_key(None))
|
||||
self.assertIsNone(get_character_for_key(None))
|
||||
|
||||
def test_no_mapping(self):
|
||||
self.assertIsNone(get_ascii_character_for_key(Key.ATTN))
|
||||
self.assertIsNone(get_character_for_key(Key.ATTN))
|
||||
|
||||
def test_mapping(self):
|
||||
self.assertEqual(get_ascii_character_for_key(Key.UPPER_A), 'A')
|
||||
|
||||
class GetEbcdicCharacterForKeyTestCase(unittest.TestCase):
|
||||
def test_none(self):
|
||||
self.assertIsNone(get_ebcdic_character_for_key(None))
|
||||
|
||||
def test_no_mapping(self):
|
||||
self.assertIsNone(get_ebcdic_character_for_key(Key.ATTN))
|
||||
|
||||
def test_mapping(self):
|
||||
self.assertEqual(get_ebcdic_character_for_key(Key.UPPER_A), 193)
|
||||
self.assertEqual(get_character_for_key(Key.UPPER_A), 'A')
|
||||
|
||||
@ -24,7 +24,7 @@ class SessionHandleHostTestCase(unittest.TestCase):
|
||||
|
||||
self.terminal = _create_terminal(self.interface)
|
||||
|
||||
self.session = TN3270Session(self.terminal, 'mainframe', 23)
|
||||
self.session = TN3270Session(self.terminal, 'mainframe', 23, 'ibm037')
|
||||
|
||||
self.telnet = create_autospec(Telnet, instance=True)
|
||||
|
||||
@ -71,7 +71,7 @@ class SessionHandleKeyTestCase(unittest.TestCase):
|
||||
|
||||
self.terminal = _create_terminal(self.interface)
|
||||
|
||||
self.session = TN3270Session(self.terminal, 'mainframe', 23)
|
||||
self.session = TN3270Session(self.terminal, 'mainframe', 23, 'ibm037')
|
||||
|
||||
self.session.emulator = create_autospec(Emulator, instance=True)
|
||||
|
||||
@ -226,7 +226,7 @@ class SessionRenderTestCase(unittest.TestCase):
|
||||
self.terminal.display.flush = Mock(wraps=self.terminal.display.flush)
|
||||
self.terminal.display.status_line.write = Mock(wraps=self.terminal.display.status_line.write)
|
||||
|
||||
self.session = TN3270Session(self.terminal, 'mainframe', 23)
|
||||
self.session = TN3270Session(self.terminal, 'mainframe', 23, 'ibm037')
|
||||
|
||||
self.session.telnet = create_autospec(Telnet, instance=True)
|
||||
self.session.emulator = create_autospec(Emulator, instance=True)
|
||||
@ -238,20 +238,20 @@ class SessionRenderTestCase(unittest.TestCase):
|
||||
cells = _create_screen_cells(24, 80)
|
||||
|
||||
_set_attribute(cells, 0, protected=True)
|
||||
_set_characters(cells, 1, 'PROTECTED'.encode('cp500'))
|
||||
_set_characters(cells, 1, 'PROTECTED'.encode('ibm037'))
|
||||
_set_attribute(cells, 10, protected=True, intensified=True)
|
||||
_set_characters(cells, 11, 'PROTECTED INTENSIFIED'.encode('cp500'))
|
||||
_set_characters(cells, 11, 'PROTECTED INTENSIFIED'.encode('ibm037'))
|
||||
_set_attribute(cells, 32, protected=True, hidden=True)
|
||||
_set_characters(cells, 33, 'PROTECTED HIDDEN'.encode('cp500'))
|
||||
_set_characters(cells, 33, 'PROTECTED HIDDEN'.encode('ibm037'))
|
||||
_set_attribute(cells, 49, protected=False)
|
||||
_set_characters(cells, 50, 'UNPROTECTED'.encode('cp500'))
|
||||
_set_characters(cells, 50, 'UNPROTECTED'.encode('ibm037'))
|
||||
_set_attribute(cells, 61, protected=False, intensified=True)
|
||||
_set_characters(cells, 62, 'UNPROTECTED INTENSIFIED'.encode('cp500'))
|
||||
_set_characters(cells, 62, 'UNPROTECTED INTENSIFIED'.encode('ibm037'))
|
||||
_set_attribute(cells, 85, protected=False, hidden=True)
|
||||
_set_characters(cells, 86, 'UNPROTECTED HIDDEN'.encode('cp500'))
|
||||
_set_characters(cells, 86, 'UNPROTECTED HIDDEN'.encode('ibm037'))
|
||||
_set_attribute(cells, 104, protected=True)
|
||||
_set_formatting(cells, 104, color=Color.YELLOW)
|
||||
_set_characters(cells, 105, 'EAB'.encode('cp500'))
|
||||
_set_characters(cells, 105, 'EAB'.encode('ibm037'))
|
||||
_set_formatting(cells, 105, blink=True)
|
||||
_set_formatting(cells, 106, reverse=True)
|
||||
_set_formatting(cells, 107, underscore=True)
|
||||
@ -287,20 +287,20 @@ class SessionRenderTestCase(unittest.TestCase):
|
||||
cells = _create_screen_cells(24, 80)
|
||||
|
||||
_set_attribute(cells, 0, protected=True)
|
||||
_set_characters(cells, 1, 'PROTECTED'.encode('cp500'))
|
||||
_set_characters(cells, 1, 'PROTECTED'.encode('ibm037'))
|
||||
_set_attribute(cells, 10, protected=True, intensified=True)
|
||||
_set_characters(cells, 11, 'PROTECTED INTENSIFIED'.encode('cp500'))
|
||||
_set_characters(cells, 11, 'PROTECTED INTENSIFIED'.encode('ibm037'))
|
||||
_set_attribute(cells, 32, protected=True, hidden=True)
|
||||
_set_characters(cells, 33, 'PROTECTED HIDDEN'.encode('cp500'))
|
||||
_set_characters(cells, 33, 'PROTECTED HIDDEN'.encode('ibm037'))
|
||||
_set_attribute(cells, 49, protected=False)
|
||||
_set_characters(cells, 50, 'UNPROTECTED'.encode('cp500'))
|
||||
_set_characters(cells, 50, 'UNPROTECTED'.encode('ibm037'))
|
||||
_set_attribute(cells, 61, protected=False, intensified=True)
|
||||
_set_characters(cells, 62, 'UNPROTECTED INTENSIFIED'.encode('cp500'))
|
||||
_set_characters(cells, 62, 'UNPROTECTED INTENSIFIED'.encode('ibm037'))
|
||||
_set_attribute(cells, 85, protected=False, hidden=True)
|
||||
_set_characters(cells, 86, 'UNPROTECTED HIDDEN'.encode('cp500'))
|
||||
_set_characters(cells, 86, 'UNPROTECTED HIDDEN'.encode('ibm037'))
|
||||
_set_attribute(cells, 104, protected=True)
|
||||
_set_formatting(cells, 104, color=Color.YELLOW)
|
||||
_set_characters(cells, 105, 'EAB'.encode('cp500'))
|
||||
_set_characters(cells, 105, 'EAB'.encode('ibm037'))
|
||||
_set_formatting(cells, 105, blink=True)
|
||||
_set_formatting(cells, 106, reverse=True)
|
||||
_set_formatting(cells, 107, underscore=True)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user