# -*- coding: utf-8 -*-
# This code is based on AjaxTerm/Web-Shell which included a fairly complete
# vt100 implementation as well as a stable process multiplexer.
# I made some small fixes, improved some small parts and added a Session class
# which can be used by the widget.
# License: GPL2
import sys
import os
import fcntl
import array
import threading
import time
import termios
import pty
import signal
import struct
import select
import subprocess
__version__ = "0.1"
[docs]class Terminal(object):
def __init__(self, w, h):
self.w = w
self.h = h
self.vt100_charset_graph = [
0x25ca, 0x2026, 0x2022, 0x3f,
0xb6, 0x3f, 0xb0, 0xb1,
0x3f, 0x3f, 0x2b, 0x2b,
0x2b, 0x2b, 0x2b, 0xaf,
0x2014, 0x2014, 0x2014, 0x5f,
0x2b, 0x2b, 0x2b, 0x2b,
0x7c, 0x2264, 0x2265, 0xb6,
0x2260, 0xa3, 0xb7, 0x7f
]
self.vt100_esc = {
'#8': self.esc_DECALN,
'(A': self.esc_G0_0,
'(B': self.esc_G0_1,
'(0': self.esc_G0_2,
'(1': self.esc_G0_3,
'(2': self.esc_G0_4,
')A': self.esc_G1_0,
')B': self.esc_G1_1,
')0': self.esc_G1_2,
')1': self.esc_G1_3,
')2': self.esc_G1_4,
'7': self.esc_DECSC,
'8': self.esc_DECRC,
'=': self.esc_DECKPAM,
'>': self.esc_DECKPNM,
'D': self.esc_IND,
'E': self.esc_NEL,
'H': self.esc_HTS,
'M': self.esc_RI,
'N': self.esc_SS2,
'O': self.esc_SS3,
'P': self.esc_DCS,
'X': self.esc_SOS,
'Z': self.esc_DECID,
'[': self.esc_CSI,
'\\': self.esc_ST,
']': self.esc_OSC,
'^': self.esc_PM,
'_': self.esc_APC,
'c': self.reset_hard,
}
self.vt100_csi = {
'@': self.csi_ICH,
'A': self.csi_CUU,
'B': self.csi_CUD,
'C': self.csi_CUF,
'D': self.csi_CUB,
'E': self.csi_CNL,
'F': self.csi_CPL,
'G': self.csi_CHA,
'H': self.csi_CUP,
'I': self.csi_CHT,
'J': self.csi_ED,
'K': self.csi_EL,
'L': self.csi_IL,
'M': self.csi_DL,
'P': self.csi_DCH,
'S': self.csi_SU,
'T': self.csi_SD,
'W': self.csi_CTC,
'X': self.csi_ECH,
'Z': self.csi_CBT,
'`': self.csi_HPA,
'a': self.csi_HPR,
'b': self.csi_REP,
'c': self.csi_DA,
'd': self.csi_VPA,
'e': self.csi_VPR,
'f': self.csi_HVP,
'g': self.csi_TBC,
'h': self.csi_SM,
'l': self.csi_RM,
'm': self.csi_SGR,
'n': self.csi_DSR,
'r': self.csi_DECSTBM,
's': self.csi_SCP,
'u': self.csi_RCP,
'x': self.csi_DECREQTPARM,
'!p': self.csi_DECSTR,
}
self.vt100_keyfilter_ansikeys = {
'~': '~',
'A': '\x1b[A',
'B': '\x1b[B',
'C': '\x1b[C',
'D': '\x1b[D',
'F': '\x1b[F',
'H': '\x1b[H',
'1': '\x1b[5~',
'2': '\x1b[6~',
'3': '\x1b[2~',
'4': '\x1b[3~',
'a': '\x1bOP',
'b': '\x1bOQ',
'c': '\x1bOR',
'd': '\x1bOS',
'e': '\x1b[15~',
'f': '\x1b[17~',
'g': '\x1b[18~',
'h': '\x1b[19~',
'i': '\x1b[20~',
'j': '\x1b[21~',
'k': '\x1b[23~',
'l': '\x1b[24~',
}
self.vt100_keyfilter_appkeys = {
'~': '~',
'A': '\x1bOA',
'B': '\x1bOB',
'C': '\x1bOC',
'D': '\x1bOD',
'F': '\x1bOF',
'H': '\x1bOH',
'1': '\x1b[5~',
'2': '\x1b[6~',
'3': '\x1b[2~',
'4': '\x1b[3~',
'a': '\x1bOP',
'b': '\x1bOQ',
'c': '\x1bOR',
'd': '\x1bOS',
'e': '\x1b[15~',
'f': '\x1b[17~',
'g': '\x1b[18~',
'h': '\x1b[19~',
'i': '\x1b[20~',
'j': '\x1b[21~',
'k': '\x1b[23~',
'l': '\x1b[24~',
}
self.reset_hard()
# Reset functions
[docs] def reset_hard(self):
# Attribute mask: 0x0XFB0000
# X: Bit 0 - Underlined
# Bit 1 - Negative
# Bit 2 - Concealed
# F: Foreground
# B: Background
self.attr = 0x00fe0000
# UTF-8 decoder
self.utf8_units_count = 0
self.utf8_units_received = 0
self.utf8_char = 0
# Key filter
self.vt100_keyfilter_escape = False
# Last char
self.vt100_lastchar = 0
# Control sequences
self.vt100_parse_len = 0
self.vt100_parse_state = ""
self.vt100_parse_func = ""
self.vt100_parse_param = ""
# Buffers
self.vt100_out = ""
# Invoke other resets
self.reset_screen()
self.reset_soft()
[docs] def reset_soft(self):
# Attribute mask: 0x0XFB0000
# X: Bit 0 - Underlined
# Bit 1 - Negative
# Bit 2 - Concealed
# F: Foreground
# B: Background
self.attr = 0x00fe0000
# Scroll parameters
self.scroll_area_y0 = 0
self.scroll_area_y1 = self.h
# Character sets
self.vt100_charset_is_single_shift = False
self.vt100_charset_is_graphical = False
self.vt100_charset_g_sel = 0
self.vt100_charset_g = [0, 0]
# Modes
self.vt100_mode_insert = False
self.vt100_mode_lfnewline = False
self.vt100_mode_cursorkey = False
self.vt100_mode_column_switch = False
self.vt100_mode_inverse = False
self.vt100_mode_origin = False
self.vt100_mode_autowrap = True
self.vt100_mode_cursor = True
self.vt100_mode_alt_screen = False
self.vt100_mode_backspace = False
# Init DECSC state
self.esc_DECSC()
self.vt100_saved2 = self.vt100_saved
self.esc_DECSC()
[docs] def reset_screen(self):
# Screen
self.screen = array.array('i', [self.attr | 0x20] * self.w * self.h)
self.screen2 = array.array('i', [self.attr | 0x20] * self.w * self.h)
# Scroll parameters
self.scroll_area_y0 = 0
self.scroll_area_y1 = self.h
# Cursor position
self.cx = 0
self.cy = 0
# Tab stops
self.tab_stops = list(range(0, self.w, 8))
# UTF-8 functions
[docs] def utf8_decode(self, d):
d = str(d, encoding="utf8") # py3 patch
o = ''
for c in d:
char = ord(c)
if self.utf8_units_count != self.utf8_units_received:
self.utf8_units_received += 1
if (char & 0xc0) == 0x80:
self.utf8_char = (self.utf8_char << 6) | (char & 0x3f)
if self.utf8_units_count == self.utf8_units_received:
if self.utf8_char < 0x10000:
o += chr(self.utf8_char)
self.utf8_units_count = self.utf8_units_received = 0
else:
o += '?'
while self.utf8_units_received:
o += '?'
self.utf8_units_received -= 1
self.utf8_units_count = 0
else:
if (char & 0x80) == 0x00:
o += c
elif (char & 0xe0) == 0xc0:
self.utf8_units_count = 1
self.utf8_char = char & 0x1f
elif (char & 0xf0) == 0xe0:
self.utf8_units_count = 2
self.utf8_char = char & 0x0f
elif (char & 0xf8) == 0xf0:
self.utf8_units_count = 3
self.utf8_char = char & 0x07
else:
o += '?'
return o
[docs] def utf8_charwidth(self, char):
if char >= 0x2e80:
return 2
else:
return 1
# Low-level terminal functions
[docs] def peek(self, y0, x0, y1, x1):
return self.screen[self.w * y0 + x0:self.w * (y1 - 1) + x1]
[docs] def poke(self, y, x, s):
pos = self.w * y + x
self.screen[pos:pos + len(s)] = s
[docs] def fill(self, y0, x0, y1, x1, char):
n = self.w * (y1 - y0 - 1) + (x1 - x0)
self.poke(y0, x0, array.array('i', [char] * n))
[docs] def clear(self, y0, x0, y1, x1):
self.fill(y0, x0, y1, x1, self.attr | 0x20)
# Scrolling functions
# Cursor functions
[docs] def cursor_line_width(self, next_char):
wx = self.utf8_charwidth(next_char)
lx = 0
for x in range(min(self.cx, self.w)):
char = self.peek(self.cy, x, self.cy + 1, x + 1)[0] & 0xffff
wx += self.utf8_charwidth(char)
lx += 1
return wx, lx
[docs] def cursor_up(self, n=1):
self.cy = max(self.scroll_area_y0, self.cy - n)
[docs] def cursor_down(self, n=1):
self.cy = min(self.scroll_area_y1 - 1, self.cy + n)
[docs] def cursor_left(self, n=1):
self.cx = max(0, self.cx - n)
[docs] def cursor_right(self, n=1):
self.cx = min(self.w - 1, self.cx + n)
[docs] def cursor_set_x(self, x):
self.cx = max(0, x)
[docs] def cursor_set_y(self, y):
self.cy = max(0, min(self.h - 1, y))
[docs] def cursor_set(self, y, x):
self.cursor_set_x(x)
self.cursor_set_y(y)
# Dumb terminal
[docs] def ctrl_BS(self):
delta_y, cx = divmod(self.cx - 1, self.w)
cy = max(self.scroll_area_y0, self.cy + delta_y)
self.cursor_set(cy, cx)
[docs] def ctrl_HT(self, n=1):
if n > 0 and self.cx >= self.w:
return
if n <= 0 and self.cx == 0:
return
ts = 0
for i in range(len(self.tab_stops)):
if self.cx >= self.tab_stops[i]:
ts = i
ts += n
if ts < len(self.tab_stops) and ts >= 0:
self.cursor_set_x(self.tab_stops[ts])
else:
self.cursor_set_x(self.w - 1)
[docs] def ctrl_LF(self):
if self.vt100_mode_lfnewline:
self.ctrl_CR()
if self.cy == self.scroll_area_y1 - 1:
self.scroll_area_up(self.scroll_area_y0, self.scroll_area_y1)
else:
self.cursor_down()
[docs] def ctrl_CR(self):
self.cursor_set_x(0)
[docs] def dumb_write(self, char):
if char < 32:
if char == 8:
self.ctrl_BS()
elif char == 9:
self.ctrl_HT()
elif char >= 10 and char <= 12:
self.ctrl_LF()
elif char == 13:
self.ctrl_CR()
return True
return False
[docs] def dumb_echo(self, char):
# Check right bound
wx, cx = self.cursor_line_width(char)
# Newline
if wx > self.w:
if self.vt100_mode_autowrap:
self.ctrl_CR()
self.ctrl_LF()
else:
self.cx = cx - 1
if self.vt100_mode_insert:
self.scroll_line_right(self.cy, self.cx)
if self.vt100_charset_is_single_shift:
self.vt100_charset_is_single_shift = False
elif self.vt100_charset_is_graphical and (char & 0xffe0) == 0x0060:
char = self.vt100_charset_graph[char - 0x60]
self.poke(self.cy, self.cx, array.array('i', [self.attr | char]))
self.cursor_set_x(self.cx + 1)
# VT100 CTRL, ESC, CSI handlers
[docs] def vt100_charset_update(self):
self.vt100_charset_is_graphical = (
self.vt100_charset_g[self.vt100_charset_g_sel] == 2)
[docs] def vt100_charset_set(self, g):
# Invoke active character set
self.vt100_charset_g_sel = g
self.vt100_charset_update()
[docs] def vt100_charset_select(self, g, charset):
# Select charset
self.vt100_charset_g[g] = charset
self.vt100_charset_update()
[docs] def vt100_setmode(self, p, state):
# Set VT100 mode
p = self.vt100_parse_params(p, [], False)
for m in p:
if m == '4':
# Insertion replacement mode
self.vt100_mode_insert = state
elif m == '20':
# Linefeed/new line mode
self.vt100_mode_lfnewline = state
elif m == '?1':
# Cursor key mode
self.vt100_mode_cursorkey = state
elif m == '?3':
# Column mode
if self.vt100_mode_column_switch:
if state:
self.w = 132
else:
self.w = 80
self.reset_screen()
elif m == '?5':
# Screen mode
self.vt100_mode_inverse = state
elif m == '?6':
# Region origin mode
self.vt100_mode_origin = state
if state:
self.cursor_set(self.scroll_area_y0, 0)
else:
self.cursor_set(0, 0)
elif m == '?7':
# Autowrap mode
self.vt100_mode_autowrap = state
elif m == '?25':
# Text cursor enable mode
self.vt100_mode_cursor = state
elif m == '?40':
# Column switch control
self.vt100_mode_column_switch = state
elif m == '?47':
# Alternate screen mode
if ((state and not self.vt100_mode_alt_screen) or
(not state and self.vt100_mode_alt_screen)):
self.screen, self.screen2 = self.screen2, self.screen
self.vt100_saved, self.vt100_saved2 = self.vt100_saved2, self.vt100_saved
self.vt100_mode_alt_screen = state
elif m == '?67':
# Backspace/delete
self.vt100_mode_backspace = state
[docs] def ctrl_SO(self):
# Shift out
self.vt100_charset_set(1)
[docs] def ctrl_SI(self):
# Shift in
self.vt100_charset_set(0)
[docs] def esc_CSI(self):
# CSI start sequence
self.vt100_parse_reset('csi')
[docs] def esc_DECALN(self):
# Screen alignment display
self.fill(0, 0, self.h, self.w, 0x00fe0045)
[docs] def esc_G0_0(self):
self.vt100_charset_select(0, 0)
[docs] def esc_G0_1(self):
self.vt100_charset_select(0, 1)
[docs] def esc_G0_2(self):
self.vt100_charset_select(0, 2)
[docs] def esc_G0_3(self):
self.vt100_charset_select(0, 3)
[docs] def esc_G0_4(self):
self.vt100_charset_select(0, 4)
[docs] def esc_G1_0(self):
self.vt100_charset_select(1, 0)
[docs] def esc_G1_1(self):
self.vt100_charset_select(1, 1)
[docs] def esc_G1_2(self):
self.vt100_charset_select(1, 2)
[docs] def esc_G1_3(self):
self.vt100_charset_select(1, 3)
[docs] def esc_G1_4(self):
self.vt100_charset_select(1, 4)
[docs] def esc_DECSC(self):
# Store cursor
self.vt100_saved = {}
self.vt100_saved['cx'] = self.cx
self.vt100_saved['cy'] = self.cy
self.vt100_saved['attr'] = self.attr
self.vt100_saved['charset_g_sel'] = self.vt100_charset_g_sel
self.vt100_saved['charset_g'] = self.vt100_charset_g[:]
self.vt100_saved['mode_autowrap'] = self.vt100_mode_autowrap
self.vt100_saved['mode_origin'] = self.vt100_mode_origin
[docs] def esc_DECRC(self):
# Retore cursor
self.cx = self.vt100_saved['cx']
self.cy = self.vt100_saved['cy']
self.attr = self.vt100_saved['attr']
self.vt100_charset_g_sel = self.vt100_saved['charset_g_sel']
self.vt100_charset_g = self.vt100_saved['charset_g'][:]
self.vt100_charset_update()
self.vt100_mode_autowrap = self.vt100_saved['mode_autowrap']
self.vt100_mode_origin = self.vt100_saved['mode_origin']
[docs] def esc_DECKPAM(self):
# Application keypad mode
pass
[docs] def esc_DECKPNM(self):
# Numeric keypad mode
pass
[docs] def esc_IND(self):
# Index
self.ctrl_LF()
[docs] def esc_NEL(self):
# Next line
self.ctrl_CR()
self.ctrl_LF()
[docs] def esc_HTS(self):
# Character tabulation set
self.csi_CTC('0')
[docs] def esc_RI(self):
# Reverse line feed
if self.cy == self.scroll_area_y0:
self.scroll_area_down(self.scroll_area_y0, self.scroll_area_y1)
else:
self.cursor_up()
[docs] def esc_SS2(self):
# Single-shift two
self.vt100_charset_is_single_shift = True
[docs] def esc_SS3(self):
# Single-shift three
self.vt100_charset_is_single_shift = True
[docs] def esc_DCS(self):
# Device control string
self.vt100_parse_reset('str')
[docs] def esc_SOS(self):
# Start of string
self.vt100_parse_reset('str')
[docs] def esc_DECID(self):
# Identify terminal
self.csi_DA('0')
[docs] def esc_ST(self):
# String terminator
pass
[docs] def esc_OSC(self):
# Operating system command
self.vt100_parse_reset('str')
[docs] def esc_PM(self):
# Privacy message
self.vt100_parse_reset('str')
[docs] def esc_APC(self):
# Application program command
self.vt100_parse_reset('str')
[docs] def csi_ICH(self, p):
# Insert character
p = self.vt100_parse_params(p, [1])
self.scroll_line_right(self.cy, self.cx, p[0])
[docs] def csi_CUU(self, p):
# Cursor up
p = self.vt100_parse_params(p, [1])
self.cursor_up(max(1, p[0]))
[docs] def csi_CUD(self, p):
# Cursor down
p = self.vt100_parse_params(p, [1])
self.cursor_down(max(1, p[0]))
[docs] def csi_CUF(self, p):
# Cursor right
p = self.vt100_parse_params(p, [1])
self.cursor_right(max(1, p[0]))
[docs] def csi_CUB(self, p):
# Cursor left
p = self.vt100_parse_params(p, [1])
self.cursor_left(max(1, p[0]))
[docs] def csi_CNL(self, p):
# Cursor next line
self.csi_CUD(p)
self.ctrl_CR()
[docs] def csi_CPL(self, p):
# Cursor preceding line
self.csi_CUU(p)
self.ctrl_CR()
[docs] def csi_CHA(self, p):
# Cursor character absolute
p = self.vt100_parse_params(p, [1])
self.cursor_set_x(p[0] - 1)
[docs] def csi_CUP(self, p):
# Set cursor position
p = self.vt100_parse_params(p, [1, 1])
if self.vt100_mode_origin:
self.cursor_set(self.scroll_area_y0 + p[0] - 1, p[1] - 1)
else:
self.cursor_set(p[0] - 1, p[1] - 1)
[docs] def csi_CHT(self, p):
# Cursor forward tabulation
p = self.vt100_parse_params(p, [1])
self.ctrl_HT(max(1, p[0]))
[docs] def csi_ED(self, p):
# Erase in display
p = self.vt100_parse_params(p, ['0'], False)
if p[0] == '0':
self.clear(self.cy, self.cx, self.h, self.w)
elif p[0] == '1':
self.clear(0, 0, self.cy + 1, self.cx + 1)
elif p[0] == '2':
self.clear(0, 0, self.h, self.w)
[docs] def csi_EL(self, p):
# Erase in line
p = self.vt100_parse_params(p, ['0'], False)
if p[0] == '0':
self.clear(self.cy, self.cx, self.cy + 1, self.w)
elif p[0] == '1':
self.clear(self.cy, 0, self.cy + 1, self.cx + 1)
elif p[0] == '2':
self.clear(self.cy, 0, self.cy + 1, self.w)
[docs] def csi_IL(self, p):
# Insert line
p = self.vt100_parse_params(p, [1])
if (self.cy >= self.scroll_area_y0 and self.cy < self.scroll_area_y1):
self.scroll_area_down(self.cy, self.scroll_area_y1, max(1, p[0]))
[docs] def csi_DL(self, p):
# Delete line
p = self.vt100_parse_params(p, [1])
if (self.cy >= self.scroll_area_y0 and self.cy < self.scroll_area_y1):
self.scroll_area_up(self.cy, self.scroll_area_y1, max(1, p[0]))
[docs] def csi_DCH(self, p):
# Delete characters
p = self.vt100_parse_params(p, [1])
self.scroll_line_left(self.cy, self.cx, max(1, p[0]))
[docs] def csi_SU(self, p):
# Scroll up
p = self.vt100_parse_params(p, [1])
self.scroll_area_up(
self.scroll_area_y0, self.scroll_area_y1, max(1, p[0]))
[docs] def csi_SD(self, p):
# Scroll down
p = self.vt100_parse_params(p, [1])
self.scroll_area_down(
self.scroll_area_y0, self.scroll_area_y1, max(1, p[0]))
[docs] def csi_CTC(self, p):
# Cursor tabulation control
p = self.vt100_parse_params(p, ['0'], False)
for m in p:
if m == '0':
try:
ts = self.tab_stops.index(self.cx)
except ValueError:
tab_stops = self.tab_stops
tab_stops.append(self.cx)
tab_stops.sort()
self.tab_stops = tab_stops
elif m == '2':
try:
self.tab_stops.remove(self.cx)
except ValueError:
pass
elif m == '5':
self.tab_stops = [0]
[docs] def csi_ECH(self, p):
# Erase character
p = self.vt100_parse_params(p, [1])
n = min(self.w - self.cx, max(1, p[0]))
self.clear(self.cy, self.cx, self.cy + 1, self.cx + n)
[docs] def csi_CBT(self, p):
# Cursor backward tabulation
p = self.vt100_parse_params(p, [1])
self.ctrl_HT(1 - max(1, p[0]))
[docs] def csi_HPA(self, p):
# Character position absolute
p = self.vt100_parse_params(p, [1])
self.cursor_set_x(p[0] - 1)
[docs] def csi_HPR(self, p):
# Character position forward
self.csi_CUF(p)
[docs] def csi_REP(self, p):
# Repeat
p = self.vt100_parse_params(p, [1])
if self.vt100_lastchar < 32:
return
n = min(2000, max(1, p[0]))
while n:
self.dumb_echo(self.vt100_lastchar)
n -= 1
self.vt100_lastchar = 0
[docs] def csi_DA(self, p):
# Device attributes
p = self.vt100_parse_params(p, ['0'], False)
if p[0] == '0':
self.vt100_out = "\x1b[?1;2c"
elif p[0] == '>0' or p[0] == '>':
self.vt100_out = "\x1b[>0;184;0c"
[docs] def csi_VPA(self, p):
# Line position absolute
p = self.vt100_parse_params(p, [1])
self.cursor_set_y(p[0] - 1)
[docs] def csi_VPR(self, p):
# Line position forward
self.csi_CUD(p)
[docs] def csi_HVP(self, p):
# Character and line position
self.csi_CUP(p)
[docs] def csi_TBC(self, p):
# Tabulation clear
p = self.vt100_parse_params(p, ['0'], False)
if p[0] == '0':
self.csi_CTC('2')
elif p[0] == '3':
self.csi_CTC('5')
[docs] def csi_SM(self, p):
# Set mode
self.vt100_setmode(p, True)
[docs] def csi_RM(self, p):
# Reset mode
self.vt100_setmode(p, False)
[docs] def csi_SGR(self, p):
# Select graphic rendition
p = self.vt100_parse_params(p, [0])
for m in p:
if m == 0:
# Reset
self.attr = 0x00fe0000
elif m == 4:
# Underlined
self.attr |= 0x01000000
elif m == 7:
# Negative
self.attr |= 0x02000000
elif m == 8:
# Concealed
self.attr |= 0x04000000
elif m == 24:
# Not underlined
self.attr &= 0x7eff0000
elif m == 27:
# Positive
self.attr &= 0x7dff0000
elif m == 28:
# Revealed
self.attr &= 0x7bff0000
elif m >= 30 and m <= 37:
# Foreground
self.attr = (self.attr & 0x7f0f0000) | ((m - 30) << 20)
elif m == 39:
# Default fg color
self.attr = (self.attr & 0x7f0f0000) | 0x00f00000
elif m >= 40 and m <= 47:
# Background
self.attr = (self.attr & 0x7ff00000) | ((m - 40) << 16)
elif m == 49:
# Default bg color
self.attr = (self.attr & 0x7ff00000) | 0x000e0000
[docs] def csi_DSR(self, p):
# Device status report
p = self.vt100_parse_params(p, ['0'], False)
if p[0] == '5':
self.vt100_out = "\x1b[0n"
elif p[0] == '6':
x = self.cx + 1
y = self.cy + 1
self.vt100_out = '\x1b[%d;%dR' % (y, x)
elif p[0] == '7':
self.vt100_out = 'WebShell'
elif p[0] == '8':
self.vt100_out = __version__
elif p[0] == '?6':
x = self.cx + 1
y = self.cy + 1
self.vt100_out = '\x1b[?%d;%dR' % (y, x)
elif p[0] == '?15':
self.vt100_out = '\x1b[?13n'
elif p[0] == '?25':
self.vt100_out = '\x1b[?20n'
elif p[0] == '?26':
self.vt100_out = '\x1b[?27;1n'
elif p[0] == '?53':
self.vt100_out = '\x1b[?53n'
[docs] def csi_DECSTBM(self, p):
# Set top and bottom margins
p = self.vt100_parse_params(p, [1, self.h])
self.scroll_area_set(p[0] - 1, p[1])
if self.vt100_mode_origin:
self.cursor_set(self.scroll_area_y0, 0)
else:
self.cursor_set(0, 0)
[docs] def csi_SCP(self, p):
# Save cursor position
self.vt100_saved_cx = self.cx
self.vt100_saved_cy = self.cy
[docs] def csi_RCP(self, p):
# Restore cursor position
self.cx = self.vt100_saved_cx
self.cy = self.vt100_saved_cy
[docs] def csi_DECREQTPARM(self, p):
# Request terminal parameters
p = self.vt100_parse_params(p, [], False)
if p[0] == '0':
self.vt100_out = "\x1b[2;1;1;112;112;1;0x"
elif p[0] == '1':
self.vt100_out = "\x1b[3;1;1;112;112;1;0x"
[docs] def csi_DECSTR(self, p):
# Soft terminal reset
self.reset_soft()
# VT100 Parser
[docs] def vt100_parse_params(self, p, d, to_int=True):
# Process parameters (params p with defaults d)
# Add prefix to all parameters
prefix = ''
if len(p) > 0:
if p[0] >= '<' and p[0] <= '?':
prefix = p[0]
p = p[1:]
p = p.split(';')
else:
p = ''
# Process parameters
n = max(len(p), len(d))
o = []
for i in range(n):
value_def = False
if i < len(p):
value = prefix + p[i]
value_def = True
if to_int:
try:
value = int(value)
except ValueError:
value_def = False
if (not value_def) and i < len(d):
value = d[i]
o.append(value)
return o
[docs] def vt100_parse_reset(self, vt100_parse_state=""):
self.vt100_parse_state = vt100_parse_state
self.vt100_parse_len = 0
self.vt100_parse_func = ""
self.vt100_parse_param = ""
[docs] def vt100_parse_process(self):
if self.vt100_parse_state == 'esc':
# ESC mode
f = self.vt100_parse_func
try:
self.vt100_esc[f]()
except KeyError:
pass
if self.vt100_parse_state == 'esc':
self.vt100_parse_reset()
else:
# CSI mode
f = self.vt100_parse_func
p = self.vt100_parse_param
try:
self.vt100_csi[f](p)
except KeyError:
pass
if self.vt100_parse_state == 'csi':
self.vt100_parse_reset()
[docs] def vt100_write(self, char):
if char < 32:
if char == 27:
self.vt100_parse_reset('esc')
return True
elif char == 14:
self.ctrl_SO()
elif char == 15:
self.ctrl_SI()
elif (char & 0xffe0) == 0x0080:
self.vt100_parse_reset('esc')
self.vt100_parse_func = chr(char - 0x40)
self.vt100_parse_process()
return True
if self.vt100_parse_state:
if self.vt100_parse_state == 'str':
if char >= 32:
return True
self.vt100_parse_reset()
else:
if char < 32:
if char == 24 or char == 26:
self.vt100_parse_reset()
return True
else:
self.vt100_parse_len += 1
if self.vt100_parse_len > 32:
self.vt100_parse_reset()
else:
char_msb = char & 0xf0
if char_msb == 0x20:
# Intermediate bytes (added to function)
self.vt100_parse_func += chr(char)
elif char_msb == 0x30 and self.vt100_parse_state == 'csi':
# Parameter byte
self.vt100_parse_param += chr(char)
else:
# Function byte
self.vt100_parse_func += chr(char)
self.vt100_parse_process()
return True
self.vt100_lastchar = char
return False
# External interface
[docs] def set_size(self, w, h):
if w < 2 or w > 256 or h < 2 or h > 256:
return False
self.w = w
self.h = h
self.reset_screen()
return True
[docs] def read(self):
d = self.vt100_out
self.vt100_out = ""
return d
[docs] def write(self, d):
d = self.utf8_decode(d)
for c in d:
char = ord(c)
if self.vt100_write(char):
continue
if self.dumb_write(char):
continue
if char <= 0xffff:
self.dumb_echo(char)
return True
[docs] def pipe(self, d):
d = d.decode("utf8") # py3 patch
o = ''
for c in d:
char = ord(c)
if self.vt100_keyfilter_escape:
self.vt100_keyfilter_escape = False
try:
if self.vt100_mode_cursorkey:
o += self.vt100_keyfilter_appkeys[c]
else:
o += self.vt100_keyfilter_ansikeys[c]
except KeyError:
pass
elif c == '~':
self.vt100_keyfilter_escape = True
elif char == 127:
if self.vt100_mode_backspace:
o += chr(8)
else:
o += chr(127)
else:
o += c
if self.vt100_mode_lfnewline and char == 13:
o += chr(10)
return o
[docs] def dump(self):
screen = []
attr_ = -1
cx, cy = min(self.cx, self.w - 1), self.cy
for y in range(0, self.h):
wx = 0
line = [""]
for x in range(0, self.w):
d = self.screen[y * self.w + x]
char = d & 0xffff
attr = d >> 16
# Cursor
if cy == y and cx == x and self.vt100_mode_cursor:
attr = attr & 0xfff0 | 0x000c
# Attributes
if attr != attr_:
if attr_ != -1:
line.append("")
bg = attr & 0x000f
fg = (attr & 0x00f0) >> 4
# Inverse
inv = attr & 0x0200
inv2 = self.vt100_mode_inverse
if (inv and not inv2) or (inv2 and not inv):
fg, bg = bg, fg
# Concealed
if attr & 0x0400:
fg = 0xc
# Underline
if attr & 0x0100:
ul = True
else:
ul = False
line.append((fg, bg, ul))
line.append("")
attr_ = attr
wx += self.utf8_charwidth(char)
if wx <= self.w:
line[-1] += chr(char)
screen.append(line)
return (cx, cy), screen
[docs]def synchronized(func):
def wrapper(self, *args, **kwargs):
try:
self.lock.acquire()
except AttributeError:
self.lock = threading.RLock()
self.lock.acquire()
try:
result = func(self, *args, **kwargs)
finally:
self.lock.release()
return result
return wrapper
[docs]class Multiplexer(object):
def __init__(self, cmd="/bin/bash", env_term="xterm-color", timeout=60 * 60 * 24):
# Set Linux signal handler
if sys.platform in ("linux2", "linux3"):
self.sigchldhandler = signal.signal(signal.SIGCHLD, signal.SIG_IGN)
# Session
self.session = {}
self.cmd = cmd
self.env_term = env_term
self.timeout = timeout
# Supervisor thread
self.signal_stop = 0
self.thread = threading.Thread(target=self.proc_thread)
self.thread.start()
[docs] def stop(self):
# Stop supervisor thread
self.signal_stop = 1
self.thread.join()
[docs] def proc_resize(self, sid, w, h):
fd = self.session[sid]['fd']
# Set terminal size
try:
fcntl.ioctl(fd,
struct.unpack('i',
struct.pack('I', termios.TIOCSWINSZ)
)[0],
struct.pack("HHHH", h, w, 0, 0))
except (IOError, OSError):
pass
self.session[sid]['term'].set_size(w, h)
self.session[sid]['w'] = w
self.session[sid]['h'] = h
[docs] @synchronized
def proc_keepalive(self, sid, w, h, cmd=None):
if not sid in self.session:
# Start a new session
self.session[sid] = {
'state': 'unborn',
'term': Terminal(w, h),
'time': time.time(),
'w': w,
'h': h}
return self.proc_spawn(sid, cmd)
elif self.session[sid]['state'] == 'alive':
self.session[sid]['time'] = time.time()
# Update terminal size
if self.session[sid]['w'] != w or self.session[sid]['h'] != h:
self.proc_resize(sid, w, h)
return True
else:
return False
[docs] def proc_spawn(self, sid, cmd=None):
# Session
self.session[sid]['state'] = 'alive'
w, h = self.session[sid]['w'], self.session[sid]['h']
# Fork new process
try:
pid, fd = pty.fork()
except (IOError, OSError):
self.session[sid]['state'] = 'dead'
return False
if pid == 0:
cmd = cmd or self.cmd
# Safe way to make it work under BSD and Linux
try:
ls = os.environ['LANG'].split('.')
except KeyError:
ls = []
if len(ls) < 2:
ls = ['en_US', 'UTF-8']
try:
os.putenv('COLUMNS', str(w))
os.putenv('LINES', str(h))
os.putenv('TERM', self.env_term)
os.putenv('PATH', os.environ['PATH'])
os.putenv('LANG', ls[0] + '.UTF-8')
# os.system(cmd)
p = subprocess.Popen(cmd, shell=False)
# print "called with subprocess", p.pid
child_pid, sts = os.waitpid(p.pid, 0)
# print "child_pid", child_pid, sts
except (IOError, OSError):
pass
# self.proc_finish(sid)
os._exit(0)
else:
# Store session vars
self.session[sid]['pid'] = pid
self.session[sid]['fd'] = fd
# Set file control
fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK)
# Set terminal size
self.proc_resize(sid, w, h)
return True
[docs] def proc_waitfordeath(self, sid):
try:
os.close(self.session[sid]['fd'])
except (KeyError, IOError, OSError):
pass
if sid in self.session:
if 'fd' in self.session[sid]:
del self.session[sid]['fd']
try:
os.waitpid(self.session[sid]['pid'], 0)
except (KeyError, IOError, OSError):
pass
if sid in self.session:
if 'pid' in self.session[sid]:
del self.session[sid]['pid']
self.session[sid]['state'] = 'dead'
return True
[docs] def proc_bury(self, sid):
if self.session[sid]['state'] == 'alive':
try:
os.kill(self.session[sid]['pid'], signal.SIGTERM)
except (IOError, OSError):
pass
self.proc_waitfordeath(sid)
if sid in self.session:
del self.session[sid]
return True
[docs] @synchronized
def proc_buryall(self):
for sid in list(self.session.keys()):
self.proc_bury(sid)
[docs] @synchronized
def proc_read(self, sid):
"""
Read from process
"""
if sid not in self.session:
return False
elif self.session[sid]['state'] != 'alive':
return False
try:
fd = self.session[sid]['fd']
d = os.read(fd, 65536)
if not d:
# Process finished, BSD
self.proc_waitfordeath(sid)
return False
except (IOError, OSError):
# Process finished, Linux
self.proc_waitfordeath(sid)
return False
term = self.session[sid]['term']
term.write(d)
# Read terminal response
d = term.read()
if d:
try:
os.write(fd, d)
except (IOError, OSError):
return False
return True
[docs] @synchronized
def proc_write(self, sid, d):
"""
Write to process
"""
if sid not in self.session:
return False
elif self.session[sid]['state'] != 'alive':
return False
try:
term = self.session[sid]['term']
d = term.pipe(d)
fd = self.session[sid]['fd']
#os.write(fd, d)
os.write(fd, d.encode("utf8")) # py3 patch
except (IOError, OSError):
return False
return True
[docs] @synchronized
def proc_dump(self, sid):
"""
Dump terminal output
"""
if sid not in self.session:
return False
return self.session[sid]['term'].dump()
[docs] @synchronized
def proc_getalive(self):
"""
Get alive sessions, bury timed out ones
"""
fds = []
fd2sid = {}
now = time.time()
for sid in list(self.session.keys()):
then = self.session[sid]['time']
if (now - then) > self.timeout:
self.proc_bury(sid)
else:
if self.session[sid]['state'] == 'alive':
fds.append(self.session[sid]['fd'])
fd2sid[self.session[sid]['fd']] = sid
return (fds, fd2sid)
[docs] def proc_thread(self):
"""
Supervisor thread
"""
while not self.signal_stop:
# Read fds
(fds, fd2sid) = self.proc_getalive()
try:
i, o, e = select.select(fds, [], [], 1.0)
except (IOError, OSError):
i = []
for fd in i:
sid = fd2sid[fd]
self.proc_read(sid)
self.session[sid]["changed"] = time.time()
if len(i):
time.sleep(0.002)
self.proc_buryall()
[docs]def ssh_command(login, executable="ssh"):
cmd = executable
cmd += ' -oPreferredAuthentications=keyboard-interactive,password'
cmd += ' -oNoHostAuthenticationForLocalhost=yes'
cmd += ' -oLogLevel=FATAL'
cmd += ' -F/dev/null -l' + login + ' localhost'
return cmd
[docs]class Session(object):
_mux = None
[docs] @classmethod
def close_all(cls):
Session._mux.stop()
def __init__(self, cmd=None, width=80, height=24):
if not Session._mux:
Session._mux = Multiplexer()
self._session_id = "%s-%s" % (time.time(), id(self))
self._width = width
self._height = height
self._started = False
[docs] def resize(self, width, height):
self._width = width
self._height = height
if self._started:
self.keepalive()
[docs] def start(self, cmd=None):
self._started = Session._mux.proc_keepalive(
self._session_id, self._width, self._height, cmd or self.cmd)
return self._started
[docs] def close(self):
return Session._mux.proc_bury(self._session_id)
stop = close
[docs] def is_alive(self):
return Session._mux.session.get(self._session_id, {}).get('state') == 'alive'
[docs] def keepalive(self):
return Session._mux.proc_keepalive(self._session_id, self._width, self._height)
[docs] def dump(self):
if self.keepalive():
return Session._mux.proc_dump(self._session_id)
[docs] def write(self, data):
if self.keepalive():
Session._mux.proc_write(self._session_id, data)
[docs] def last_change(self):
return Session._mux.session.get(self._session_id, {}).get("changed", None)
[docs] def pid(self):
return Session._mux.session.get(self._session_id, {}).get("pid", None)
if __name__ == "__main__":
w, h = (80, 24)
cmd = "/bin/ls --color=yes"
multiplex = Multiplexer(cmd)
sid = "session-id-%s"
if multiplex.proc_keepalive(sid, w, h):
#multiplex.proc_write(sid, k)
time.sleep(1)
# print multiplex.proc_dump(sid)
print(("Output:", multiplex.proc_dump(sid)))
multiplex.stop()