From f37a5d3e4d88400affd0ca0b66b280d54cd6226c Mon Sep 17 00:00:00 2001 From: mdnghtman Date: Fri, 8 Apr 2022 17:20:44 +0200 Subject: [PATCH] Add files via upload --- sync_mpv_client.py | 422 +++++++++++++++++++++++++++++++++++++++++++++ sync_mpv_server.py | 235 +++++++++++++++++++++++++ 2 files changed, 657 insertions(+) create mode 100644 sync_mpv_client.py create mode 100644 sync_mpv_server.py diff --git a/sync_mpv_client.py b/sync_mpv_client.py new file mode 100644 index 0000000..9225f90 --- /dev/null +++ b/sync_mpv_client.py @@ -0,0 +1,422 @@ +#!/usr/bin/env python +from Crypto.Cipher import AES +from Crypto.Util.Padding import pad, unpad +from python_mpv_jsonipc import MPV +from configparser import ConfigParser + +import os +import sys +import time +import errno +import socket +import hashlib +import datetime + +global connected +global mpv + +def receive_message(client_socket): + + try: + message_header = client_socket.recv(HEADER_LENGTH) + message_length = int(message_header) + + except: + return "Reading Error", "Server" + + msg = client_socket.recv(message_length) + + decrypted_msg = decrypt_message(msg[16:], msg[:16]) + + return_list = decrypted_msg.split(" , ") + if len(return_list) > 1: + msg = return_list[0] + user = return_list[1] + + return msg, user + + else: + return decrypted_msg, "Server" + +def pause_video(mpv): + mpv.command("set_property","pause", True) + +def play_video(mpv): + global stop + stop = False + mpv.command("set_property","pause", False) + +def toggle_play(mpv): + isPaused = mpv.command("get_property","pause") + if isPaused == True: + play_video(mpv) + else: + pause_video(mpv) + +def new_video(mpv, new_link): + global t_playback + if new_link is not None: + mpv.play(new_link) + t_playback = 0 + +def send(clientsocket, msg): + global KEY + cipher = AES.new(KEY, AES.MODE_CBC) + if msg: + if type(msg) is not bytes: + msg = msg.encode("utf-8") + + msg = encrypt_message(msg) + send_length = prepare_concatenation(len(msg)) + + clientsocket.send(send_length) + clientsocket.send(msg) + +def encrypt_message(msg): + global KEY + + cipher = AES.new(KEY, AES.MODE_CBC) + + encrypted_msg = cipher.encrypt(pad(msg,AES.block_size)) + encrypted_msg = cipher.iv + encrypted_msg + + return encrypted_msg + +def decrypt_message(msg, IV): + global KEY + cipher = AES.new(KEY, AES.MODE_CBC, IV) + decrypted_msg = unpad(cipher.decrypt(msg), AES.block_size) + + try: + decrypted_msg = decrypted_msg.decode("utf-8") + except UnicodeDecodeError: + pass + + return decrypted_msg + +def prepare_concatenation(msg): + global HEADER_LENGTH + concat = str(msg).encode("utf-8") + concat += b' ' * (HEADER_LENGTH - len(concat)) + return concat + +def ready_when_seeked(mpv, value): + while True: + seek_bool = mpv.command("get_property","seeking") + if seek_bool == False: + pause_video(mpv) + send(client_socket, f"ready {value}") + break + +def exit_gracefully(): + send(client_socket, "!DISCONNECT") + +def handle_server(server,addr): + global connected + global t_playback + global mpv + global client_socket + global stop + global restart + + t_playback = 0 + t_restart = 0 + restart = 0 + connected = True + mpv = MPV(start_mpv=True, quit_callback=exit_gracefully) + mpv.command("set_property","osd-font-size","18") + +# @mpv.property_observer("paused-for-cache") +# def observe_playback_time(name, value): +# global restart +# if value == True: +# print("seek = true") +# restart+=1 +# if restart == 4: +# restart = 0 +# restart_url = mpv.command("get_property","path") +# skip_time = mpv.command("get_property","playback-time") +# +# send(client_socket, "toggle play") +# mpv.play("https://www.youtube.com/watch?v=dQw4w9WgXcQ") +# +# mpv.play(restart_url) +# #ready_when_seeked(mpv, value) +# while True: +# seek_bool = mpv.command("get_property","seeking") +# if seek_bool == False: +# pause_video(mpv) +# break +# +# mpv.command("set_property","playback-time",skip_time) +# #new_video(mpv, "https://www.youtube.com/watch?v=dQw4w9WgXcQ") + + # observe playback-time to synchronize when user skips on timeline + + @mpv.property_observer("playback-time") + def observe_playback_time(name, value): + global t_playback + global stop + if value is not None: + if value > t_playback+0.25 or value < t_playback-0.1: + + if f"mpv skip {value}" != msg: + stop = True + t_playback = mpv.command("get_property", "playback-time") + print (t_playback) + send(client_socket, f"mpv skip {t_playback}") + ready_when_seeked(mpv, value) + + t_playback = value + + # observe path to distribute new video url to other clients + + @mpv.property_observer("path") + def observe_path(name, value): + global stop + print(name, value) + if value is not None: + print (f"New Path: {value}") + print(msg) + if f"mpv new {value}" != msg: + stop = True + send(client_socket, f"mpv new {value}") + new_video(mpv, value) + print("READY - PATH OBSERVED") + ready_when_seeked(mpv, value) + + # when space pressed inform other clients of play/pause + + @mpv.on_key_press("space") + def toggle_playback(): + global stop + if stop == False: + toggle_play(mpv) + send(client_socket, "toggle play") + + # when q is pressed exit gracefully + + @mpv.on_key_press("q") + def terminate(): + global connected + print("Q") + send(client_socket, "!DISCONNECT") + connected = False + + @mpv.on_key_press(",") + def frame_back_step(): + send(client_socket, "frame-back-step") + pause_video(mpv) + mpv.command('frame-back-step') + + @mpv.on_key_press(".") + def frame_step(): + send(client_socket, "frame-step") + #pause_video(mpv) + mpv.command('frame-step') + + @mpv.on_key_press("-") + def subtract_speed(): + print("n") + send(client_socket, "subtract_speed") + mpv.command('add','speed',-0.1) + speed = mpv.command('get_property','speed') + mpv.command("show-text",f"Setting playback-speed to {speed}", "1500") + print('slowed down') + + @mpv.on_key_press("+") + def add_speed(): + print("") + send(client_socket, "add_speed") + mpv.command('add','speed',0.1) + speed = mpv.command('get_property','speed') + mpv.command("show-text",f"Setting playback-speed to {speed}", "1500") + print('sped up') + + @mpv.on_key_press("r") + def resync(): + time_pos = mpv.command("get_property","time-pos") + print(time_pos) + send(client_socket, f"mpv skip {time_pos}") + send(client_socket, "resync") + ready_when_seeked(mpv, time_pos) + + print(f"[CONNECTION ESTABLISHED] to {addr}") + + while connected: + global stop + + try: + msg, user = receive_message(server) + + if msg != False: + if msg == "!DISCONNECT": + connected = False + break + + if msg == "frame-step": + mpv.command('frame-step') + + if msg == 'add_speed': + mpv.command('add','speed',0.1) + speed = mpv.command('get_property','speed') + mpv.command("show-text",f"{user} sets playback-speed to {speed}", "1500") + + if msg == 'subtract_speed': + mpv.command('add','speed',-0.1) + speed = mpv.command('get_property','speed') + mpv.command("show-text",f"{user} sets playback-speed to {speed}", "1500") + + if msg == "frame-back-step": + mpv.command('frame-back-step') + + if msg == "mpv pause": + pause_video(mpv) + + if msg == "mpv terminate": + connected = False + + if msg == "mpv playback": + play_video(mpv) + + if "disconnected" in msg: + mpv.command("show-text",f"{msg}", "5000") + + if msg == "number of clients": + number_of_clients = int(msg.split[" "][3]) + print ("Number of Clients: %s"%number_of_clients) + + if msg == "resync": + mpv.command("show-text",f"{user} resyncs.", "1500") + + if msg == "toggle play": + toggle_play(mpv) + mpv.command("show-text",f"{user} toggles", "1500") + + if "mpv skip" in msg: + stop = True + t_playback = float(msg.split(" ")[2]) + + if t_playback < 3600: + converted_time = time.strftime("%M:%S", time.gmtime(t_playback)) + else: + converted_time = time.strftime("%H:%M:%S", time.gmtime(t_playback)) + + mpv.command("set_property","playback-time",msg.split(" ")[2]) + mpv.command("show-text",f"{user} skips to {converted_time}", "1500") + + ready_when_seeked(mpv, t_playback) + + if "mpv new" in msg: + stop = True + videopath = msg[8:] + new_video(mpv, videopath) + mpv.command("show-text",f"{user}: {videopath}","1500") + ready_when_seeked(mpv, videopath) + + if "userconnected" in msg: + mpv.command("show-text",f"{msg.split(' ')[1]} connected.","1500") + + + except IOError as e: + if e.errno != errno.EAGAIN and e.errno != errno.EWOULDBLOCK: + print(e) + print('Reading error: {}'.format(str(e))) + continue + #sys.exit() + + # We just did not receive anything + #break + + except Exception as e: + # Any other exception - something happened + print(e) + print('Reading error: '.format(str(e))) + #sys.exit() + + mpv.terminate() + client_socket.shutdown(socket.SHUT_RDWR) + client_socket.close() + +def parse_config(parser, configfile): + + parser.read(configfile) + IP = parser.get('connection', 'ip') + PORT = parser.getint('connection', 'port') + USERNAME = parser.get('connection', 'username') + PASSWORD = parser.get('connection', 'password') + return IP, PORT, USERNAME, PASSWORD + +def initialize(parser, configfile): + + IP = input("IP: ") + PASSWORD = input("Password: ") + USERNAME = input("Username: ") + parser['connection'] = { + 'ip': IP, + 'port': '51984', + 'username': USERNAME, + 'password': PASSWORD, + } + with open(configfile,"w") as f: + parser.write(f) + + +def main(): + + global KEY + global HEADER_LENGTH + global client_socket + + HEADER_LENGTH = 32 + FORMAT = 'utf-8' + DISCONNECT_MESSAGE ="!DISCONNECT" + + + configfolder = os.path.expanduser("~/.config/sync-mpv/") + configfile = os.path.expanduser("~/.config/sync-mpv/sync-mpv.conf") + + parser = ConfigParser() + + if os.path.exists(configfolder): + pass + else: + os.mkdir(configfolder) + + if os.path.exists(configfile): + IP, PORT, USERNAME, PASSWORD = parse_config(parser, configfile) + else: + initialize(parser, configfile) + + IP, PORT, USERNAME, PASSWORD = parse_config(parser, configfile) + + KEY = hashlib.sha256(PASSWORD.encode()).digest() + client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + # Connect to a given ip and port + while True: + try: + client_socket.connect((IP, PORT)) + break + except ConnectionRefusedError: + print("\nEnter new IP if server IP has changed.\nLeave blank otherwise.\n") + + IP = input("IP: ") + if IP == "": + pass + + config = ConfigParser() + config.read(configfile) + config.set('connection','ip', '%s' % IP) + + with open(configfile, "w") as f: + config.write(f) + + client_socket.setblocking(True) + + send(client_socket, USERNAME) + + handle_server(client_socket, (IP, PORT)) + +if __name__ == "__main__": + main() diff --git a/sync_mpv_server.py b/sync_mpv_server.py new file mode 100644 index 0000000..d54aeaa --- /dev/null +++ b/sync_mpv_server.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python +from Crypto.Cipher import AES +from Crypto.Util.Padding import pad, unpad +from configparser import ConfigParser +import socket +import select +import hashlib +import sys +import threading +import errno +import time +import os + + +HEADER_LENGTH = 32 + +IP = "0.0.0.0" +PORT = 51984 +FORMAT = 'utf-8' +DISCONNECT_MESSAGE ="!DISCONNECT" + +def prepare_concatenation(msg): + + concat = str(msg).encode("utf-8") + concat += b' ' * (HEADER_LENGTH - len(concat)) + return concat + +def send(clientsocket, msg): + + global KEY + + cipher = AES.new(KEY, AES.MODE_CBC) + + if type(msg) is not bytes: + msg = msg.encode("utf-8") + + msg = encrypt_message(msg) + send_length = prepare_concatenation(len(msg)) + + clientsocket.sendall(send_length) + clientsocket.sendall(msg) + +def encrypt_message(msg): + + global KEY + + cipher = AES.new(KEY, AES.MODE_CBC) + + encrypted_msg = cipher.encrypt(pad(msg,AES.block_size)) + encrypted_msg = cipher.iv + encrypted_msg + + return encrypted_msg + +def decrypt_message(msg, IV): + + global KEY + + cipher = AES.new(KEY, AES.MODE_CBC, IV) + decrypted_msg = unpad(cipher.decrypt(msg), AES.block_size) + + try: + decrypted_msg = decrypted_msg.decode("utf-8") + except UnicodeDecodeError: + pass + + return decrypted_msg + +def receive_message(client_socket): + + try: + message_header = client_socket.recv(HEADER_LENGTH) + + except: + return False + + message_length = int(message_header) + + msg = client_socket.recv(message_length) + decrypted_msg = decrypt_message(msg[16:], msg[:16]) + + return decrypted_msg + +def parse_config(parser, configfile): + + parser.read(configfile) + return parser.get('connection', 'password') + +def initialize(parser, configfile): + print("Decide for a 16-digit password.\nPassword will be saved under '~/.config/sync-mpv/serverpassword.conf'\n") + while True: + PASSWORD = input("Password: ") + if len(PASSWORD) == 16: + break + print(f"\nPassword is {len(PASSWORD)} digits long. Choose another.") + + parser['connection'] = { + 'password': '%s'%PASSWORD, + } + with open(configfile,"w") as f: + parser.write(f) + return PASSWORD + +def main(): + global KEY + + configfolder = os.path.expanduser("~/.config/sync-mpv/") + configfile = os.path.expanduser("~/.config/sync-mpv/serverpassword.conf") + parser = ConfigParser() + + if os.path.exists(configfolder): + pass + else: + os.mkdir(configfolder) + + if os.path.exists(configfile): + PASSWORD = parse_config(parser, configfile) + else: + PASSWORD = initialize(parser, configfile) + + KEY = hashlib.sha256(PASSWORD.encode()).digest() + + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind((IP, PORT)) + server_socket.listen() + + sockets_list = [server_socket] + clients = {} + + print(f'Listening for connections on {IP}:{PORT}...') + + readycounter = 0 + last_video = None + while True: + read_sockets, _, exception_sockets = select.select(sockets_list, [], sockets_list) + + for notified_socket in read_sockets: + + # If notified socket is a server socket - new connection, accept it + if notified_socket == server_socket: + readycounter = 0 + + client_socket, client_address = server_socket.accept() + + user = receive_message(client_socket) + + # If False - client disconnected before he sent his name + if user is False or user == b'': + continue + + sockets_list.append(client_socket) + clients[client_socket] = user + + print('Accepted new connection from {}:{}, username: {}'.format(*client_address, user)) + + for client in clients: + send(client, f"number of clients {len(clients)}") + + if client != server_socket and client != client_socket: + send(client, f"userconnected {user}") + + if last_video is not None: + send(client, f"mpv new {last_video}") + + # Else existing socket is sending a message + else: + + try: + message = receive_message(notified_socket) + + # If False, client disconnected, cleanup + except: + #print('Closed connection from: {}'.format(clients[notified_socket])) + #sockets_list.remove(notified_socket) + + #del clients[notified_socket] + #readycounter = 0 + + #for client in clients: + # send(client, "number of clients %s"%len(clients)) + + continue + + if message != False: + + # Get user by notified socket, so we will know who sent the message + user = clients[notified_socket] + + print(f'Received message from {user}: {message}') + + if "mpv new" in message: + readycounter = 0 + last_video = message[8:] + + #if "mpv skip" in message: + # readycounter = 0 + + if "ready" in message: + print(len(clients)) + readycounter += 1 + print("readycounter ", readycounter) + + else: + readycounter = 0 + + if message == "!DISCONNECT": + for client_socket in clients: + if client_socket != notified_socket: + send(client_socket, f"{clients[notified_socket]} disconnected. , {user}") + + send(notified_socket, message) + sockets_list.remove(notified_socket) + del clients[notified_socket] + + if readycounter == len(clients): + for client_socket in clients: + send(client_socket, "mpv playback") + readycounter = 0 + + for client_socket in clients: + if "!DISCONNECT" != message: + if client_socket != notified_socket: + send(client_socket, f"{message} , {user}") + + # It's not really necessary to have this, but will handle some socket exceptions just in case + for notified_socket in exception_sockets: + + # Remove from list for socket.socket() + sockets_list.remove(notified_socket) + + # Remove from our list of users + del clients[notified_socket] + +if __name__ == "__main__": + main()