Source code for streamlit_pyvista.server_managers.server_manager_proxified

import argparse
import ipaddress
import os
import subprocess
import threading
import traceback
from datetime import datetime
from threading import Lock
from typing import Optional

import requests
from flask import request, make_response

from streamlit_pyvista import ROOT_URL
from streamlit_pyvista.helpers.cache import save_file_content, DEFAULT_CACHE_DIR, DEFAULT_VIEWER_CACHE_NAME
from streamlit_pyvista.helpers.streamlit_pyvista_logging import root_logger
from streamlit_pyvista.helpers.utils import (find_free_port, is_server_alive,
                                             wait_for_server_alive, ServerItem, with_lock)
from streamlit_pyvista.message_interface import (EndpointsInterface, ProxyMessageInterface,
                                                 ServerMessageInterface)
from streamlit_pyvista.server_managers.server_manager import ServerManagerBase

runner_lock = Lock()


[docs] def run_trame_viewer(server_port: int, file_path: str): """ Launch a Trame server using python subprocess """ try: subprocess.run(["python3", file_path, "--server", "--port", str(server_port)], capture_output=True, text=True, check=True) except subprocess.CalledProcessError as e: error_message = f""" Command '{e.cmd}' returned non-zero exit status {e.returncode}. STDOUT: {e.stdout} STDERR: {e.stderr} Python Traceback: {traceback.format_exc()} """ root_logger.error(f"Trame Server {server_port} crashed") root_logger.debug(f"Failed with the following error: {error_message}")
[docs] def run_proxy(): def start(nbr_tries=0): try: subprocess.run(["streamlit-pyvista", "run", "proxy"], capture_output=True, text=True, check=True) except subprocess.CalledProcessError as e: error_message = f""" Command '{e.cmd}' returned non-zero exit status {e.returncode}. STDOUT: {e.stdout} STDERR: {e.stderr} Python Traceback: {traceback.format_exc()} """ root_logger.error("Proxy crashed") root_logger.debug(f"Failed with the following error: {error_message}") if nbr_tries < 0: start(nbr_tries + 1) threading.Thread(target=start, daemon=True).start()
[docs] class ServerManagerProxified(ServerManagerBase): """ Implementation of ServerManagerBase that make use of a proxy for remote connection """
[docs] def __init__(self, host: str = "0.0.0.0", port: int = 8080, proxy_host: str = "127.0.0.1", proxy_port: int = 5000): super().__init__(host, port) self.proxy_host = proxy_host self.proxy_port = proxy_port self.base_url = ROOT_URL
def _extract_proxy_host(self): """ Extract the proxy host from the request headers. """ # Identify host used to access the server self.proxy_host = request.headers.get("Host").split(":") # Extract only the hostname and use proxy port if len(self.proxy_host) > 1: self.proxy_host[1] = str(self.proxy_port) self.proxy_host = ":".join(self.proxy_host) else: self.proxy_host = self.proxy_host[0] def _get_host_for_client(self, server_id: int) -> str: """ Change the host's endpoint to use the proxy, check if the host is an ip or a domain to reply with the right host Args: server_id (int): The id of the server. Returns: str: The host for the client. """ try: ipaddress.ip_address(self.proxy_host) host = f"{EndpointsInterface.Protocol}://{self.proxy_host}{self.base_url}/server/{server_id}" except ValueError: host = f"{EndpointsInterface.Protocol}://{self.proxy_host}{self.base_url}/server/{server_id}" return host
[docs] @with_lock(runner_lock) def init_connection(self): # Get the host of the proxy self._extract_proxy_host() file_content, checksum = self._get_viewer_content_from_request(request) if file_content is None: return make_response({ "Invalid body": f"Expected to have the viewer content encoded in base64 in" f" the {ServerMessageInterface.Keys.Viewer} field of the json"}, 400) # Check if any server already running is available and if one was found use it and response with its endpoints available_server = self._find_available_server(checksum) if len(self.servers_running) >= self.maximum_nbr_instances: available_server = self.get_oldest_server(checksum) if available_server is not None: self.servers_running.append(available_server) res = requests.get(f"{available_server.host}/init_connection").json() res[ServerMessageInterface.Keys.Host] = ( f"{EndpointsInterface.Protocol}://{self.proxy_host}{self.base_url}" f"/server/{available_server.host.split('/')[-1]}" ) return make_response(res, 200) file_path = save_file_content(file_content, f"{DEFAULT_CACHE_DIR}/{DEFAULT_VIEWER_CACHE_NAME}")[0] port = find_free_port() # Run the trame server in a new thread threading.Thread(target=run_trame_viewer, args=[port, file_path]).start() # Wait for server to come alive if not wait_for_server_alive(f"{EndpointsInterface.Localhost}:{port}", timeout=self.timeout): return make_response({ "Server Timeout Error": f"Unable to connect to Trame instance on port {port},\ the server might have crashed"}, 400) res = requests.get(f"{EndpointsInterface.Localhost}:{port}/init_connection") res = res.json() # Add the new server to the proxy server list proxy_url = f"{EndpointsInterface.Localhost}:{self.proxy_port}" # Launch the proxy if it's not already running if not is_server_alive(proxy_url): run_proxy() if not wait_for_server_alive(proxy_url, timeout=self.timeout): return make_response({ "Server Timeout Error": "Unable to connect to the proxy, the server might have crashed"}, 400) res[ServerMessageInterface.Keys.Host] = self.register_server_to_proxy(res["host"]) root_logger.debug( f"Trame Server {port} was launched successfully and routed with the proxy with " f"id={res[ServerMessageInterface.Keys.Host].split('/')[-1]}") # Since communication with the server running is local, no need to use the url, localhost + port is always # working self.servers_running.append( ServerItem(res[ServerMessageInterface.Keys.Host], checksum, datetime.now().strftime("%Y-%m-%d %H:%M:%S"), file_path)) return make_response(res, 200)
[docs] def register_server_to_proxy(self, host): proxy_res = requests.get( f"http://{self.proxy_host}{self.base_url}{EndpointsInterface.Proxy.UpdateAvailableServers}", json={ProxyMessageInterface.Keys.Action: ProxyMessageInterface.Actions.Add, ProxyMessageInterface.Keys.ServerURL: host}) server_id = proxy_res.json()[ProxyMessageInterface.Keys.ServerID] return self._get_host_for_client(server_id)
def _find_available_server(self, server_type: str) -> Optional[ServerItem]: if not is_server_alive(f"{EndpointsInterface.Localhost}:{self.proxy_port}"): run_proxy() self.servers_running.clear() return super()._find_available_server(server_type) def _remove_and_kill_server(self, server: ServerItem): super()._remove_and_kill_server(server) payload = {ProxyMessageInterface.Keys.Action: ProxyMessageInterface.Actions.Remove, ProxyMessageInterface.Keys.ServerURL: server.host} requests.get(f"{EndpointsInterface.Protocol}://{self.proxy_host}{self.base_url}" f"{EndpointsInterface.Proxy.UpdateAvailableServers}", json=payload)
[docs] @staticmethod def get_launch_path(): return os.path.abspath(__file__)
def _lifecycle_task_kill_server(self, servers_running): servers_killed = super()._lifecycle_task_kill_server(servers_running) # Complete the routine by notifying the proxy of all the server that needs to be unreferenced for server in servers_killed: payload = {ProxyMessageInterface.Keys.Action: ProxyMessageInterface.Actions.Remove, ProxyMessageInterface.Keys.ServerURL: server.host} requests.get(f"{EndpointsInterface.Protocol}://{self.proxy_host}{self.base_url}" f"{EndpointsInterface.Proxy.UpdateAvailableServers}", json=payload) return servers_killed
if __name__ == "__main__": # Add command line argument and support parser = argparse.ArgumentParser(description='Launch a trame server instance') # Add the port argument that allow user to specify the port to use for the server from command line parser.add_argument('--port', type=int, help='Specify the port of the server') # Add --server flag that is used to specify whether to use the trame as only a server and block the # automatic open of a browser parser.add_argument('--server', action="store_true", help='Specify if the trame is opened as a server') args = parser.parse_args() server_manager = ServerManagerProxified(port=args.port) server_manager.run_server()