Source code for webcface.func_info

from typing import Callable, Optional, List, SupportsFloat, Union
from enum import IntEnum
from copy import deepcopy
import inspect
import threading
import webcface.field
import webcface.member
from webcface.typing import convertible_to_float


[docs] class ValType(IntEnum): NONE = 0 STRING = 1 STR = 1 BOOL = 2 INT = 3 FLOAT = 4
[docs] def get_type_enum(t: type) -> int: if t == int: return ValType.INT if t == float: return ValType.FLOAT if t == bool: return ValType.BOOL if t == str: return ValType.STRING if ( t == inspect.Parameter.empty or t == inspect.Signature.empty or t == type(None) or t is None ): return ValType.NONE return ValType.NONE
[docs] class Arg: _name: str _type: int _min: Optional[float] _max: Optional[float] _init: Optional[Union[float, bool, str]] _option: List[Union[float, str]] def __init__( self, name: str = "", type: Union[int, type] = ValType.NONE, min: Optional[SupportsFloat] = None, max: Optional[SupportsFloat] = None, init: Optional[Union[SupportsFloat, bool, str]] = None, option: List[Union[SupportsFloat, str]] = [], ) -> None: self._name = name if isinstance(type, int): self._type = type else: self._type = get_type_enum(type) self._min = None if min is None else float(min) self._max = None if max is None else float(max) if init is None: self._init = None elif isinstance(init, bool): self._init = init elif convertible_to_float(init): self._init = float(init) else: self._init = str(init) self._option = [] for op in option: if convertible_to_float(op): self._option.append(float(op)) else: self._option.append(str(op))
[docs] def merge_config(self, a: "Arg") -> "Arg": if a._name != "": self._name = a._name if a._type != ValType.NONE: self._type = a._type if a._init is not None: self._init = a._init if a._max is not None: self._max = a._max if a._min is not None: self._min = a._min if len(a._option) > 0: self._option = a._option return self
def __repr__(self) -> str: s = f"name={repr(self._name)}, type={repr(self._type)}" if self._min is not None: s += f", min={repr(self._min)}" if self._max is not None: s += f", max={repr(self._max)}" if self._init is not None: s += f", init={repr(self._init)}" if len(self._option) > 0: s += f", option={repr(self._option)}" return "Arg(" + s + ")" @property def name(self) -> str: return self._name @property def type(self) -> int: return self._type @property def init(self) -> Optional[Union[float, bool, str]]: return self._init @property def max(self) -> Optional[float]: return self._max @property def min(self) -> Optional[float]: return self._min @property def option(self) -> List[Union[float, str]]: return self._option
[docs] class FuncInfo: return_type: int args: List[Arg] func_impl: Callable def __init__( self, func: Optional[Callable], return_type: Optional[Union[int, type]], args: Optional[List[Arg]], in_thread: bool = False, handle: bool = False, ) -> None: if args is None: self.args = [] else: self.args = deepcopy(args) if func is None: sig = None else: sig = inspect.signature(func) first_annotation: Union[None, type, str] = None if len(sig.parameters) >= 1: first_annotation = list(sig.parameters.values())[0].annotation # from __future__ import annotations がある場合strで返ってくるのでそのチェックもする if ( first_annotation is webcface.func_info.CallHandle or first_annotation == "CallHandle" or ( isinstance(first_annotation, str) and first_annotation.endswith(".CallHandle") ) ): handle = True else: handle = False assert len(self.args) == 0 or len(self.args) == len( sig.parameters ), "number of args information passed and actual parameters number does not match" for i, pname in enumerate(sig.parameters): p = sig.parameters[pname] if p.default != inspect.Parameter.empty: init = p.default else: init = None auto_arg = Arg(name=pname, type=p.annotation, init=init) if i < len(self.args): self.args[i] = auto_arg.merge_config(self.args[i]) else: self.args.append(auto_arg) if isinstance(return_type, int): self.return_type = return_type elif isinstance(return_type, type): self.return_type = get_type_enum(return_type) elif sig is not None: self.return_type = get_type_enum(sig.return_annotation) else: raise ValueError() def func_impl(p: PromiseData) -> None: if func is None: p._set_finish("func is None", is_error=True) else: try: if handle: func(CallHandle(p)) else: ret = func(*p._args) if ret is None: p._set_finish("", is_error=False) elif isinstance(ret, bool): p._set_finish(ret, is_error=False) elif convertible_to_float(ret): p._set_finish(float(ret), is_error=False) else: p._set_finish(str(ret), is_error=False) except Exception as e: p._set_finish(str(e), is_error=True) if in_thread: self.func_impl = lambda p: threading.Thread( target=func_impl, args=(p,), daemon=True ) else: self.func_impl = func_impl
[docs] def run(self, p: "PromiseData", args) -> None: if len(args) != len(self.args): # raise TypeError(f"requires {len(self.args)} arguments but got {len(args)}") p._set_finish( f"requires {len(self.args)} arguments but got {len(args)}", is_error=True, ) return new_args: List[Union[float, bool, str]] = [] for i, a in enumerate(args): if self.args[i].type == ValType.INT: new_args.append(int(float(a))) elif self.args[i].type == ValType.FLOAT: new_args.append(float(a)) elif self.args[i].type == ValType.BOOL: new_args.append(bool(a)) elif self.args[i].type == ValType.STRING: if isinstance(a, bool): new_args.append(str(int(a))) else: new_args.append(str(a)) else: new_args.append(a) p._args = new_args self.func_impl(p)
[docs] class FuncNotFoundError(RuntimeError): def __init__(self, base: "webcface.field.FieldBase") -> None: super().__init__(f'member("{base._member}").func("{base._field}") is not set')
[docs] class PromiseData: _base: "webcface.field.Field" _caller_id: int _caller: str _args: List[Union[float, bool, str]] _reached: bool _found: bool _finished: bool _result: Union[float, bool, str] _result_is_error: bool _on_reach: Optional[Callable] _reach_event_done: bool _on_finish: Optional[Callable] _finish_event_done: bool _cv: threading.Condition def __init__( self, base: "webcface.field.Field", caller_id: int = 0, caller: str = "" ) -> None: self._base = base self._args = [] self._reached = False self._found = False self._finished = False self._result = "" self._result_is_error = False self._on_reach = None self._on_finish = None self._reach_event_done = False self._finish_event_done = False self._cv = threading.Condition() self._caller_id = caller_id self._caller = caller def _set_reach(self, found: bool) -> None: run_reach_func: Optional[Callable] = None with self._cv: self._reached = True self._found = found if not self._reach_event_done and self._on_reach is not None: self._reach_event_done = True run_reach_func = self._on_reach if run_reach_func is not None: run_reach_func(Promise(self)) with self._cv: self._cv.notify_all() if not found: self._set_finish( f'member("{self._base._member}").func("{self._base._field}") is not set', is_error=True, ) def _set_finish(self, result: Union[float, bool, str], is_error: bool) -> None: run_finish_func: Optional[Callable] = None with self._cv: self._finished = True self._result_is_error = is_error self._result = result if not self._finish_event_done and self._on_finish is not None: self._finish_event_done = True run_finish_func = self._on_finish if run_finish_func is not None: run_finish_func(Promise(self)) with self._cv: self._cv.notify_all()
[docs] class Promise: """非同期で実行した関数の実行結果を表す。 ver2.0〜 AsyncFuncResultからPromiseに名前変更 """ _data: PromiseData def __init__(self, data: PromiseData) -> None: self._data = data @property def member(self) -> "webcface.member.Member": """関数のMember""" return webcface.member.Member(self._data._base) @property def name(self) -> str: """関数のfield名""" return self._data._base._field @property def started(self) -> bool: """関数が開始したらTrue, 存在しなければFalse Falseの場合自動でresultにもFuncNotFoundErrorが入る .. deprecated:: ver2.0 """ self.wait_reach() return self.found @property def started_ready(self) -> bool: """startedが取得可能であればTrue .. deprecated:: ver2.0 (reached と同じ) """ return self.reached @property def reached(self) -> bool: """関数呼び出しのメッセージが相手のクライアントに到達したらTrue (ver2.0〜) """ return self._data._reached @property def found(self) -> bool: """呼び出した関数がリモートに存在するか(=実行が開始されたか)を返す (ver2.0〜) """ return self._data._found
[docs] def wait_reach(self, timeout: Optional[float] = None) -> "Promise": """リモートに呼び出しメッセージが到達するまで待機 (ver2.0〜) * reached がtrueになるまで待機する。 * on_reached にコールバックが設定されている場合そのコールバックの完了も待機する。 * Client.sync() を呼ぶのとは別のスレッドで使用することを想定している。 呼び出しが成功したかどうかの情報の受信は Client.sync() で行われるため、 この関数を使用して待機している間に Client.sync() が呼ばれていないとデッドロックしてしまうので注意。 :param timeout: 待機するタイムアウト (秒) """ with self._data._cv: while not self._data._reached: self._data._cv.wait(timeout) return self
@property def result(self) -> Union[float, bool, str]: """実行結果または例外 結果が返ってくるまで待機する。 .. deprecated:: ver2.0 """ with self._data._cv: while not self._data._finished: self._data._cv.wait() if not self._data._found: raise FuncNotFoundError(self._data._base) if self._data._result_is_error: raise RuntimeError(self._data._result) return self._data._result @property def result_ready(self) -> bool: """resultが取得可能であればTrue .. deprecated:: ver2.0 (finished と同じ) """ return self._data._finished @property def finished(self) -> bool: """関数の実行が完了したかどうかを返す (ver2.0〜) """ return self._data._finished @property def is_error(self) -> bool: """関数がエラーになったかどうかを返す (ver2.0〜) """ return self._data._result_is_error @property def response(self) -> Union[float, bool, str]: """関数の実行が完了した場合その戻り値を返す (ver2.0〜) """ if self._data._result_is_error: return "" return self._data._result @property def rejection(self) -> str: """関数の実行がエラーになった場合そのエラーメッセージを返す (ver2.0〜) """ if self._data._result_is_error: return str(self._data._result) return ""
[docs] def wait_finish(self, timeout: Optional[float] = None) -> "Promise": """関数の実行が完了するまで待機 (ver2.0〜) * finished がtrueになるまで待機する。 * on_finished にコールバックが設定されている場合そのコールバックの完了も待機する。 * Client.sync() を呼ぶのとは別のスレッドで使用することを想定している。 呼び出しが成功したかどうかの情報の受信は Client.sync() で行われるため、 この関数を使用して待機している間に Client.sync() が呼ばれていないとデッドロックしてしまうので注意。 :param timeout: 待機するタイムアウト (秒) """ with self._data._cv: while not self._data._finished: self._data._cv.wait(timeout) return self
[docs] def on_reach(self, func: Callable) -> "Promise": """リモートに呼び出しメッセージが到達したときに呼び出すコールバックを設定 (ver2.0〜) * コールバックの引数にはこのPromiseが渡される。 * すでにreachedがtrueの場合はこのスレッドで即座にcallbackが呼ばれる。 """ with self._data._cv: if not self._data._reach_event_done: self._data._on_reach = func if self._data._reached: func(self) self._data._reach_event_done = True return self
[docs] def on_finish(self, func: Callable) -> "Promise": """関数の実行が完了したときに呼び出すコールバックを設定 (ver2.0〜) * コールバックの引数にはこのPromiseが渡される。 * すでにfinishedがtrueの場合はこのスレッドで即座にcallbackが呼ばれる。 """ run_func = False with self._data._cv: if not self._data._finish_event_done: self._data._on_finish = func if self._data._finished: self._data._finish_event_done = True run_func = True if run_func: func(self) return self
AsyncFuncResult = Promise
[docs] class CallHandle: _data: PromiseData def __init__( self, data: PromiseData, ) -> None: self._data = data @property def args(self) -> List[Union[float, bool, str]]: return self._data._args
[docs] def respond(self, result: Union[float, bool, str] = "") -> None: self._data._set_finish(result, False)
[docs] def reject(self, reason: str) -> None: self._data._set_finish(reason, True)
[docs] def assert_args_num(self, expected: int) -> bool: if len(self._data._args) != expected: self.reject( f"requires {expected} arguments but got {len(self._data._args)}" ) return False return True