I am currently using 5.2.1 version of xterm.js and django in backend. I want to detect when a text editor is run in terminal and log it. My current approach is to filter it using the keywords such as nano,vim etc. The only problem is that the user may use alias so i want to run "alias" command in the background constantly and check if there is any alias of nano or vim and add them to blacklist too. The "alias" command must be run in the bash process that is run from connect function otherwise it would be another terminal instance. Also the user must not see the command in browser so using os.write() is not an option. How can i achieve this in the most foolproof way?
EDIT:
I made some researches and it is very hard to implement current approach. Is there a way to block user from using alias entirely?
index.html:
<script type="module">
var socket = io.connect({ transports: ["websocket", "polling"] });
const status = document.getElementById("status")
const button = document.getElementById("button")
const fit = new FitAddon.FitAddon();
var term = new Terminal({
cursorBlink: true,
});
term.loadAddon(fit);
term.open(document.getElementById('terminal'));
fit.fit();
var terminal_line = '';
term.onKey(e => {
if (e.key == "\r") {
terminal_line = term.buffer.active.getLine(term._core.buffer.y)?.translateToString();
console.log("terminal line: ", terminal_line);
socket.emit("log_input", { "user_input": terminal_line });
}
socket.emit("pty_input", { "input": e.key });
})
socket.on("pty_output", function (output) {
console.log("output: ", output["output"]);
term.write(output["output"]);
})
socket.on("connect", () => {
status.innerHTML = '<span style="background-color: lightgreen;">connected</span>'
button.innerHTML = 'Disconnect'
})
socket.on("disconnect", () => {
status.innerHTML = '<span style="background-color: #ff8383;">disconnected</span>'
button.innerHTML = 'Connect'
})
function myFunction() {
if (button.innerHTML == 'Connect') {
location.reload();
}
else if (button.innerHTML == "Disconnect") {
socket.emit("disconnect_request")
}
}
function resize() {
console.log("resized")
fit.fit()
socket.emit("resize", { "cols": term.cols, "rows": term.rows })
}
window.onresize = resize
window.onload = resize
</script>
views.py:
import os
from django.shortcuts import render
import socketio
import pty
import select
import subprocess
import struct
import fcntl
import termios
import signal
import time
async_mode = "eventlet"
sio = socketio.Server(async_mode=async_mode)
fd = None
child_pid = None
def index(request):
return render(request, "index.html")
def set_winsize(fd, row, col, xpix=0, ypix=0):
winsize = struct.pack("HHHH", row, col, xpix, ypix)
fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize)
def read_and_forward_pty_output():
global fd
max_read_bytes = 1024 * 20
while True:
sio.sleep(0.01)
if fd:
timeout_sec = 0
(data_ready, _, _) = select.select([fd], [], [], timeout_sec)
if data_ready:
output = os.read(fd, max_read_bytes).decode()
sio.emit("pty_output", {"output": output})
else:
print("process killed")
return
@sio.event
def resize(sid, message):
print("entered resize")
if fd:
set_winsize(fd, message["rows"], message["cols"])
@sio.event
def pty_input(sid, message):
if fd:
os.write(fd, message["input"].encode())
@sio.event
def log_input(sid, user_input):
with open("userinput.log", "a") as f:
f.write(user_input["user_input"]+"\n")
def extract_path(raw_input):
if "/" not in raw_input:
filename = raw_input.split()[-1]
else:
filename = raw_input.split()[-1][raw_input.split()[-1].rfind("/")+1:]
path = os.getcwd()+"/"+filename
print("Filename: ",filename)
print(os.path.abspath(path))
@sio.event
def disconnect_request(sid):
sio.disconnect(sid)
@sio.event
def connect(sid, environ):
global fd
global child_pid
if child_pid:
os.write(fd, "\n".encode())
return
(child_pid, fd) = pty.fork()
if child_pid == 0:
subprocess.run("bash")
subprocess.run("clear")
else:
sio.start_background_task(target=read_and_forward_pty_output)
@sio.event
def disconnect(sid):
global fd
global child_pid
os.kill(child_pid, signal.SIGKILL)
os.wait()
fd = None
child_pid = None
print("Client disconnected")
At first I was trying to get and save the aliases in the terminal but it is a specific bash process that is run from subprocess module. It means using subprocess.run("alias") is not an option because it is run in a separate process. Also copying the environment variables and starting another bash process is not viable because this is an interactive terminal that can constantly change. After a lot of researches and trials i decided to block the user from using alias entirely. This approach basicly detects "alias" prompt in the frontend and sends kill(ctrl+c) signal to django. It is possible by sending the special character that is equivalent of ctrl+c in the pty_input method. I used that instead of just sending enter or new line because this program mimics the terminal so after typing alias sending those characters is no different than running it.
TLDR:
These changes in the views.py and index.html blocks the usage of "alias" and prints warning logs when user attempts to use text editors.
views.py:
...
@sio.event
def log_input(sid, user_input):
if "root" in user_input["user_input"][:4]:
prompt = user_input["user_input"][user_input["user_input"].find('#'):].rstrip()
logger.info(f"User run '{prompt[2:]}' command")
else:
prompt = user_input["user_input"][user_input["user_input"].find('$'):].rstrip()
logger.info(f"User run '{prompt[2:]}' command")
if any(keyword in prompt for keyword in command_filter):
extract_path(prompt[2:])
def extract_path(raw_input):
if "/" not in raw_input:
filename = raw_input.split()[-1]
else:
filename = raw_input.split()[-1][raw_input.split()[-1].rfind("/")+1:]
path = os.getcwd()+"/"+filename
path = os.path.abspath(path)
if os.path.exists(path):
logger.warning(f"User edited file named '{filename}' in {path}")
else:
logger.warning(f"User trying to create file named '{filename}' in {path}")
return path
...
index.html:
...
term.onKey(e => {
if (e.key == "\r") {
terminal_line = term.buffer.active.getLine(term._core.buffer.y)?.translateToString();
console.log("terminal line: ", terminal_line);
socket.emit("log_input", { "user_input": terminal_line });
}
else if (e.key == "\x04"){
console.log("Exited gracefully")
}
if(terminal_line.includes("alias")){
term.write("\r\nalias not allowed!\r\n");
socket.emit("pty_input", { "input": ""});
term.write("\r");
terminal_line = "";
}
else{
socket.emit("pty_input", { "input": e.key });
}
})
...