Source code for neurobooth_os.netcomm.client

# Import socket module
import socket
import select
from time import time, sleep
import re
import os
import pandas as pd
from io import StringIO

from neurobooth_os.secrets_info import secrets


def socket_message(message, node_name, wait_data=False):
    """ Send a string message though socket connection to `node_name`.

    Parameters
    ----------
    message : str
        The message to send.
    node_name : str
        The node to send the socket message to
    wait_data : bool
        If True, wait for the data.

    Returns
    -------
    data : str
        Returns the data from the node_name.
    """
    def connect():
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # connect to server on local computer
        s.connect((host, port))
        s.send(message.encode('ascii'))

        data = None
        if wait_data:
            data = wait_socket_data(s)

        s.close()
        return data

    host, port = node_info(node_name)

    try:
        data = connect()
    except TimeoutError:
        print(f"{node_name} socket connexion timed out, trying to restart server")
        pid = start_server(node_name)
        data = connect()

    return data


def socket_time(node_name, print_flag=1, time_out=3):
    """Computes connextion time from client->server and client->server->client.

    Parameters
    ----------
    node_name : str
        name of the server
    print_flag : int, optional
        if True, prints time taken
    time_out : int, optional
        Time of seconds waiting to hear from server, by default 3

    Returns
    -------
    times floats
        taken time to server and time to server and back
    """

    host, port = node_info(node_name)

    message = "time_test"
    t0 = time()
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(time_out)

    try:
        # connect to server on local computer
        s.connect((host, port))
    except BaseException:
        print(f"{node_name} socket connexion timed out, trying to restart server")
        start_server(node_name)
        t0 = time()
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(time_out * 2)
        s.connect((host, port))

    s.send(message.encode('ascii'))
    # message received from server
    data = wait_socket_data(s, 2)
    s.close()

    t1 = time()
    time_send = float(data.split("_")[-1])
    time_1way = time_send - t0
    time_2way = t1 - t0

    if print_flag:
        print(f"Return took {time_2way}, sent {time_1way}")

    return time_2way, time_1way


def node_info(node_name):
    """Gets the host and port of the node_name.

    Parameters
    ----------
    node_name : str
        Name of the node from which port and host is retrieved.

    Returns
    -------
    host str
        name of the host
    port int
        port number
    """
    port = 12347
    if node_name == "acquisition":
        host = 'acq'
    elif node_name == "presentation":
        host = 'stm'
    elif node_name == "control":
        host = 'ctr'
    elif node_name == "dummy_acq":
        host = 'localhost'
        port = 1280
    elif node_name == "dummy_stm":
        host = 'localhost'
        port = 1281
    elif node_name == "dummy_ctr":
        host = 'localhost'
        port = 1282
    return host, port


def wait_socket_data(s, wait_time=None):

    tic = time()
    while True:
        r, _, _ = select.select([s], [], [s], 1)
        if r:
            data = s.recv(1024)
            return data.decode("utf-8")

        if wait_time is not None:
            if time() - tic > wait_time:
                print("Socket timed out")
                return "TIMED-OUT_-999"


[docs]def start_server(node_name, save_pid_txt=True): """ Makes a network call to run script serv_{node_name}.bat First remote processes are logged, then a scheduled task is created to run the remote batch file, then task runs, and new python PIDs are captured with the option to save to save_pid_txt. If saved, when the function is called it will kill the PIDs in the file. Parameters ---------- node_name : str PC node name defined in `secrets_info.secrets` save_pid_txt : bool Option to save PID to file for killing PID in the future. Returns ------- pid : list Python process identifiers found in remote computer after server started. """ if node_name in ["acquisition", "presentation"]: s = secrets[node_name] else: print("Not a known node name") return None # Kill any previous server kill_pid_txt(node_name=node_name) # get list of python processes task_cmd = f"tasklist.exe /S {s['name']} /U {s['user']} /P {s['pass']}" out = os.popen(task_cmd).read() pids_old = get_python_pids(out) # Get list of scheduled tasks and run TaskOnEvent if not running cmd_out = f"SCHTASKS /query /fo CSV /nh /S {s['name']} /U {s['name']}\\{s['user']} /P {s['pass']}" out = os.popen(cmd_out).read().replace('\\', "") df = pd.read_csv(StringIO(out), sep=",", index_col=0, names = ['date', 'status']) task_name = 'TaskOnEvent1' while True: if task_name in out: # if task already running add n+1 to task name if df.loc[task_name, 'status'] == 'Running': tsk_inx = int(task_name[-1]) + 1 task_name = task_name[:-1] + str(tsk_inx) print(f"Creating new scheduled task: {task_name} in server {node_name}") continue break # Run scheduled task cmd1 creates a scheduled task, cmd2 initiates it cmd_str = f"SCHTASKS /S {s['name']} /U {s['name']}\\{s['user']} /P {s['pass']}" cmd_1 = cmd_str + \ f" /Create /TN {task_name} /TR {s['bat']} /SC ONEVENT /EC Application /MO *[System/EventID=777] /f" cmd_2 = cmd_str + f' /Run /TN {task_name}' out = os.popen(cmd_1).read() out = os.popen(cmd_2).read() sleep(.3) out = os.popen(task_cmd).read() pids_new = get_python_pids(out) pid = [p for p in pids_new if p not in pids_old] print(f"{node_name.upper()} server initiated with pid {pid}") if save_pid_txt: with open("server_pids.txt", "a") as f: f.write(f"{pid}|{node_name}|{time()}\n") return pid
def get_python_pids(output_tasklist): # From popen tasklist output procs = output_tasklist.split("\n") re_pyth = re.compile("python.exe[\\s]*([0-9]*)") pyth_pids = [] for prc in procs: srch = re_pyth.search(prc) if srch is not None: pyth_pids.append(srch.groups()[0]) return pyth_pids def kill_remote_pid(pids, node_name): if node_name in ["acquisition", "presentation"]: s = secrets[node_name] else: print("Not a known node name") return None if isinstance(pids, str): pids = [pids] cmd = f"taskkill /S {s['name']} /U {s['user']} /P {s['pass']} /PID %s" for pid in pids: out = os.popen(cmd % pid) print(out.read()) return def kill_pid_txt(txt_name="server_pids.txt", node_name=None): if not os.path.exists(txt_name): return with open(txt_name, "r+") as f: Lines = f.readlines() if len(Lines): print(f"Closing {len(Lines)} remote processes") new_lines = [] for line in Lines: pid, node, tsmp = line.split("|") if node_name is not None and node_name != node: new_lines.append(line) continue kill_remote_pid(eval(pid), node) f.seek(0) if len(new_lines): f.writelines(new_lines) else: f.write("") f.truncate()