#!/usr/bin/env python3 # input-over-ssh: Mouse and keyboard forwarding over SSH # Copyright © 2019 Lee Yingtong Li (RunasSudo) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . MOUSEHOLD_X = 1366//2 MOUSEHOLD_Y = 728//2 MOUSEHOLD_W = 0 MOUSEHOLD_H = 0 import asyncio import re import subprocess import sys import time import tkinter if MOUSEHOLD_W > 0: root = tkinter.Tk() root.geometry('{}x{}+{}+{}'.format(MOUSEHOLD_W, MOUSEHOLD_H, MOUSEHOLD_X - MOUSEHOLD_W//2, MOUSEHOLD_Y - MOUSEHOLD_H//2)) root.update() async def mousemove(): last_mousemove = time.time() while True: p = await asyncio.create_subprocess_exec('xdotool', 'getmouselocation', stdout=subprocess.PIPE) stdout, _ = await p.communicate() result = {x[:x.index(':')]: int(x[x.index(':')+1:]) for x in stdout.decode('utf-8').strip().split(' ')} dx, dy = result['x'] - MOUSEHOLD_X, result['y'] - MOUSEHOLD_Y if dx != 0 and dy != 0: print(dx, dy, file=sys.stderr) print('DISPLAY=:0 xdotool mousemove_relative -- {} {}'.format(dx, dy)) p = await asyncio.create_subprocess_exec('xdotool', 'mousemove', str(MOUSEHOLD_X), str(MOUSEHOLD_Y)) await p.wait() wait_s = (last_mousemove + 1/30) - time.time() last_mousemove = time.time() if wait_s > 0: await asyncio.sleep(wait_s) async def mouseinp(): # Get mouse device p = await asyncio.create_subprocess_exec('xinput', 'list', stdout=subprocess.PIPE) stdout, _ = await p.communicate() device = re.search('Logitech USB Optical Mouse.*?id=([0-9]+)', stdout.decode('utf-8')).group(1) print('Mouse device {}'.format(device), file=sys.stderr) p_test = await asyncio.create_subprocess_exec('xinput', 'test', device, stdout=subprocess.PIPE) while True: line = (await p_test.stdout.readline()).decode('utf-8').strip() if line.startswith('button press'): print(line, file=sys.stderr) button = line.split()[-1] print('DISPLAY=:0 xdotool mousedown {}'.format(button)) elif line.startswith('button release'): print(line, file=sys.stderr) button = line.split()[-1] print('DISPLAY=:0 xdotool mouseup {}'.format(button)) async def keyboardinp(): is_ctrl = False # 37 # Get keyboard device p = await asyncio.create_subprocess_exec('xinput', 'list', stdout=subprocess.PIPE) stdout, _ = await p.communicate() device = re.search('AT Translated Set 2 keyboard.*?id=([0-9]+)', stdout.decode('utf-8')).group(1) print('Keyboard device {}'.format(device), file=sys.stderr) p_test = await asyncio.create_subprocess_exec('xinput', 'test', device, stdout=subprocess.PIPE) while True: line = (await p_test.stdout.readline()).decode('utf-8').strip() if line.startswith('key press'): print(line, file=sys.stderr) button = line.split()[-1] if button == '37': is_ctrl = True if button == '24' and is_ctrl: # Ctrl+Q sys.exit(0) return print('DISPLAY=:0 xdotool keydown {}'.format(button)) elif line.startswith('key release'): print(line, file=sys.stderr) button = line.split()[-1] if button == '37': is_ctrl = False print('DISPLAY=:0 xdotool keyup {}'.format(button)) async def main(): tasks = [] tasks.append(asyncio.create_task(mousemove())) tasks.append(asyncio.create_task(mouseinp())) tasks.append(asyncio.create_task(keyboardinp())) await asyncio.gather(*tasks) asyncio.run(main())