from typing import TypeVar, Generic, Dict, Tuple, Optional, Callable, List, Union
import threading
import datetime
import logging
import webcface.field
import webcface.func_info
import webcface.view_base
import webcface.log_handler
import webcface.canvas2d_base
import webcface.canvas3d_base
import webcface.image_frame
T = TypeVar("T")
R = TypeVar("R")
[docs]
class SyncDataStore2(Generic[T, R]):
self_member_name: str
data_send: Dict[str, T]
data_send_prev: Dict[str, T]
data_recv: Dict[str, Dict[str, T]]
entry: Dict[str, List[str]]
req: Dict[str, Dict[str, int]]
req_info: Dict[str, Dict[str, R]]
lock: threading.RLock
should_send: Callable
def __init__(self, name: str, should_send: Optional[Callable] = None) -> None:
self.self_member_name = name
self.data_send = {}
self.data_send_prev = {}
self.data_recv = {}
self.entry = {}
self.req = {}
self.req_info = {}
self.lock = threading.RLock()
self.should_send = should_send or SyncDataStore2.should_send_always
[docs]
def is_self(self, member: str) -> bool:
return self.self_member_name == member
[docs]
@staticmethod
def should_send_always(prev, current) -> bool:
return True
[docs]
@staticmethod
def should_not_send_twice(prev, current) -> bool:
if prev is None:
return True
return False
[docs]
@staticmethod
def should_send_on_change(prev, current) -> bool:
if prev is None or prev != current:
return True
return False
[docs]
def set_send(self, field: str, data: T) -> None:
with self.lock:
if self.should_send(
self.data_recv.get(self.self_member_name, {}).get(field), data
):
self.data_send[field] = data
self.set_recv(self.self_member_name, field, data)
[docs]
def set_recv(self, member: str, field: str, data: T) -> None:
with self.lock:
if member not in self.data_recv:
self.data_recv[member] = {}
self.data_recv[member][field] = data
[docs]
def add_req(self, member: str, field: str, req_data: Optional[R] = None) -> int:
with self.lock:
if not self.is_self(member) and self.req.get(member, {}).get(field, 0) == 0:
max_req = 0
for r in self.req.values():
max_req = max(max_req, max(r.values()))
new_req = max_req + 1
if member not in self.req:
self.req[member] = {}
self.req[member][field] = new_req
if req_data is not None:
if member not in self.req_info:
self.req_info[member] = {}
self.req_info[member][field] = req_data
return new_req
elif (
not self.is_self(member)
and req_data is not None
and self.req_info.get(member, {}).get(field) != req_data
):
if member not in self.req_info:
self.req_info[member] = {}
self.req_info[member][field] = req_data
return self.req[member][field]
return 0
[docs]
def get_req_info(self, member: str, field: str) -> Optional[R]:
with self.lock:
return self.req_info.get(member, {}).get(field)
[docs]
def get_recv(self, member: str, field: str) -> Optional[T]:
with self.lock:
d = self.data_recv.get(member, {}).get(field)
return d
[docs]
def unset_recv(self, member: str, field: str) -> bool:
with self.lock:
if self.data_recv.get(member, {}).get(field) is not None:
del self.data_recv[member][field]
if not self.is_self(member) and self.req.get(member, {}).get(field, 0) > 0:
self.req[member][field] = 0
return True
return False
[docs]
def get_members(self) -> List[str]:
with self.lock:
return list(self.entry.keys())
[docs]
def get_entry(self, member: str) -> List[str]:
with self.lock:
return self.entry.get(member, [])
[docs]
def init_member(self, member: str) -> None:
with self.lock:
self.entry[member] = []
self.data_recv[member] = {}
[docs]
def set_entry(self, member: str, field: str) -> None:
with self.lock:
if member not in self.entry:
self.entry[member] = []
self.entry[member].append(field)
[docs]
def transfer_send(self, is_first: bool) -> Dict[str, T]:
with self.lock:
if is_first:
self.data_send = {}
self.data_send_prev = {}
data_current = self.data_recv.get(self.self_member_name, {})
for k, v in data_current.items():
self.data_send_prev[k] = v
return data_current
else:
s = self.data_send
self.data_send_prev = s
self.data_send = {}
return s
[docs]
def get_send_prev(self, is_first: bool) -> Dict[str, T]:
with self.lock:
if is_first:
return {}
else:
return self.data_send_prev
[docs]
def transfer_req(self) -> Dict[str, Dict[str, int]]:
with self.lock:
# if is_first:
# self.req_send = {}
return self.req
# else:
# r = self.req_send
# self.req_send = {}
# return r
[docs]
def get_req(self, i: int, sub_field: str) -> Tuple[str, str]:
with self.lock:
for rm, r in self.req.items():
for rf, ri in r.items():
if ri == i:
if sub_field != "":
return (rm, rf + "." + sub_field)
else:
return (rm, rf)
return ("", "")
[docs]
class SyncDataStore1(Generic[T]):
self_member_name: str
data_recv: Dict[str, T]
req: Dict[str, bool]
entry: List[str]
lock: threading.RLock
def __init__(self, name: str) -> None:
self.self_member_name = name
self.data_recv = {}
self.req = {}
self.entry = []
self.lock = threading.RLock()
[docs]
def is_self(self, member: str) -> bool:
return self.self_member_name == member
[docs]
def set_recv(self, member: str, data: T) -> None:
with self.lock:
self.data_recv[member] = data
[docs]
def add_req(self, member: str) -> bool:
with self.lock:
if not self.is_self(member) and not self.req.get(member, False):
self.req[member] = True
return True
return False
[docs]
def get_recv(self, member: str) -> Optional[T]:
with self.lock:
return self.data_recv.get(member, None)
[docs]
def clear_req(self, member: str) -> bool:
with self.lock:
if not self.is_self(member) and self.req.get(member, False):
self.req[member] = False
return True
return False
[docs]
def set_entry(self, member: str) -> None:
with self.lock:
if member not in self.entry:
self.entry.append(member)
[docs]
def clear_entry(self, member: str) -> None:
with self.lock:
if member in self.entry:
self.entry.remove(member)
[docs]
def get_entry(self, member: str) -> bool:
with self.lock:
return member in self.entry
[docs]
def transfer_req(self) -> Dict[str, bool]:
with self.lock:
# if is_first:
# self.req_send = {}
return self.req
# else:
# r = self.req_send
# self.req_send = {}
# return r
[docs]
class FuncResultStore:
results: "List[Optional[webcface.func_info.PromiseData]]"
lock: threading.Lock
def __init__(self):
self.results = []
self.lock = threading.Lock()
[docs]
def add_result(
self,
caller: str,
base: "webcface.field.Field",
) -> "webcface.func_info.Promise":
with self.lock:
caller_id = len(self.results)
r = webcface.func_info.PromiseData(base, caller_id, caller)
self.results.append(r)
return webcface.func_info.Promise(r)
[docs]
def get_result(self, caller_id: int) -> "webcface.func_info.PromiseData":
with self.lock:
r = self.results[caller_id]
if r is None:
raise IndexError()
return r
[docs]
def del_result(self, caller_id: int) -> None:
with self.lock:
if caller_id < len(self.results):
self.results[caller_id] = None
[docs]
class ClientData:
self_member_name: str
value_store: SyncDataStore2[List[float], None]
text_store: SyncDataStore2[Union[float, bool, str], None]
image_store: (
"SyncDataStore2[webcface.image_frame.ImageFrame, webcface.image_frame.ImageReq]"
)
func_store: "SyncDataStore2[webcface.func_info.FuncInfo, None]"
view_store: "SyncDataStore2[webcface.view.ViewData, None]"
canvas2d_store: "SyncDataStore2[webcface.canvas2d.Canvas2DData, None]"
canvas3d_store: "SyncDataStore2[webcface.canvas3d.Canvas3DData, None]"
log_store: "SyncDataStore2[webcface.log_handler.LogData, None]"
sync_time_store: SyncDataStore1[datetime.datetime]
func_result_store: FuncResultStore
func_listener_handlers: "Dict[str, List[webcface.func_info.CallHandle]]"
member_ids: Dict[str, int]
member_lib_name: Dict[int, str]
member_lib_ver: Dict[int, str]
member_remote_addr: Dict[int, str]
svr_name: str
svr_version: str
svr_hostname: str
ping_status_req: bool
ping_status: Dict[int, int]
connected: bool
_connection_cv: threading.Condition
_msg_first: bool # syncInitメッセージをqueueに入れたらtrue
_msg_queue: "List[List[webcface.message.MessageBase]]"
_msg_cv: threading.Condition
recv_queue: List[bytes]
recv_cv: threading.Condition
logger_internal: logging.Logger
self_member_id: Optional[int]
sync_init_end: bool
auto_reconnect: bool
on_member_entry: Optional[Callable]
on_ping: Dict[str, Callable]
on_value_entry: Dict[str, Callable]
on_text_entry: Dict[str, Callable]
on_image_entry: Dict[str, Callable]
on_view_entry: Dict[str, Callable]
on_func_entry: Dict[str, Callable]
on_canvas2d_entry: Dict[str, Callable]
on_canvas3d_entry: Dict[str, Callable]
on_log_entry: Dict[str, Callable]
on_sync: Dict[str, Callable]
on_value_change: Dict[str, Dict[str, Callable]]
on_text_change: Dict[str, Dict[str, Callable]]
on_image_change: Dict[str, Dict[str, Callable]]
on_view_change: Dict[str, Dict[str, Callable]]
on_canvas2d_change: Dict[str, Dict[str, Callable]]
on_canvas3d_change: Dict[str, Dict[str, Callable]]
on_log_change: Dict[str, Callable]
def __init__(
self, name: str, logger_internal: logging.Logger, auto_reconnect: bool
) -> None:
self.self_member_name = name
self.value_store = SyncDataStore2[List[float], None](
name, SyncDataStore2.should_send_on_change
)
self.text_store = SyncDataStore2[Union[float, bool, str], None](
name, SyncDataStore2.should_send_on_change
)
self.image_store = SyncDataStore2[
webcface.image_frame.ImageFrame, webcface.image_frame.ImageReq
](name)
self.func_store = SyncDataStore2[webcface.func_info.FuncInfo, None](
name, SyncDataStore2.should_not_send_twice
)
self.view_store = SyncDataStore2[webcface.view.ViewData, None](name)
self.canvas2d_store = SyncDataStore2[webcface.canvas2d.Canvas2DData, None](name)
self.canvas3d_store = SyncDataStore2[webcface.canvas3d.Canvas3DData, None](name)
self.log_store = SyncDataStore2[webcface.log_handler.LogData, None](name)
self.sync_time_store = SyncDataStore1[datetime.datetime](name)
self.func_result_store = FuncResultStore()
self.func_listener_handlers = {}
self.member_ids = {}
self.member_lib_name = {}
self.member_lib_ver = {}
self.member_remote_addr = {}
self.svr_name = ""
self.svr_version = ""
self.svr_hostname = ""
self.ping_status_req = False
self.ping_status = {}
self.connected = False
self._connection_cv = threading.Condition()
self._msg_first = False
self._msg_queue = []
self._msg_cv = threading.Condition()
self.recv_queue = []
self.recv_cv = threading.Condition()
self.logger_internal = logger_internal
self.self_member_id = None
self.sync_init_end = False
self.auto_reconnect = auto_reconnect
self.on_member_entry = None
self.on_ping = {}
self.on_value_entry = {}
self.on_view_entry = {}
self.on_text_entry = {}
self.on_image_entry = {}
self.on_func_entry = {}
self.on_canvas2d_entry = {}
self.on_canvas3d_entry = {}
self.on_log_entry = {}
self.on_sync = {}
self.on_value_change = {}
self.on_text_change = {}
self.on_image_change = {}
self.on_view_change = {}
self.on_canvas2d_change = {}
self.on_canvas3d_change = {}
self.on_log_change = {}
[docs]
def queue_first(self) -> None:
with self._msg_cv:
self._msg_queue.insert(0, webcface.client_impl.sync_data_first(self))
self._msg_first = True
[docs]
def queue_msg_always(self, msgs: "List[webcface.message.MessageBase]") -> None:
"""メッセージをキューに入れる"""
with self._msg_cv:
self._msg_queue.append(msgs)
self._msg_cv.notify_all()
[docs]
def queue_msg_online(self, msgs: "List[webcface.message.MessageBase]") -> bool:
"""接続できていればキューに入れtrueを返す"""
with self._connection_cv:
if self.connected:
with self._msg_cv:
self._msg_queue.append(msgs)
self._msg_cv.notify_all()
return True
return False
[docs]
def queue_msg_req(self, msgs: "List[webcface.message.MessageBase]") -> bool:
"""msg_firstが空でなければキューに入れtrueを返す"""
with self._msg_cv:
if self._msg_first:
self._msg_queue.append(msgs)
self._msg_cv.notify_all()
return True
return False
[docs]
def clear_msg(self) -> None:
with self._msg_cv:
self._msg_queue = []
self._msg_first = False
self._msg_cv.notify_all()
[docs]
def has_msg(self) -> bool:
return len(self._msg_queue) > 0
[docs]
def wait_msg(self, timeout: Optional[float] = None) -> None:
with self._msg_cv:
while len(self._msg_queue) == 0:
self._msg_cv.wait(timeout)
if timeout is not None:
break
[docs]
def wait_empty(self, timeout: Optional[float] = None) -> None:
with self._msg_cv:
while len(self._msg_queue) > 0:
self._msg_cv.wait(timeout)
if timeout is not None:
break
[docs]
def pop_msg(self) -> "Optional[List[webcface.message.MessageBase]]":
with self._msg_cv:
if len(self._msg_queue) == 0:
return None
msg = self._msg_queue.pop(0)
self._msg_cv.notify_all()
return msg
[docs]
def is_self(self, member: str) -> bool:
return self.self_member_name == member
[docs]
def get_member_name_from_id(self, m_id: int) -> str:
for k, v in self.member_ids.items():
if v == m_id:
return k
return ""
[docs]
def get_member_id_from_name(self, name: str) -> int:
return self.member_ids.get(name, 0)