import threading
import multiprocessing
import time
from typing import Optional, Iterable, Callable
import logging
import io
import os
import atexit
import websocket
import webcface.member
import webcface.field
import webcface.client_data
import webcface.message
import webcface.client_impl
[docs]
class Client(webcface.member.Member):
"""サーバーに接続する
詳細は `Clientのドキュメント <https://na-trium-144.github.io/webcface/md_01__client.html>`_ を参照
:arg name: 名前
:arg host: サーバーのアドレス
:arg port: サーバーのポート
:arg auto_reconnect: (ver2.0〜) 通信が切断された時に自動で再接続する。(デフォルト: True)
:arg auto_sync: (ver2.1〜) 指定した間隔(秒)ごとに別スレッドで自動的に sync() をする (デフォルト: None (syncしない))
"""
_ws: Optional[websocket.WebSocketApp]
_closing: bool
_reconnect_thread: threading.Thread
_send_thread: threading.Thread
_auto_sync: Optional[float]
_sync_thread: Optional[threading.Thread]
def __init__(
self,
name: str = "",
host: str = "127.0.0.1",
port: int = 7530,
auto_reconnect: bool = True,
auto_sync: Optional[float] = None,
) -> None:
logger = logging.getLogger(f"webcface_internal({name})")
handler = logging.StreamHandler()
fmt = logging.Formatter("%(name)s [%(levelname)s] %(message)s")
handler.setFormatter(fmt)
logger.addHandler(handler)
if "WEBCFACE_TRACE" in os.environ:
logger.setLevel(logging.DEBUG)
elif "WEBCFACE_VERBOSE" in os.environ:
logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.CRITICAL + 1)
super().__init__(
webcface.field.Field(
webcface.client_data.ClientData(name, logger, auto_reconnect), name
),
name,
)
self._ws = None
self._closing = False
data = self._data_check()
def on_open(ws):
data.logger_internal.info("WebSocket Open")
# syncInitメッセージを準備してなければqueueの先頭に入れる
if not data._msg_first:
data.queue_first()
# 接続完了 send_threadが動き始める
with data._connection_cv:
data.connected = True
data._connection_cv.notify_all()
def on_message(ws, message: bytes):
data.logger_internal.debug("Received message")
# webcface.client_impl.on_recv(self, data, message)
with data.recv_cv:
data.recv_queue.append(message)
data.recv_cv.notify_all()
def on_error(ws, error):
data.logger_internal.info(f"WebSocket Error: {error}")
def on_close(ws, close_status_code, close_msg):
data.logger_internal.info("WebSocket Closed")
with data._connection_cv:
data.connected = False
data._connection_cv.notify_all()
data.clear_msg()
data.self_member_id = None
data.sync_init_end = False
# data.queue_msg(webcface.client_impl.sync_data_first(self, data))
def reconnect():
while not self._closing:
self._ws = websocket.WebSocketApp(
f"ws://{host}:{port}/",
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
)
try:
self._ws.run_forever()
except Exception as e:
data.logger_internal.debug(f"WebSocket Error: {e}")
if not data.auto_reconnect:
break
if not self._closing:
time.sleep(0.1)
data.logger_internal.debug(f"reconnect_thread end")
self._reconnect_thread = threading.Thread(target=reconnect, daemon=True)
def msg_send():
data = self._data_check()
while self._reconnect_thread.is_alive():
while (
not data.connected or not data.has_msg()
) and self._reconnect_thread.is_alive():
if not data.connected:
with data._connection_cv:
data._connection_cv.wait(timeout=0.1)
data.wait_msg(timeout=0.1)
msgs = self._data_check().pop_msg()
if msgs is not None and self._ws is not None and self.connected:
try:
data.logger_internal.debug("Sending message")
self._ws.send(webcface.message.pack(msgs))
except Exception as e:
data.logger_internal.error(f"Error Sending message {e}")
self._send_thread = threading.Thread(target=msg_send, daemon=True)
self._auto_sync = auto_sync
self._sync_thread = None
# data.queue_msg(webcface.client_impl.sync_data_first(self, data))
def close_at_exit():
data.logger_internal.debug(
"Client close triggered at interpreter termination"
)
self.close()
if self._reconnect_thread.is_alive():
self._reconnect_thread.join()
if self._send_thread.is_alive():
self._send_thread.join()
if self._sync_thread is not None and self._sync_thread.is_alive():
self._sync_thread.join()
atexit.register(close_at_exit)
[docs]
def close(self) -> None:
"""接続を切る
* ver1.1.1〜 キューにたまっているデータがすべて送信されるまで待機
* ver1.1.2〜 サーバーへの接続に失敗した場合は待機しない
"""
if not self._closing:
self._closing = True
while self._data_check().has_msg() and self._reconnect_thread.is_alive():
self._data_check().wait_empty(timeout=1)
if self._ws is not None:
self._ws.close()
[docs]
def start(self) -> None:
"""サーバーに接続を開始する"""
if not self._reconnect_thread.is_alive():
self._reconnect_thread.start()
if not self._send_thread.is_alive():
self._send_thread.start()
if self._auto_sync is not None:
if self._sync_thread is None:
def loop_sync():
while self._reconnect_thread.is_alive():
self.sync(timeout=self._auto_sync, auto_start=False)
self._sync_thread = threading.Thread(target=loop_sync, daemon=True)
if not self._sync_thread.is_alive():
self._sync_thread.start()
[docs]
def wait_connection(self) -> None:
"""サーバーに接続が成功するまで待機する。
接続していない場合、start()を呼び出す。
"""
self.start()
data = self._data_check()
while not data.connected or not data.sync_init_end:
if not data.connected:
with data._connection_cv:
self._data_check()._connection_cv.wait()
else:
if len(data.recv_queue) == 0:
with data.recv_cv:
data.recv_cv.wait(timeout=None)
self.sync(timeout=0)
@property
def connected(self) -> bool:
"""サーバーに接続できていればtrue
ver2.0からプロパティ
"""
return self._data_check().connected
[docs]
def sync(self, timeout: Optional[float] = 0, auto_start: bool = True) -> None:
"""送信用にセットしたデータをすべて送信キューに入れ、受信したデータを処理する
* 実際に送信をするのは別スレッドであり、この関数はブロックしない。
* サーバーに接続していない場合、start()を呼び出す。
* ver2.0〜: 受信したデータがあれば各種コールバックをこのスレッドで呼び出し、
それがすべて完了するまでこの関数はブロックされる。
* ver2.0〜: timeoutが正の場合、データを受信してもしなくても
timeout 経過するまでは繰り返しsync()を再試行する。
timeout=0 または負の値なら再試行せず即座にreturnする。
(デフォルト、ver1.1までのsync()と同じ)
* timeout がNoneの場合、close()するまで無制限に待機する。
* autoReconnectがfalseでサーバーに接続できてない場合はreturnする。
(deadlock回避)
:param timeout: (ver2.0〜) sync()を再試行するタイムアウト (秒単位の実数、またはNone)
:param auto_start: (ver2.1〜)
"""
if auto_start:
self.start()
data = self._data_check()
if data._msg_first:
data.queue_msg_always(webcface.client_impl.sync_data(data, False))
else:
data.queue_first()
if hasattr(time, "time_ns"):
time_ns = time.time_ns
else:
def time_ns():
return int(time.time() * 1e9)
start_ns = time_ns()
timeout_ns = round(timeout * 1e9) if timeout is not None else None
while not self._closing and (data.connected or data.auto_reconnect):
with data.recv_cv:
if len(data.recv_queue) == 0:
timeout_now = None
if timeout_ns is not None:
timeout_now = (timeout_ns - (time_ns() - start_ns)) / 1e9
data.recv_cv.wait(timeout=timeout_now)
for msg in data.recv_queue:
webcface.client_impl.on_recv(self, data, msg)
data.recv_queue = []
if timeout_ns is not None and time_ns() - start_ns >= timeout_ns:
break
[docs]
def member(self, member_name: str) -> webcface.member.Member:
"""他のメンバーにアクセスする"""
return webcface.member.Member(self, member_name)
[docs]
def members(self) -> Iterable[webcface.member.Member]:
"""サーバーに接続されている他のmemberをすべて取得する。
自分自身と、無名のmemberを除く。
"""
return map(self.member, self._data_check().value_store.get_members())
[docs]
def on_member_entry(self, func: Callable) -> Callable:
"""Memberが追加されたときのイベント
コールバックの引数にはMemberオブジェクトが渡される。
ver2.0〜:
* 呼び出したいコールバック関数をfuncとして
:code:`client.on_member_entry(func)`
などとすれば関数を登録できる。
* または :code:`@client.on_member_entry` をデコレーターとして使う。
"""
self._data_check().on_member_entry = func
return func
@property
def logging_handler(self) -> logging.Handler:
"""webcfaceに出力するloggingのHandler
(ver2.1〜: Log名は "default", log().handler と同じ)
:return: logger.addHandler にセットして使う
"""
return webcface.log_handler.Handler(self._data_check(), "default")
@property
def logging_io(self) -> io.TextIOBase:
"""webcfaceとstderrに出力するio
(ver2.1〜: Log名は "default", log().io と同じ)
"""
return webcface.log_handler.LogWriteIO(self._data_check(), "default")
@property
def server_name(self) -> str:
"""サーバーの識別情報
:return: 通常は"webcface"が返る
"""
return self._data_check().svr_name
@property
def server_version(self) -> str:
"""サーバーのバージョン"""
return self._data_check().svr_version
@property
def server_hostname(self) -> str:
"""サーバーのホスト名
(ver2.0〜)
"""
return self._data_check().svr_hostname