Source code for libvis.websocket.ws_client

import trio
from sys import stderr
from trio_websocket import open_websocket_url, ConnectionClosed
from .message_gen import message_gen

[docs]def main(addr='wss://echo.websocket.org', message='hello world'): trio.run(send_one, addr, message)
[docs]async def communicate(addr, iterable_fn): try: async with open_websocket_url(addr) as ws: async for message in iterable_fn(message_gen(ws)): try: await ws.send_message(message) except ConnectionClosed: print('ConnectionClosed while sending') return except OSError as ose: print('Connection attempt failed: %s' % ose, file=stderr)
[docs]def start_client(addr, iterable_fn): print('Starting client') try: trio.run(communicate, addr, iterable_fn) except StopIteration: return
def _default_on_error(err): print('Connection failed: %s'%err)
[docs]async def open_ws_wrapper(addr,on_connect, on_error=_default_on_error): try: async with open_websocket_url(addr) as ws: on_connect(ws) except OSError as ose: on_error(ose) except ConnectionClosed as e: on_error(e)
[docs]async def send_one(addr, message): try: async with open_websocket_url(addr) as ws: await ws.send_message(message) message = await ws.get_message() print('Received message: %s' % message) except OSError as ose: print('Connection attempt failed: %s' % ose, file=stderr)
[docs]async def send_iter(addr, iterable): try: async with open_websocket_url(addr) as ws: for message in iterable: await ws.send_message(message) except OSError as ose: print('Connection attempt failed: %s' % ose, file=stderr)
if __name__=="__main__": main()