# Source code for libvis.websocket.test_ws_server

import time
# Using threading instead of multiprocessing 
# to allow recording received messages
from multiprocessing.dummy import Process

# For some reason trio does not rethrow StopIteration
from trio_websocket import ConnectionClosed
from .ws_server import start_server
from .ws_client import start_client
from .message_gen import async_list, async_gen


def test_simple():
    """Exchange one 'Ping'/'Pong' pair between a websocket server and client.

    A server handler and a client handler are defined as async generators:
    whatever a handler yields is an outgoing message, and the async iterable
    it receives carries the incoming messages from the other side. The server
    is started in the background, the client is run in the foreground, and
    the first message recorded on each side is checked.

    NOTE(review): this block was reconstructed from a flattened listing in
    which all statements sat on one line; the nesting below follows the
    inline comments and should be confirmed against the upstream file.
    """
    client_received = []
    server_received = []

    async def server(client_messages):
        # The server speaks first so that the client's receive loop has
        # something to react to.
        yield 'Ping'
        async for message in client_messages:
            print("<<", message)
            server_received.append(message)
            # Listen only for one message
            raise ConnectionClosed('Done all stuff')

    async def client(server_messages):
        async for message in server_messages:
            print(">>", message)
            client_received.append(message)
            # If the following line is outside message loop,
            # server and client are deadlocked. Someone has to start
            yield 'Pong'

    # `Process` is imported from multiprocessing.dummy, i.e. it is a thread,
    # so the lists above are shared with the server handler.
    Process(
        target=start_server,
        args=('0.0.0.0', 8000, server),
    ).start()
    # Presumably gives the background server time to start listening before
    # the client connects; there is no explicit readiness signal here.
    time.sleep(.1)
    start_client('ws://localhost:8000', client)

    assert client_received[0] == 'Ping'
    assert server_received[0] == 'Pong'

    # Drive both handlers directly with canned input and print the result.
    # Presumably async_gen wraps an iterable as an async generator and
    # async_list collects one into a list -- verify in message_gen.
    # NOTE(review): `server` raises ConnectionClosed on its first incoming
    # message; whether async_list absorbs that is not visible from here.
    print(async_list(server(async_gen(['Maca','Waca']))))
    print(async_list(client(async_gen('ahhaehasuh1'))))