Source code for libvis.websocket.ws_server

import trio
import sys
from trio_websocket import serve_websocket, ConnectionClosed
from logging import getLogger
from .message_gen import message_gen

log = getLogger("Websocket")

[docs]class StopServer(Exception): pass
[docs]async def ws_serve(addr, port, iterable_fn, on_connect=lambda x:None, nursery=None): async def server(request): ws = await request.accept() on_connect(ws) async for message in iterable_fn(message_gen(ws)): try: await ws.send_message(message) except ConnectionClosed: log.warning("Connection closed") break try: await serve_websocket(server, addr, port, ssl_context=None, handler_nursery=nursery) except OSError as ose: print(f"Websocket start on {port} failed: {ose}", file=sys.stderr) return log.info("Websocket terminates")
[docs]def start_server(addr, port, handler_func=print, on_connect=lambda x: None): log.info(f"Starting ws server at {addr}:{port}") try: trio.run(ws_serve, addr, port, handler_func, on_connect) except ConnectionClosed as e: log.warning(f"Connection closed: {e}") return
[docs]def start_server_handler(addr, port, handler_func): async def handler(client_messages): async for message in client_messages: yield str(handler_func(message)) start_server(addr, port, handler)
[docs]def stop(): print("You found a stub!")
[docs]def main(): start_server('127.0.0.1',8000)
if __name__=='__main__': main()