from typing import Dict, List, Union, Optional
import datetime
import umsgpack
import webcface.func_info
import webcface.view_base
import webcface.canvas2d_base
import webcface.field
import webcface.log_handler
import webcface.image_frame
[docs]
class MessageBase:
kind_def = -1
kind: int
msg: dict
def __init__(self, kind: int, msg: dict):
self.kind = kind
self.msg = msg
[docs]
def time_to_int(t: datetime.datetime) -> int:
return int(t.timestamp() * 1000)
[docs]
def int_to_time(t: int) -> datetime.datetime:
return datetime.datetime.fromtimestamp(t / 1000)
[docs]
class SyncInit(MessageBase):
kind_def = 80
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(M: str, l: str, v: str) -> "SyncInit":
return SyncInit.new_full(M, 0, l, v, "")
[docs]
@staticmethod
def new_full(M: str, m: int, l: str, v: str, a: str) -> "SyncInit":
return SyncInit({"M": M, "m": m, "l": l, "v": v, "a": a})
@property
def member_name(self) -> str:
return self.msg["M"]
@property
def member_id(self) -> int:
return self.msg["m"]
@property
def lib_name(self) -> str:
return self.msg["l"]
@property
def lib_ver(self) -> str:
return self.msg["v"]
@property
def addr(self) -> str:
return self.msg["a"]
[docs]
class SyncInitEnd(MessageBase):
kind_def = 88
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(n: str, v: str, m: int, h: str) -> "SyncInitEnd":
return SyncInitEnd({"n": n, "v": v, "m": m, "h": h})
@property
def svr_name(self) -> str:
return self.msg["n"]
@property
def ver(self) -> str:
return self.msg["v"]
@property
def member_id(self) -> int:
return self.msg["m"]
@property
def hostname(self) -> str:
return self.msg["h"]
[docs]
class Ping(MessageBase):
kind_def = 89
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new() -> "Ping":
return Ping({})
[docs]
class PingStatus(MessageBase):
kind_def = 90
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(s: Dict[int, int]) -> "PingStatus":
return PingStatus({"s": s})
@property
def status(self) -> Dict[int, int]:
return self.msg["s"]
[docs]
class PingStatusReq(MessageBase):
kind_def = 91
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new() -> "PingStatusReq":
return PingStatusReq({})
[docs]
class Sync(MessageBase):
kind_def = 87
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new() -> "Sync":
return Sync({"m": 0, "t": time_to_int(datetime.datetime.now())})
[docs]
@staticmethod
def new_full(m: int, t: int) -> "Sync":
return Sync({"m": m, "t": t})
@property
def member_id(self) -> int:
return self.msg["m"]
@property
def time(self) -> datetime.datetime:
return int_to_time(self.msg["t"])
[docs]
class Value(MessageBase):
kind_def = 0
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(f: str, d: List[float]) -> "Value":
return Value({"f": f, "d": d})
@property
def field(self) -> str:
return self.msg["f"]
@property
def data(self) -> List[float]:
return self.msg["d"]
[docs]
class ValueReq(MessageBase):
kind_def = 40
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(m: str, f: str, i: int) -> "ValueReq":
return ValueReq({"M": m, "f": f, "i": i})
@property
def member(self) -> str:
return self.msg["M"]
@property
def field(self) -> str:
return self.msg["f"]
@property
def req_id(self) -> int:
return self.msg["i"]
[docs]
class ValueRes(MessageBase):
kind_def = 60
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(i: int, f: str, d: List[float]) -> "ValueRes":
return ValueRes({"i": i, "f": f, "d": d})
@property
def req_id(self) -> int:
return self.msg["i"]
@property
def sub_field(self) -> str:
return self.msg["f"]
@property
def data(self) -> List[float]:
return self.msg["d"]
[docs]
class ValueEntry(MessageBase):
kind_def = 20
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(m: int, f: str) -> "ValueEntry":
return ValueEntry({"m": m, "f": f})
@property
def member_id(self) -> int:
return self.msg["m"]
@property
def field(self) -> str:
return self.msg["f"]
[docs]
class Text(MessageBase):
kind_def = 1
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(f: str, d: str) -> "Text":
return Text({"f": f, "d": d})
@property
def field(self) -> str:
return self.msg["f"]
@property
def data(self) -> str:
return self.msg["d"]
[docs]
class TextReq(MessageBase):
kind_def = 41
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(m: str, f: str, i: int) -> "TextReq":
return TextReq({"M": m, "f": f, "i": i})
@property
def member(self) -> str:
return self.msg["M"]
@property
def field(self) -> str:
return self.msg["f"]
@property
def req_id(self) -> int:
return self.msg["i"]
[docs]
class TextRes(MessageBase):
kind_def = 61
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(i: int, f: str, d: str) -> "TextRes":
return TextRes({"i": i, "f": f, "d": d})
@property
def req_id(self) -> int:
return self.msg["i"]
@property
def sub_field(self) -> str:
return self.msg["f"]
@property
def data(self) -> str:
return self.msg["d"]
[docs]
class TextEntry(MessageBase):
kind_def = 21
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(m: int, f: str) -> "TextEntry":
return TextEntry({"m": m, "f": f})
@property
def member_id(self) -> int:
return self.msg["m"]
@property
def field(self) -> str:
return self.msg["f"]
[docs]
class Image(MessageBase):
kind_def = 5
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(f: str, d: bytes, w: int, h: int, l: int, p: int) -> "Image":
return Image({"f": f, "d": d, "h": h, "w": w, "l": l, "p": p})
@property
def field(self) -> str:
return self.msg["f"]
@property
def data(self) -> bytes:
return self.msg["d"]
@property
def height(self) -> int:
return self.msg["h"]
@property
def width(self) -> int:
return self.msg["w"]
@property
def color_mode(self) -> int:
return self.msg["l"]
@property
def cmp_mode(self) -> int:
return self.msg["p"]
[docs]
class ImageReq(MessageBase):
kind_def = 45
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(
m: str, f: str, i: Optional[int], r: "webcface.image_frame.ImageReq"
) -> "ImageReq":
return ImageReq(
{
"M": m,
"f": f,
"i": i,
"w": r.width,
"h": r.height,
"l": r.color_mode,
"p": r.compress_mode,
"q": r.quality,
"r": r.frame_rate,
}
)
@property
def member(self) -> str:
return self.msg["M"]
@property
def field(self) -> str:
return self.msg["f"]
@property
def req_id(self) -> int:
return self.msg["i"]
@property
def width(self) -> Optional[int]:
return self.msg["w"]
@property
def height(self) -> Optional[int]:
return self.msg["h"]
@property
def color_mode(self) -> Optional[int]:
return self.msg["l"]
@property
def cmp_mode(self) -> Optional[int]:
return self.msg["p"]
@property
def quality(self) -> Optional[int]:
return self.msg["q"]
@property
def frame_rate(self) -> Optional[float]:
return self.msg["r"]
[docs]
class ImageRes(MessageBase):
kind_def = 65
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(i: int, f: str, d: bytes, w: int, h: int, l: int, p: int) -> "ImageRes":
return ImageRes({"i": i, "f": f, "d": d, "h": h, "w": w, "l": l, "p": p})
@property
def req_id(self) -> int:
return self.msg["i"]
@property
def sub_field(self) -> str:
return self.msg["f"]
@property
def data(self) -> bytes:
return self.msg["d"]
@property
def height(self) -> int:
return self.msg["h"]
@property
def width(self) -> int:
return self.msg["w"]
@property
def color_mode(self) -> int:
return self.msg["l"]
@property
def cmp_mode(self) -> int:
return self.msg["p"]
[docs]
class ImageEntry(MessageBase):
kind_def = 25
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(m: int, f: str) -> "ImageEntry":
return ImageEntry({"m": m, "f": f})
@property
def member_id(self) -> int:
return self.msg["m"]
@property
def field(self) -> str:
return self.msg["f"]
[docs]
def vb_to_vd(vb: "Dict[str, webcface.view_base.ViewComponentBase]") -> dict:
"""ViewComponentBaseクラスからメッセージに変換"""
vd = {}
for i, b in vb.items():
vd[i] = {
"t": b._type,
"x": b._text,
"L": None if b._on_click_func is None else b._on_click_func._member,
"l": None if b._on_click_func is None else b._on_click_func._field,
"R": None if b._text_ref is None else b._text_ref._member,
"r": None if b._text_ref is None else b._text_ref._field,
"c": b._text_color,
"b": b._bg_color,
"im": b._min,
"ix": b._max,
"is": b._step,
"io": b._option,
}
return vd
[docs]
def vd_to_vb(vd: dict) -> "Dict[str, webcface.view_base.ViewComponentBase]":
"""メッセージからViewComponentBaseクラスに変換"""
vb = {}
for i, d in vd.items():
vb[i] = webcface.view_base.ViewComponentBase(
type=d["t"],
text=d["x"],
on_click=(
None
if d.get("L") is None or d.get("l") is None
else webcface.field.FieldBase(d["L"], d["l"])
),
text_ref=(
None
if d.get("R") is None or d.get("r") is None
else webcface.field.FieldBase(d["R"], d["r"])
),
text_color=d["c"],
bg_color=d["b"],
min=d.get("im"),
max=d.get("ix"),
step=d.get("is"),
option=d.get("io"),
)
return vb
[docs]
class View(MessageBase):
kind_def = 9
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(
f: str,
d: "Dict[str, webcface.view_base.ViewComponentBase]",
l: Optional[List[str]],
) -> "View":
return View({"f": f, "d": vb_to_vd(d), "l": l})
@property
def field(self) -> str:
return self.msg["f"]
@property
def data(self) -> "Dict[str, webcface.view_base.ViewComponentBase]":
return vd_to_vb(self.msg["d"])
@property
def ids(self) -> Optional[List[str]]:
return self.msg["l"]
[docs]
class ViewReq(MessageBase):
kind_def = 49
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(m: str, f: str, i: int) -> "ViewReq":
return ViewReq({"M": m, "f": f, "i": i})
@property
def member(self) -> str:
return self.msg["M"]
@property
def field(self) -> str:
return self.msg["f"]
@property
def req_id(self) -> int:
return self.msg["i"]
[docs]
class ViewRes(MessageBase):
kind_def = 69
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(
i: int,
f: str,
d: "Dict[str, webcface.view_base.ViewComponentBase]",
l: Optional[List[str]],
) -> "ViewRes":
return ViewRes({"i": i, "f": f, "d": vb_to_vd(d), "l": l})
@property
def req_id(self) -> int:
return self.msg["i"]
@property
def sub_field(self) -> str:
return self.msg["f"]
@property
def data_diff(self) -> "Dict[str, webcface.view_base.ViewComponentBase]":
return vd_to_vb(self.msg["d"])
@property
def ids(self) -> Optional[List[str]]:
return self.msg["l"]
[docs]
class ViewEntry(MessageBase):
kind_def = 29
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(m: int, f: str) -> "ViewEntry":
return ViewEntry({"m": m, "f": f})
@property
def member_id(self) -> int:
return self.msg["m"]
@property
def field(self) -> str:
return self.msg["f"]
[docs]
def c2b_to_c2d(vb: "Dict[str, webcface.canvas2d_base.Canvas2DComponentBase]") -> dict:
"""Canvas2dComponentBaseクラスからメッセージに変換"""
vd = {}
for i, b in vb.items():
vd[i] = {
"t": b._type,
"op": b._origin_pos,
"or": b._origin_rot,
"c": b._color,
"f": b._fill,
"s": b._stroke_width,
"gt": b._geometry_type,
"gp": b._geometry_properties,
}
return vd
[docs]
def c2d_to_c2b(vd: dict) -> "Dict[str, webcface.canvas2d_base.Canvas2DComponentBase]":
"""メッセージからCanvas2DComponentBaseクラスに変換"""
vb = {}
for i, d in vd.items():
vb[i] = webcface.canvas2d_base.Canvas2DComponentBase(
type=d["t"],
origin_pos=d["op"],
origin_rot=d["or"],
color=d["c"],
fill=d["f"],
stroke_width=d["s"],
geometry_type=d["gt"],
geometry_properties=d["gp"],
)
return vb
[docs]
class Canvas2D(MessageBase):
kind_def = 10
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(
f: str,
w: float,
h: float,
d: "Dict[str, webcface.canvas2d_base.Canvas2DComponentBase]",
l: Optional[List[str]],
) -> "Canvas2D":
return Canvas2D({"f": f, "w": w, "h": h, "d": c2b_to_c2d(d), "l": l})
@property
def field(self) -> str:
return self.msg["f"]
@property
def width(self) -> float:
return self.msg["w"]
@property
def height(self) -> float:
return self.msg["h"]
@property
def data(self) -> "Dict[str, webcface.canvas2d_base.Canvas2DComponentBase]":
return c2d_to_c2b(self.msg["d"])
@property
def ids(self) -> Optional[List[str]]:
return self.msg["l"]
[docs]
class Canvas2DReq(MessageBase):
kind_def = 50
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(m: str, f: str, i: int) -> "Canvas2DReq":
return Canvas2DReq({"M": m, "f": f, "i": i})
@property
def member(self) -> str:
return self.msg["M"]
@property
def field(self) -> str:
return self.msg["f"]
@property
def req_id(self) -> int:
return self.msg["i"]
[docs]
class Canvas2DRes(MessageBase):
kind_def = 70
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(
i: int,
f: str,
w: float,
h: float,
d: "Dict[str, webcface.canvas2d_base.Canvas2DComponentBase]",
l: Optional[List[str]],
) -> "Canvas2DRes":
return Canvas2DRes({"i": i, "f": f, "w": w, "h": h, "d": c2b_to_c2d(d), "l": l})
@property
def req_id(self) -> int:
return self.msg["i"]
@property
def sub_field(self) -> str:
return self.msg["f"]
@property
def width(self) -> float:
return self.msg["w"]
@property
def height(self) -> float:
return self.msg["h"]
@property
def data_diff(self) -> "Dict[str, webcface.canvas2d_base.Canvas2DComponentBase]":
return c2d_to_c2b(self.msg["d"])
@property
def ids(self) -> Optional[List[str]]:
return self.msg["l"]
[docs]
class Canvas2DEntry(MessageBase):
kind_def = 30
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(m: int, f: str) -> "Canvas2DEntry":
return Canvas2DEntry({"m": m, "f": f})
@property
def member_id(self) -> int:
return self.msg["m"]
@property
def field(self) -> str:
return self.msg["f"]
[docs]
def c3b_to_c3d(vb: "Dict[str, webcface.canvas3d_base.Canvas3DComponentBase]") -> dict:
"""Canvas3dComponentBaseクラスからメッセージに変換"""
vd = {}
for i, b in vb.items():
vd[i] = {
"t": b._type,
"op": b._origin_pos,
"or": b._origin_rot,
"c": b._color,
"gt": b._geometry_type,
"gp": b._geometry_properties,
"fm": b._field_member,
"ff": b._field_field,
"a": b._angles,
}
return vd
[docs]
def c3d_to_c3b(vd: dict) -> "Dict[str, webcface.canvas3d_base.Canvas3DComponentBase]":
"""メッセージからCanvas2DComponentBaseクラスに変換"""
vb = {}
for i, d in vd.items():
vb[i] = webcface.canvas3d_base.Canvas3DComponentBase(
type=d["t"],
origin_pos=d["op"],
origin_rot=d["or"],
color=d["c"],
geometry_type=d["gt"],
geometry_properties=d["gp"],
field_member=d["fm"],
field_field=d["ff"],
angles=d["a"],
)
return vb
[docs]
class Canvas3D(MessageBase):
kind_def = 11
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(
f: str,
d: "Dict[str, webcface.canvas3d_base.Canvas3DComponentBase]",
l: Optional[List[str]],
) -> "Canvas3D":
return Canvas3D({"f": f, "d": c3b_to_c3d(d), "l": l})
@property
def field(self) -> str:
return self.msg["f"]
@property
def data(self) -> "Dict[str, webcface.canvas3d_base.Canvas3DComponentBase]":
return c3d_to_c3b(self.msg["d"])
@property
def ids(self) -> Optional[List[str]]:
return self.msg["l"]
[docs]
class Canvas3DReq(MessageBase):
kind_def = 51
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(m: str, f: str, i: int) -> "Canvas3DReq":
return Canvas3DReq({"M": m, "f": f, "i": i})
@property
def member(self) -> str:
return self.msg["M"]
@property
def field(self) -> str:
return self.msg["f"]
@property
def req_id(self) -> int:
return self.msg["i"]
[docs]
class Canvas3DRes(MessageBase):
kind_def = 71
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(
i: int,
f: str,
d: "Dict[str, webcface.canvas3d_base.Canvas3DComponentBase]",
l: Optional[List[str]],
) -> "Canvas3DRes":
return Canvas3DRes({"i": i, "f": f, "d": c3b_to_c3d(d), "l": l})
@property
def req_id(self) -> int:
return self.msg["i"]
@property
def sub_field(self) -> str:
return self.msg["f"]
@property
def data_diff(self) -> "Dict[str, webcface.canvas3d_base.Canvas3DComponentBase]":
return c3d_to_c3b(self.msg["d"])
@property
def ids(self) -> Optional[List[str]]:
return self.msg["l"]
[docs]
class Canvas3DEntry(MessageBase):
kind_def = 31
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(m: int, f: str) -> "Canvas3DEntry":
return Canvas3DEntry({"m": m, "f": f})
@property
def member_id(self) -> int:
return self.msg["m"]
@property
def field(self) -> str:
return self.msg["f"]
[docs]
class FuncInfo(MessageBase):
kind_def = 84
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(f: str, fi: "webcface.func_info.FuncInfo") -> "FuncInfo":
return FuncInfo.new_full(0, f, fi)
[docs]
@staticmethod
def new_full(m: int, f: str, fi: "webcface.func_info.FuncInfo") -> "FuncInfo":
ad = []
for a in fi.args:
ad.append(
{
"n": a.name,
"t": a.type,
"i": a.init,
"m": a.min,
"x": a.max,
"o": a.option,
}
)
return FuncInfo({"m": m, "f": f, "r": fi.return_type, "a": ad})
@property
def member_id(self) -> int:
return self.msg["m"]
@property
def field(self) -> str:
return self.msg["f"]
@property
def func_info(self) -> "webcface.func_info.FuncInfo":
args = []
for a in self.msg["a"]:
args.append(
webcface.func_info.Arg(
name=a["n"],
type=a["t"],
init=a["i"],
min=a["m"],
max=a["x"],
option=a["o"],
)
)
return webcface.func_info.FuncInfo(None, self.msg["r"], args)
[docs]
class Call(MessageBase):
kind_def = 81
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(i: int, c: int, r: int, f: str, a: List[Union[float, bool, str]]) -> "Call":
return Call({"i": i, "c": c, "r": r, "f": f, "a": a})
@property
def caller_id(self) -> int:
return self.msg["i"]
@property
def caller_member_id(self) -> int:
return self.msg["c"]
@property
def target_member_id(self) -> int:
return self.msg["r"]
@property
def field(self) -> str:
return self.msg["f"]
@property
def args(self) -> List[Union[float, bool, str]]:
return self.msg["a"]
[docs]
class CallResponse(MessageBase):
kind_def = 82
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(i: int, c: int, s: bool) -> "CallResponse":
return CallResponse({"i": i, "c": c, "s": s})
@property
def caller_id(self) -> int:
return self.msg["i"]
@property
def caller_member_id(self) -> int:
return self.msg["c"]
@property
def started(self) -> bool:
return self.msg["s"]
[docs]
class CallResult(MessageBase):
kind_def = 83
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(i: int, c: int, e: bool, r: Union[float, bool, str]) -> "CallResult":
return CallResult({"i": i, "c": c, "e": e, "r": r})
@property
def caller_id(self) -> int:
return self.msg["i"]
@property
def caller_member_id(self) -> int:
return self.msg["c"]
@property
def is_error(self) -> bool:
return self.msg["e"]
@property
def result(self) -> Union[float, bool, str]:
return self.msg["r"]
[docs]
def msg2logline(lm: List[Dict]) -> "List[webcface.log_handler.LogLine]":
return [
webcface.log_handler.LogLine(l["v"], int_to_time(l["t"]), l["m"]) for l in lm
]
[docs]
def logline2msg(lls: "List[webcface.log_handler.LogLine]") -> List[Dict]:
return [{"v": ll.level, "t": time_to_int(ll.time), "m": ll.message} for ll in lls]
[docs]
class Log(MessageBase):
kind_def = 8
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(f: str, lls: "List[webcface.log_handler.LogLine]") -> "Log":
return Log(
{
"f": f,
"l": logline2msg(lls),
}
)
@property
def field(self) -> str:
return self.msg["f"]
@property
def log(self) -> "List[webcface.log_handler.LogLine]":
return msg2logline(self.msg["l"])
[docs]
class LogRes(MessageBase):
kind_def = 68
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(i: int, f: str, lls: "List[webcface.log_handler.LogLine]") -> "LogRes":
return LogRes(
{
"i": i,
"f": f,
"l": logline2msg(lls),
}
)
@property
def req_id(self) -> int:
return self.msg["i"]
@property
def sub_field(self) -> str:
return self.msg["f"]
@property
def log(self) -> "List[webcface.log_handler.LogLine]":
return msg2logline(self.msg["l"])
[docs]
class LogReq(MessageBase):
kind_def = 48
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(m: str, f: str, i: int) -> "LogReq":
return LogReq({"M": m, "f": f, "i": i})
@property
def member(self) -> str:
return self.msg["M"]
@property
def field(self) -> str:
return self.msg["f"]
@property
def req_id(self) -> int:
return self.msg["i"]
[docs]
class LogEntry(MessageBase):
kind_def = 28
def __init__(self, msg: dict) -> None:
super().__init__(self.kind_def, msg)
[docs]
@staticmethod
def new(m: int, f: str) -> "LogEntry":
return LogEntry({"m": m, "f": f})
@property
def member_id(self) -> int:
return self.msg["m"]
@property
def field(self) -> str:
return self.msg["f"]
# 受信する可能性のあるメッセージのリスト
message_classes_recv = [
SyncInit,
SyncInitEnd,
Ping,
PingStatus,
Sync,
ValueRes,
ValueEntry,
TextRes,
TextEntry,
ImageRes,
ImageEntry,
ViewRes,
ViewEntry,
Canvas2DRes,
Canvas2DEntry,
Canvas3DRes,
Canvas3DEntry,
FuncInfo,
Call,
CallResponse,
CallResult,
LogEntry,
LogRes,
]
[docs]
def pack(msgs: List[MessageBase]) -> bytes:
send_msgs: List[Union[int, dict]] = []
for m in msgs:
send_msgs.append(m.kind)
send_msgs.append(m.msg)
return umsgpack.packb(send_msgs)
[docs]
def unpack(packed: bytes) -> List[MessageBase]:
unpack_obj = umsgpack.unpackb(packed, strict_map_key=False)
assert len(unpack_obj) % 2 == 0
msg_ret = []
for i in range(0, len(unpack_obj), 2):
kind = unpack_obj[i]
msg = unpack_obj[i + 1]
assert isinstance(kind, int)
assert isinstance(msg, dict)
for C in message_classes_recv:
if kind == C.kind_def:
msg_ret.append(C(msg))
return msg_ret