Source code for pyqterm.backend

# -*- coding: utf-8 -*-
# This code is based on AjaxTerm/Web-Shell which included a fairly complete
# vt100 implementation as well as a stable process multiplexer.
# I made some small fixes, improved some small parts and added a Session class
# which can be used by the widget.
# License: GPL2
import sys
import os
import fcntl
import array
import threading
import time
import termios
import pty
import signal
import struct
import select
import subprocess


__version__ = "0.1"


[docs]class Terminal(object): def __init__(self, w, h): self.w = w self.h = h self.vt100_charset_graph = [ 0x25ca, 0x2026, 0x2022, 0x3f, 0xb6, 0x3f, 0xb0, 0xb1, 0x3f, 0x3f, 0x2b, 0x2b, 0x2b, 0x2b, 0x2b, 0xaf, 0x2014, 0x2014, 0x2014, 0x5f, 0x2b, 0x2b, 0x2b, 0x2b, 0x7c, 0x2264, 0x2265, 0xb6, 0x2260, 0xa3, 0xb7, 0x7f ] self.vt100_esc = { '#8': self.esc_DECALN, '(A': self.esc_G0_0, '(B': self.esc_G0_1, '(0': self.esc_G0_2, '(1': self.esc_G0_3, '(2': self.esc_G0_4, ')A': self.esc_G1_0, ')B': self.esc_G1_1, ')0': self.esc_G1_2, ')1': self.esc_G1_3, ')2': self.esc_G1_4, '7': self.esc_DECSC, '8': self.esc_DECRC, '=': self.esc_DECKPAM, '>': self.esc_DECKPNM, 'D': self.esc_IND, 'E': self.esc_NEL, 'H': self.esc_HTS, 'M': self.esc_RI, 'N': self.esc_SS2, 'O': self.esc_SS3, 'P': self.esc_DCS, 'X': self.esc_SOS, 'Z': self.esc_DECID, '[': self.esc_CSI, '\\': self.esc_ST, ']': self.esc_OSC, '^': self.esc_PM, '_': self.esc_APC, 'c': self.reset_hard, } self.vt100_csi = { '@': self.csi_ICH, 'A': self.csi_CUU, 'B': self.csi_CUD, 'C': self.csi_CUF, 'D': self.csi_CUB, 'E': self.csi_CNL, 'F': self.csi_CPL, 'G': self.csi_CHA, 'H': self.csi_CUP, 'I': self.csi_CHT, 'J': self.csi_ED, 'K': self.csi_EL, 'L': self.csi_IL, 'M': self.csi_DL, 'P': self.csi_DCH, 'S': self.csi_SU, 'T': self.csi_SD, 'W': self.csi_CTC, 'X': self.csi_ECH, 'Z': self.csi_CBT, '`': self.csi_HPA, 'a': self.csi_HPR, 'b': self.csi_REP, 'c': self.csi_DA, 'd': self.csi_VPA, 'e': self.csi_VPR, 'f': self.csi_HVP, 'g': self.csi_TBC, 'h': self.csi_SM, 'l': self.csi_RM, 'm': self.csi_SGR, 'n': self.csi_DSR, 'r': self.csi_DECSTBM, 's': self.csi_SCP, 'u': self.csi_RCP, 'x': self.csi_DECREQTPARM, '!p': self.csi_DECSTR, } self.vt100_keyfilter_ansikeys = { '~': '~', 'A': '\x1b[A', 'B': '\x1b[B', 'C': '\x1b[C', 'D': '\x1b[D', 'F': '\x1b[F', 'H': '\x1b[H', '1': '\x1b[5~', '2': '\x1b[6~', '3': '\x1b[2~', '4': '\x1b[3~', 'a': '\x1bOP', 'b': '\x1bOQ', 'c': '\x1bOR', 'd': '\x1bOS', 'e': '\x1b[15~', 'f': '\x1b[17~', 'g': '\x1b[18~', 'h': '\x1b[19~', 'i': '\x1b[20~', 'j': '\x1b[21~', 'k': '\x1b[23~', 'l': '\x1b[24~', } self.vt100_keyfilter_appkeys = { '~': '~', 'A': '\x1bOA', 'B': '\x1bOB', 'C': '\x1bOC', 'D': '\x1bOD', 'F': '\x1bOF', 'H': '\x1bOH', '1': '\x1b[5~', '2': '\x1b[6~', '3': '\x1b[2~', '4': '\x1b[3~', 'a': '\x1bOP', 'b': '\x1bOQ', 'c': '\x1bOR', 'd': '\x1bOS', 'e': '\x1b[15~', 'f': '\x1b[17~', 'g': '\x1b[18~', 'h': '\x1b[19~', 'i': '\x1b[20~', 'j': '\x1b[21~', 'k': '\x1b[23~', 'l': '\x1b[24~', } self.reset_hard() # Reset functions
[docs] def reset_hard(self): # Attribute mask: 0x0XFB0000 # X: Bit 0 - Underlined # Bit 1 - Negative # Bit 2 - Concealed # F: Foreground # B: Background self.attr = 0x00fe0000 # UTF-8 decoder self.utf8_units_count = 0 self.utf8_units_received = 0 self.utf8_char = 0 # Key filter self.vt100_keyfilter_escape = False # Last char self.vt100_lastchar = 0 # Control sequences self.vt100_parse_len = 0 self.vt100_parse_state = "" self.vt100_parse_func = "" self.vt100_parse_param = "" # Buffers self.vt100_out = "" # Invoke other resets self.reset_screen() self.reset_soft()
[docs] def reset_soft(self): # Attribute mask: 0x0XFB0000 # X: Bit 0 - Underlined # Bit 1 - Negative # Bit 2 - Concealed # F: Foreground # B: Background self.attr = 0x00fe0000 # Scroll parameters self.scroll_area_y0 = 0 self.scroll_area_y1 = self.h # Character sets self.vt100_charset_is_single_shift = False self.vt100_charset_is_graphical = False self.vt100_charset_g_sel = 0 self.vt100_charset_g = [0, 0] # Modes self.vt100_mode_insert = False self.vt100_mode_lfnewline = False self.vt100_mode_cursorkey = False self.vt100_mode_column_switch = False self.vt100_mode_inverse = False self.vt100_mode_origin = False self.vt100_mode_autowrap = True self.vt100_mode_cursor = True self.vt100_mode_alt_screen = False self.vt100_mode_backspace = False # Init DECSC state self.esc_DECSC() self.vt100_saved2 = self.vt100_saved self.esc_DECSC()
[docs] def reset_screen(self): # Screen self.screen = array.array('i', [self.attr | 0x20] * self.w * self.h) self.screen2 = array.array('i', [self.attr | 0x20] * self.w * self.h) # Scroll parameters self.scroll_area_y0 = 0 self.scroll_area_y1 = self.h # Cursor position self.cx = 0 self.cy = 0 # Tab stops self.tab_stops = list(range(0, self.w, 8))
# UTF-8 functions
[docs] def utf8_decode(self, d): d = str(d, encoding="utf8") # py3 patch o = '' for c in d: char = ord(c) if self.utf8_units_count != self.utf8_units_received: self.utf8_units_received += 1 if (char & 0xc0) == 0x80: self.utf8_char = (self.utf8_char << 6) | (char & 0x3f) if self.utf8_units_count == self.utf8_units_received: if self.utf8_char < 0x10000: o += chr(self.utf8_char) self.utf8_units_count = self.utf8_units_received = 0 else: o += '?' while self.utf8_units_received: o += '?' self.utf8_units_received -= 1 self.utf8_units_count = 0 else: if (char & 0x80) == 0x00: o += c elif (char & 0xe0) == 0xc0: self.utf8_units_count = 1 self.utf8_char = char & 0x1f elif (char & 0xf0) == 0xe0: self.utf8_units_count = 2 self.utf8_char = char & 0x0f elif (char & 0xf8) == 0xf0: self.utf8_units_count = 3 self.utf8_char = char & 0x07 else: o += '?' return o
[docs] def utf8_charwidth(self, char): if char >= 0x2e80: return 2 else: return 1
# Low-level terminal functions
[docs] def peek(self, y0, x0, y1, x1): return self.screen[self.w * y0 + x0:self.w * (y1 - 1) + x1]
[docs] def poke(self, y, x, s): pos = self.w * y + x self.screen[pos:pos + len(s)] = s
[docs] def fill(self, y0, x0, y1, x1, char): n = self.w * (y1 - y0 - 1) + (x1 - x0) self.poke(y0, x0, array.array('i', [char] * n))
[docs] def clear(self, y0, x0, y1, x1): self.fill(y0, x0, y1, x1, self.attr | 0x20)
# Scrolling functions
[docs] def scroll_area_up(self, y0, y1, n=1): n = min(y1 - y0, n) self.poke(y0, 0, self.peek(y0 + n, 0, y1, self.w)) self.clear(y1 - n, 0, y1, self.w)
[docs] def scroll_area_down(self, y0, y1, n=1): n = min(y1 - y0, n) self.poke(y0 + n, 0, self.peek(y0, 0, y1 - n, self.w)) self.clear(y0, 0, y0 + n, self.w)
[docs] def scroll_area_set(self, y0, y1): y0 = max(0, min(self.h - 1, y0)) y1 = max(1, min(self.h, y1)) if y1 > y0: self.scroll_area_y0 = y0 self.scroll_area_y1 = y1
[docs] def scroll_line_right(self, y, x, n=1): if x < self.w: n = min(self.w - self.cx, n) self.poke(y, x + n, self.peek(y, x, y + 1, self.w - n)) self.clear(y, x, y + 1, x + n)
[docs] def scroll_line_left(self, y, x, n=1): if x < self.w: n = min(self.w - self.cx, n) self.poke(y, x, self.peek(y, x + n, y + 1, self.w)) self.clear(y, self.w - n, y + 1, self.w)
# Cursor functions
[docs] def cursor_line_width(self, next_char): wx = self.utf8_charwidth(next_char) lx = 0 for x in range(min(self.cx, self.w)): char = self.peek(self.cy, x, self.cy + 1, x + 1)[0] & 0xffff wx += self.utf8_charwidth(char) lx += 1 return wx, lx
[docs] def cursor_up(self, n=1): self.cy = max(self.scroll_area_y0, self.cy - n)
[docs] def cursor_down(self, n=1): self.cy = min(self.scroll_area_y1 - 1, self.cy + n)
[docs] def cursor_left(self, n=1): self.cx = max(0, self.cx - n)
[docs] def cursor_right(self, n=1): self.cx = min(self.w - 1, self.cx + n)
[docs] def cursor_set_x(self, x): self.cx = max(0, x)
[docs] def cursor_set_y(self, y): self.cy = max(0, min(self.h - 1, y))
[docs] def cursor_set(self, y, x): self.cursor_set_x(x) self.cursor_set_y(y)
# Dumb terminal
[docs] def ctrl_BS(self): delta_y, cx = divmod(self.cx - 1, self.w) cy = max(self.scroll_area_y0, self.cy + delta_y) self.cursor_set(cy, cx)
[docs] def ctrl_HT(self, n=1): if n > 0 and self.cx >= self.w: return if n <= 0 and self.cx == 0: return ts = 0 for i in range(len(self.tab_stops)): if self.cx >= self.tab_stops[i]: ts = i ts += n if ts < len(self.tab_stops) and ts >= 0: self.cursor_set_x(self.tab_stops[ts]) else: self.cursor_set_x(self.w - 1)
[docs] def ctrl_LF(self): if self.vt100_mode_lfnewline: self.ctrl_CR() if self.cy == self.scroll_area_y1 - 1: self.scroll_area_up(self.scroll_area_y0, self.scroll_area_y1) else: self.cursor_down()
[docs] def ctrl_CR(self): self.cursor_set_x(0)
[docs] def dumb_write(self, char): if char < 32: if char == 8: self.ctrl_BS() elif char == 9: self.ctrl_HT() elif char >= 10 and char <= 12: self.ctrl_LF() elif char == 13: self.ctrl_CR() return True return False
[docs] def dumb_echo(self, char): # Check right bound wx, cx = self.cursor_line_width(char) # Newline if wx > self.w: if self.vt100_mode_autowrap: self.ctrl_CR() self.ctrl_LF() else: self.cx = cx - 1 if self.vt100_mode_insert: self.scroll_line_right(self.cy, self.cx) if self.vt100_charset_is_single_shift: self.vt100_charset_is_single_shift = False elif self.vt100_charset_is_graphical and (char & 0xffe0) == 0x0060: char = self.vt100_charset_graph[char - 0x60] self.poke(self.cy, self.cx, array.array('i', [self.attr | char])) self.cursor_set_x(self.cx + 1)
# VT100 CTRL, ESC, CSI handlers
[docs] def vt100_charset_update(self): self.vt100_charset_is_graphical = ( self.vt100_charset_g[self.vt100_charset_g_sel] == 2)
[docs] def vt100_charset_set(self, g): # Invoke active character set self.vt100_charset_g_sel = g self.vt100_charset_update()
[docs] def vt100_charset_select(self, g, charset): # Select charset self.vt100_charset_g[g] = charset self.vt100_charset_update()
[docs] def vt100_setmode(self, p, state): # Set VT100 mode p = self.vt100_parse_params(p, [], False) for m in p: if m == '4': # Insertion replacement mode self.vt100_mode_insert = state elif m == '20': # Linefeed/new line mode self.vt100_mode_lfnewline = state elif m == '?1': # Cursor key mode self.vt100_mode_cursorkey = state elif m == '?3': # Column mode if self.vt100_mode_column_switch: if state: self.w = 132 else: self.w = 80 self.reset_screen() elif m == '?5': # Screen mode self.vt100_mode_inverse = state elif m == '?6': # Region origin mode self.vt100_mode_origin = state if state: self.cursor_set(self.scroll_area_y0, 0) else: self.cursor_set(0, 0) elif m == '?7': # Autowrap mode self.vt100_mode_autowrap = state elif m == '?25': # Text cursor enable mode self.vt100_mode_cursor = state elif m == '?40': # Column switch control self.vt100_mode_column_switch = state elif m == '?47': # Alternate screen mode if ((state and not self.vt100_mode_alt_screen) or (not state and self.vt100_mode_alt_screen)): self.screen, self.screen2 = self.screen2, self.screen self.vt100_saved, self.vt100_saved2 = self.vt100_saved2, self.vt100_saved self.vt100_mode_alt_screen = state elif m == '?67': # Backspace/delete self.vt100_mode_backspace = state
[docs] def ctrl_SO(self): # Shift out self.vt100_charset_set(1)
[docs] def ctrl_SI(self): # Shift in self.vt100_charset_set(0)
[docs] def esc_CSI(self): # CSI start sequence self.vt100_parse_reset('csi')
[docs] def esc_DECALN(self): # Screen alignment display self.fill(0, 0, self.h, self.w, 0x00fe0045)
[docs] def esc_G0_0(self): self.vt100_charset_select(0, 0)
[docs] def esc_G0_1(self): self.vt100_charset_select(0, 1)
[docs] def esc_G0_2(self): self.vt100_charset_select(0, 2)
[docs] def esc_G0_3(self): self.vt100_charset_select(0, 3)
[docs] def esc_G0_4(self): self.vt100_charset_select(0, 4)
[docs] def esc_G1_0(self): self.vt100_charset_select(1, 0)
[docs] def esc_G1_1(self): self.vt100_charset_select(1, 1)
[docs] def esc_G1_2(self): self.vt100_charset_select(1, 2)
[docs] def esc_G1_3(self): self.vt100_charset_select(1, 3)
[docs] def esc_G1_4(self): self.vt100_charset_select(1, 4)
[docs] def esc_DECSC(self): # Store cursor self.vt100_saved = {} self.vt100_saved['cx'] = self.cx self.vt100_saved['cy'] = self.cy self.vt100_saved['attr'] = self.attr self.vt100_saved['charset_g_sel'] = self.vt100_charset_g_sel self.vt100_saved['charset_g'] = self.vt100_charset_g[:] self.vt100_saved['mode_autowrap'] = self.vt100_mode_autowrap self.vt100_saved['mode_origin'] = self.vt100_mode_origin
[docs] def esc_DECRC(self): # Retore cursor self.cx = self.vt100_saved['cx'] self.cy = self.vt100_saved['cy'] self.attr = self.vt100_saved['attr'] self.vt100_charset_g_sel = self.vt100_saved['charset_g_sel'] self.vt100_charset_g = self.vt100_saved['charset_g'][:] self.vt100_charset_update() self.vt100_mode_autowrap = self.vt100_saved['mode_autowrap'] self.vt100_mode_origin = self.vt100_saved['mode_origin']
[docs] def esc_DECKPAM(self): # Application keypad mode pass
[docs] def esc_DECKPNM(self): # Numeric keypad mode pass
[docs] def esc_IND(self): # Index self.ctrl_LF()
[docs] def esc_NEL(self): # Next line self.ctrl_CR() self.ctrl_LF()
[docs] def esc_HTS(self): # Character tabulation set self.csi_CTC('0')
[docs] def esc_RI(self): # Reverse line feed if self.cy == self.scroll_area_y0: self.scroll_area_down(self.scroll_area_y0, self.scroll_area_y1) else: self.cursor_up()
[docs] def esc_SS2(self): # Single-shift two self.vt100_charset_is_single_shift = True
[docs] def esc_SS3(self): # Single-shift three self.vt100_charset_is_single_shift = True
[docs] def esc_DCS(self): # Device control string self.vt100_parse_reset('str')
[docs] def esc_SOS(self): # Start of string self.vt100_parse_reset('str')
[docs] def esc_DECID(self): # Identify terminal self.csi_DA('0')
[docs] def esc_ST(self): # String terminator pass
[docs] def esc_OSC(self): # Operating system command self.vt100_parse_reset('str')
[docs] def esc_PM(self): # Privacy message self.vt100_parse_reset('str')
[docs] def esc_APC(self): # Application program command self.vt100_parse_reset('str')
[docs] def csi_ICH(self, p): # Insert character p = self.vt100_parse_params(p, [1]) self.scroll_line_right(self.cy, self.cx, p[0])
[docs] def csi_CUU(self, p): # Cursor up p = self.vt100_parse_params(p, [1]) self.cursor_up(max(1, p[0]))
[docs] def csi_CUD(self, p): # Cursor down p = self.vt100_parse_params(p, [1]) self.cursor_down(max(1, p[0]))
[docs] def csi_CUF(self, p): # Cursor right p = self.vt100_parse_params(p, [1]) self.cursor_right(max(1, p[0]))
[docs] def csi_CUB(self, p): # Cursor left p = self.vt100_parse_params(p, [1]) self.cursor_left(max(1, p[0]))
[docs] def csi_CNL(self, p): # Cursor next line self.csi_CUD(p) self.ctrl_CR()
[docs] def csi_CPL(self, p): # Cursor preceding line self.csi_CUU(p) self.ctrl_CR()
[docs] def csi_CHA(self, p): # Cursor character absolute p = self.vt100_parse_params(p, [1]) self.cursor_set_x(p[0] - 1)
[docs] def csi_CUP(self, p): # Set cursor position p = self.vt100_parse_params(p, [1, 1]) if self.vt100_mode_origin: self.cursor_set(self.scroll_area_y0 + p[0] - 1, p[1] - 1) else: self.cursor_set(p[0] - 1, p[1] - 1)
[docs] def csi_CHT(self, p): # Cursor forward tabulation p = self.vt100_parse_params(p, [1]) self.ctrl_HT(max(1, p[0]))
[docs] def csi_ED(self, p): # Erase in display p = self.vt100_parse_params(p, ['0'], False) if p[0] == '0': self.clear(self.cy, self.cx, self.h, self.w) elif p[0] == '1': self.clear(0, 0, self.cy + 1, self.cx + 1) elif p[0] == '2': self.clear(0, 0, self.h, self.w)
[docs] def csi_EL(self, p): # Erase in line p = self.vt100_parse_params(p, ['0'], False) if p[0] == '0': self.clear(self.cy, self.cx, self.cy + 1, self.w) elif p[0] == '1': self.clear(self.cy, 0, self.cy + 1, self.cx + 1) elif p[0] == '2': self.clear(self.cy, 0, self.cy + 1, self.w)
[docs] def csi_IL(self, p): # Insert line p = self.vt100_parse_params(p, [1]) if (self.cy >= self.scroll_area_y0 and self.cy < self.scroll_area_y1): self.scroll_area_down(self.cy, self.scroll_area_y1, max(1, p[0]))
[docs] def csi_DL(self, p): # Delete line p = self.vt100_parse_params(p, [1]) if (self.cy >= self.scroll_area_y0 and self.cy < self.scroll_area_y1): self.scroll_area_up(self.cy, self.scroll_area_y1, max(1, p[0]))
[docs] def csi_DCH(self, p): # Delete characters p = self.vt100_parse_params(p, [1]) self.scroll_line_left(self.cy, self.cx, max(1, p[0]))
[docs] def csi_SU(self, p): # Scroll up p = self.vt100_parse_params(p, [1]) self.scroll_area_up( self.scroll_area_y0, self.scroll_area_y1, max(1, p[0]))
[docs] def csi_SD(self, p): # Scroll down p = self.vt100_parse_params(p, [1]) self.scroll_area_down( self.scroll_area_y0, self.scroll_area_y1, max(1, p[0]))
[docs] def csi_CTC(self, p): # Cursor tabulation control p = self.vt100_parse_params(p, ['0'], False) for m in p: if m == '0': try: ts = self.tab_stops.index(self.cx) except ValueError: tab_stops = self.tab_stops tab_stops.append(self.cx) tab_stops.sort() self.tab_stops = tab_stops elif m == '2': try: self.tab_stops.remove(self.cx) except ValueError: pass elif m == '5': self.tab_stops = [0]
[docs] def csi_ECH(self, p): # Erase character p = self.vt100_parse_params(p, [1]) n = min(self.w - self.cx, max(1, p[0])) self.clear(self.cy, self.cx, self.cy + 1, self.cx + n)
[docs] def csi_CBT(self, p): # Cursor backward tabulation p = self.vt100_parse_params(p, [1]) self.ctrl_HT(1 - max(1, p[0]))
[docs] def csi_HPA(self, p): # Character position absolute p = self.vt100_parse_params(p, [1]) self.cursor_set_x(p[0] - 1)
[docs] def csi_HPR(self, p): # Character position forward self.csi_CUF(p)
[docs] def csi_REP(self, p): # Repeat p = self.vt100_parse_params(p, [1]) if self.vt100_lastchar < 32: return n = min(2000, max(1, p[0])) while n: self.dumb_echo(self.vt100_lastchar) n -= 1 self.vt100_lastchar = 0
[docs] def csi_DA(self, p): # Device attributes p = self.vt100_parse_params(p, ['0'], False) if p[0] == '0': self.vt100_out = "\x1b[?1;2c" elif p[0] == '>0' or p[0] == '>': self.vt100_out = "\x1b[>0;184;0c"
[docs] def csi_VPA(self, p): # Line position absolute p = self.vt100_parse_params(p, [1]) self.cursor_set_y(p[0] - 1)
[docs] def csi_VPR(self, p): # Line position forward self.csi_CUD(p)
[docs] def csi_HVP(self, p): # Character and line position self.csi_CUP(p)
[docs] def csi_TBC(self, p): # Tabulation clear p = self.vt100_parse_params(p, ['0'], False) if p[0] == '0': self.csi_CTC('2') elif p[0] == '3': self.csi_CTC('5')
[docs] def csi_SM(self, p): # Set mode self.vt100_setmode(p, True)
[docs] def csi_RM(self, p): # Reset mode self.vt100_setmode(p, False)
[docs] def csi_SGR(self, p): # Select graphic rendition p = self.vt100_parse_params(p, [0]) for m in p: if m == 0: # Reset self.attr = 0x00fe0000 elif m == 4: # Underlined self.attr |= 0x01000000 elif m == 7: # Negative self.attr |= 0x02000000 elif m == 8: # Concealed self.attr |= 0x04000000 elif m == 24: # Not underlined self.attr &= 0x7eff0000 elif m == 27: # Positive self.attr &= 0x7dff0000 elif m == 28: # Revealed self.attr &= 0x7bff0000 elif m >= 30 and m <= 37: # Foreground self.attr = (self.attr & 0x7f0f0000) | ((m - 30) << 20) elif m == 39: # Default fg color self.attr = (self.attr & 0x7f0f0000) | 0x00f00000 elif m >= 40 and m <= 47: # Background self.attr = (self.attr & 0x7ff00000) | ((m - 40) << 16) elif m == 49: # Default bg color self.attr = (self.attr & 0x7ff00000) | 0x000e0000
[docs] def csi_DSR(self, p): # Device status report p = self.vt100_parse_params(p, ['0'], False) if p[0] == '5': self.vt100_out = "\x1b[0n" elif p[0] == '6': x = self.cx + 1 y = self.cy + 1 self.vt100_out = '\x1b[%d;%dR' % (y, x) elif p[0] == '7': self.vt100_out = 'WebShell' elif p[0] == '8': self.vt100_out = __version__ elif p[0] == '?6': x = self.cx + 1 y = self.cy + 1 self.vt100_out = '\x1b[?%d;%dR' % (y, x) elif p[0] == '?15': self.vt100_out = '\x1b[?13n' elif p[0] == '?25': self.vt100_out = '\x1b[?20n' elif p[0] == '?26': self.vt100_out = '\x1b[?27;1n' elif p[0] == '?53': self.vt100_out = '\x1b[?53n'
[docs] def csi_DECSTBM(self, p): # Set top and bottom margins p = self.vt100_parse_params(p, [1, self.h]) self.scroll_area_set(p[0] - 1, p[1]) if self.vt100_mode_origin: self.cursor_set(self.scroll_area_y0, 0) else: self.cursor_set(0, 0)
[docs] def csi_SCP(self, p): # Save cursor position self.vt100_saved_cx = self.cx self.vt100_saved_cy = self.cy
[docs] def csi_RCP(self, p): # Restore cursor position self.cx = self.vt100_saved_cx self.cy = self.vt100_saved_cy
[docs] def csi_DECREQTPARM(self, p): # Request terminal parameters p = self.vt100_parse_params(p, [], False) if p[0] == '0': self.vt100_out = "\x1b[2;1;1;112;112;1;0x" elif p[0] == '1': self.vt100_out = "\x1b[3;1;1;112;112;1;0x"
[docs] def csi_DECSTR(self, p): # Soft terminal reset self.reset_soft()
# VT100 Parser
[docs] def vt100_parse_params(self, p, d, to_int=True): # Process parameters (params p with defaults d) # Add prefix to all parameters prefix = '' if len(p) > 0: if p[0] >= '<' and p[0] <= '?': prefix = p[0] p = p[1:] p = p.split(';') else: p = '' # Process parameters n = max(len(p), len(d)) o = [] for i in range(n): value_def = False if i < len(p): value = prefix + p[i] value_def = True if to_int: try: value = int(value) except ValueError: value_def = False if (not value_def) and i < len(d): value = d[i] o.append(value) return o
[docs] def vt100_parse_reset(self, vt100_parse_state=""): self.vt100_parse_state = vt100_parse_state self.vt100_parse_len = 0 self.vt100_parse_func = "" self.vt100_parse_param = ""
[docs] def vt100_parse_process(self): if self.vt100_parse_state == 'esc': # ESC mode f = self.vt100_parse_func try: self.vt100_esc[f]() except KeyError: pass if self.vt100_parse_state == 'esc': self.vt100_parse_reset() else: # CSI mode f = self.vt100_parse_func p = self.vt100_parse_param try: self.vt100_csi[f](p) except KeyError: pass if self.vt100_parse_state == 'csi': self.vt100_parse_reset()
[docs] def vt100_write(self, char): if char < 32: if char == 27: self.vt100_parse_reset('esc') return True elif char == 14: self.ctrl_SO() elif char == 15: self.ctrl_SI() elif (char & 0xffe0) == 0x0080: self.vt100_parse_reset('esc') self.vt100_parse_func = chr(char - 0x40) self.vt100_parse_process() return True if self.vt100_parse_state: if self.vt100_parse_state == 'str': if char >= 32: return True self.vt100_parse_reset() else: if char < 32: if char == 24 or char == 26: self.vt100_parse_reset() return True else: self.vt100_parse_len += 1 if self.vt100_parse_len > 32: self.vt100_parse_reset() else: char_msb = char & 0xf0 if char_msb == 0x20: # Intermediate bytes (added to function) self.vt100_parse_func += chr(char) elif char_msb == 0x30 and self.vt100_parse_state == 'csi': # Parameter byte self.vt100_parse_param += chr(char) else: # Function byte self.vt100_parse_func += chr(char) self.vt100_parse_process() return True self.vt100_lastchar = char return False
# External interface
[docs] def set_size(self, w, h): if w < 2 or w > 256 or h < 2 or h > 256: return False self.w = w self.h = h self.reset_screen() return True
[docs] def read(self): d = self.vt100_out self.vt100_out = "" return d
[docs] def write(self, d): d = self.utf8_decode(d) for c in d: char = ord(c) if self.vt100_write(char): continue if self.dumb_write(char): continue if char <= 0xffff: self.dumb_echo(char) return True
[docs] def pipe(self, d): d = d.decode("utf8") # py3 patch o = '' for c in d: char = ord(c) if self.vt100_keyfilter_escape: self.vt100_keyfilter_escape = False try: if self.vt100_mode_cursorkey: o += self.vt100_keyfilter_appkeys[c] else: o += self.vt100_keyfilter_ansikeys[c] except KeyError: pass elif c == '~': self.vt100_keyfilter_escape = True elif char == 127: if self.vt100_mode_backspace: o += chr(8) else: o += chr(127) else: o += c if self.vt100_mode_lfnewline and char == 13: o += chr(10) return o
[docs] def dump(self): screen = [] attr_ = -1 cx, cy = min(self.cx, self.w - 1), self.cy for y in range(0, self.h): wx = 0 line = [""] for x in range(0, self.w): d = self.screen[y * self.w + x] char = d & 0xffff attr = d >> 16 # Cursor if cy == y and cx == x and self.vt100_mode_cursor: attr = attr & 0xfff0 | 0x000c # Attributes if attr != attr_: if attr_ != -1: line.append("") bg = attr & 0x000f fg = (attr & 0x00f0) >> 4 # Inverse inv = attr & 0x0200 inv2 = self.vt100_mode_inverse if (inv and not inv2) or (inv2 and not inv): fg, bg = bg, fg # Concealed if attr & 0x0400: fg = 0xc # Underline if attr & 0x0100: ul = True else: ul = False line.append((fg, bg, ul)) line.append("") attr_ = attr wx += self.utf8_charwidth(char) if wx <= self.w: line[-1] += chr(char) screen.append(line) return (cx, cy), screen
[docs]def synchronized(func): def wrapper(self, *args, **kwargs): try: self.lock.acquire() except AttributeError: self.lock = threading.RLock() self.lock.acquire() try: result = func(self, *args, **kwargs) finally: self.lock.release() return result return wrapper
[docs]class Multiplexer(object): def __init__(self, cmd="/bin/bash", env_term="xterm-color", timeout=60 * 60 * 24): # Set Linux signal handler if sys.platform in ("linux2", "linux3"): self.sigchldhandler = signal.signal(signal.SIGCHLD, signal.SIG_IGN) # Session self.session = {} self.cmd = cmd self.env_term = env_term self.timeout = timeout # Supervisor thread self.signal_stop = 0 self.thread = threading.Thread(target=self.proc_thread) self.thread.start()
[docs] def stop(self): # Stop supervisor thread self.signal_stop = 1 self.thread.join()
[docs] def proc_resize(self, sid, w, h): fd = self.session[sid]['fd'] # Set terminal size try: fcntl.ioctl(fd, struct.unpack('i', struct.pack('I', termios.TIOCSWINSZ) )[0], struct.pack("HHHH", h, w, 0, 0)) except (IOError, OSError): pass self.session[sid]['term'].set_size(w, h) self.session[sid]['w'] = w self.session[sid]['h'] = h
[docs] @synchronized def proc_keepalive(self, sid, w, h, cmd=None): if not sid in self.session: # Start a new session self.session[sid] = { 'state': 'unborn', 'term': Terminal(w, h), 'time': time.time(), 'w': w, 'h': h} return self.proc_spawn(sid, cmd) elif self.session[sid]['state'] == 'alive': self.session[sid]['time'] = time.time() # Update terminal size if self.session[sid]['w'] != w or self.session[sid]['h'] != h: self.proc_resize(sid, w, h) return True else: return False
[docs] def proc_spawn(self, sid, cmd=None): # Session self.session[sid]['state'] = 'alive' w, h = self.session[sid]['w'], self.session[sid]['h'] # Fork new process try: pid, fd = pty.fork() except (IOError, OSError): self.session[sid]['state'] = 'dead' return False if pid == 0: cmd = cmd or self.cmd # Safe way to make it work under BSD and Linux try: ls = os.environ['LANG'].split('.') except KeyError: ls = [] if len(ls) < 2: ls = ['en_US', 'UTF-8'] try: os.putenv('COLUMNS', str(w)) os.putenv('LINES', str(h)) os.putenv('TERM', self.env_term) os.putenv('PATH', os.environ['PATH']) os.putenv('LANG', ls[0] + '.UTF-8') # os.system(cmd) p = subprocess.Popen(cmd, shell=False) # print "called with subprocess", p.pid child_pid, sts = os.waitpid(p.pid, 0) # print "child_pid", child_pid, sts except (IOError, OSError): pass # self.proc_finish(sid) os._exit(0) else: # Store session vars self.session[sid]['pid'] = pid self.session[sid]['fd'] = fd # Set file control fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK) # Set terminal size self.proc_resize(sid, w, h) return True
[docs] def proc_waitfordeath(self, sid): try: os.close(self.session[sid]['fd']) except (KeyError, IOError, OSError): pass if sid in self.session: if 'fd' in self.session[sid]: del self.session[sid]['fd'] try: os.waitpid(self.session[sid]['pid'], 0) except (KeyError, IOError, OSError): pass if sid in self.session: if 'pid' in self.session[sid]: del self.session[sid]['pid'] self.session[sid]['state'] = 'dead' return True
[docs] def proc_bury(self, sid): if self.session[sid]['state'] == 'alive': try: os.kill(self.session[sid]['pid'], signal.SIGTERM) except (IOError, OSError): pass self.proc_waitfordeath(sid) if sid in self.session: del self.session[sid] return True
[docs] @synchronized def proc_buryall(self): for sid in list(self.session.keys()): self.proc_bury(sid)
[docs] @synchronized def proc_read(self, sid): """ Read from process """ if sid not in self.session: return False elif self.session[sid]['state'] != 'alive': return False try: fd = self.session[sid]['fd'] d = os.read(fd, 65536) if not d: # Process finished, BSD self.proc_waitfordeath(sid) return False except (IOError, OSError): # Process finished, Linux self.proc_waitfordeath(sid) return False term = self.session[sid]['term'] term.write(d) # Read terminal response d = term.read() if d: try: os.write(fd, d) except (IOError, OSError): return False return True
[docs] @synchronized def proc_write(self, sid, d): """ Write to process """ if sid not in self.session: return False elif self.session[sid]['state'] != 'alive': return False try: term = self.session[sid]['term'] d = term.pipe(d) fd = self.session[sid]['fd'] #os.write(fd, d) os.write(fd, d.encode("utf8")) # py3 patch except (IOError, OSError): return False return True
[docs] @synchronized def proc_dump(self, sid): """ Dump terminal output """ if sid not in self.session: return False return self.session[sid]['term'].dump()
[docs] @synchronized def proc_getalive(self): """ Get alive sessions, bury timed out ones """ fds = [] fd2sid = {} now = time.time() for sid in list(self.session.keys()): then = self.session[sid]['time'] if (now - then) > self.timeout: self.proc_bury(sid) else: if self.session[sid]['state'] == 'alive': fds.append(self.session[sid]['fd']) fd2sid[self.session[sid]['fd']] = sid return (fds, fd2sid)
[docs] def proc_thread(self): """ Supervisor thread """ while not self.signal_stop: # Read fds (fds, fd2sid) = self.proc_getalive() try: i, o, e = select.select(fds, [], [], 1.0) except (IOError, OSError): i = [] for fd in i: sid = fd2sid[fd] self.proc_read(sid) self.session[sid]["changed"] = time.time() if len(i): time.sleep(0.002) self.proc_buryall()
[docs]def ssh_command(login, executable="ssh"): cmd = executable cmd += ' -oPreferredAuthentications=keyboard-interactive,password' cmd += ' -oNoHostAuthenticationForLocalhost=yes' cmd += ' -oLogLevel=FATAL' cmd += ' -F/dev/null -l' + login + ' localhost' return cmd
[docs]class Session(object): _mux = None
[docs] @classmethod def close_all(cls): Session._mux.stop()
def __init__(self, cmd=None, width=80, height=24): if not Session._mux: Session._mux = Multiplexer() self._session_id = "%s-%s" % (time.time(), id(self)) self._width = width self._height = height self._started = False
[docs] def resize(self, width, height): self._width = width self._height = height if self._started: self.keepalive()
[docs] def start(self, cmd=None): self._started = Session._mux.proc_keepalive( self._session_id, self._width, self._height, cmd or self.cmd) return self._started
[docs] def close(self): return Session._mux.proc_bury(self._session_id)
stop = close
[docs] def is_alive(self): return Session._mux.session.get(self._session_id, {}).get('state') == 'alive'
[docs] def keepalive(self): return Session._mux.proc_keepalive(self._session_id, self._width, self._height)
[docs] def dump(self): if self.keepalive(): return Session._mux.proc_dump(self._session_id)
[docs] def write(self, data): if self.keepalive(): Session._mux.proc_write(self._session_id, data)
[docs] def last_change(self): return Session._mux.session.get(self._session_id, {}).get("changed", None)
[docs] def pid(self): return Session._mux.session.get(self._session_id, {}).get("pid", None)
if __name__ == "__main__": w, h = (80, 24) cmd = "/bin/ls --color=yes" multiplex = Multiplexer(cmd) sid = "session-id-%s" if multiplex.proc_keepalive(sid, w, h): #multiplex.proc_write(sid, k) time.sleep(1) # print multiplex.proc_dump(sid) print(("Output:", multiplex.proc_dump(sid))) multiplex.stop()