scapy.automaton

Automata with states, transitions and actions.

class scapy.automaton.ATMT

Bases: object

ACTION = 'Action'
CONDITION = 'Condition'
IOEVENT = 'I/O event'
exception NewStateRequested(state_func: Any, automaton: ATMT, *args: Any, **kargs: Any)

Bases: Exception

action_parameters(*args: Any, **kargs: Any) NewStateRequested
run() Any
RECV = 'Receive condition'
STATE = 'State'
TIMEOUT = 'Timeout condition'
static action(cond: Any, prio: int = 0) Callable[[_StateWrapper, _StateWrapper], _StateWrapper]
static condition(state: Any, prio: int = 0) Callable[[_StateWrapper, _StateWrapper], _StateWrapper]
static ioevent(state: _StateWrapper, name: str, prio: int = 0, as_supersocket: str | None = None) Callable[[_StateWrapper, _StateWrapper], _StateWrapper]
static receive_condition(state: _StateWrapper, prio: int = 0) Callable[[_StateWrapper, _StateWrapper], _StateWrapper]
static state(initial: int = 0, final: int = 0, stop: int = 0, error: int = 0) Callable[[DecoratorCallable], DecoratorCallable]
static timeout(state: _StateWrapper, timeout: int | float) Callable[[_StateWrapper, _StateWrapper, Timer], _StateWrapper]
static timer(state: _StateWrapper, timeout: float | int, prio: int = 0) Callable[[_StateWrapper, _StateWrapper, Timer], _StateWrapper]
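The decorator style listed above (states, conditions attached to a state, and a transition requested by raising an exception) can be illustrated with a toy sketch in plain Python. This is not Scapy's implementation and does not import Scapy: `state`, `condition`, `MiniAutomaton` and `Countdown` are hypothetical names, the `NewStateRequested` class here only borrows its name from the listing, and evaluating lower `prio` values first is an assumption of the sketch.

```python
class NewStateRequested(Exception):
    """Raised by a condition to request a transition (toy version)."""

    def __init__(self, state_func):
        super().__init__(state_func.__name__)
        self.state_func = state_func


def state(initial=False, final=False):
    """Mark a method as a state (simplified stand-in for a state decorator)."""
    def deco(func):
        func.is_state = True
        func.initial = initial
        func.final = final
        func.conditions = []  # filled in by @condition
        return func
    return deco


def condition(state_func, prio=0):
    """Attach a condition to a state; in this sketch lower prio runs first."""
    def deco(func):
        state_func.conditions.append((prio, func))
        state_func.conditions.sort(key=lambda item: item[0])
        return func
    return deco


class MiniAutomaton:
    """Toy runner: enter a state, then try its conditions in priority order."""

    def run(self):
        states = [v for v in vars(type(self)).values()
                  if getattr(v, "is_state", False)]
        current = next(s for s in states if s.initial)
        trace = []
        while True:
            trace.append(current.__name__)
            result = current(self)          # execute the state body
            if current.final:
                return result, trace
            for _prio, cond in current.conditions:
                try:
                    cond(self)
                except NewStateRequested as req:
                    current = req.state_func  # follow the requested transition
                    break
            else:
                # no condition requested a new state
                raise RuntimeError("stuck in state %s" % current.__name__)


class Countdown(MiniAutomaton):
    def __init__(self, start):
        self.n = start

    @state(initial=True)
    def TICK(self):
        self.n -= 1

    @condition(TICK, prio=1)
    def still_counting(self):
        if self.n > 0:
            raise NewStateRequested(type(self).TICK)

    @condition(TICK, prio=2)
    def done(self):
        if self.n <= 0:
            raise NewStateRequested(type(self).DONE)

    @state(final=True)
    def DONE(self):
        return "finished"


result, trace = Countdown(3).run()
print(result, trace)
```

Using an exception to request the next state keeps each condition a plain method: returning normally means "condition not met", and the runner moves on to the next condition.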
class scapy.automaton.Automaton(self, debug=0, store=1, **kargs)

Bases: object

exception AutomatonError(msg: str, state: Message | None = None, result: str | None = None)

Bases: AutomatonException

exception AutomatonException(msg: str, state: Message | None = None, result: str | None = None)

Bases: Exception

exception AutomatonStopped(msg: str, state: Message | None = None, result: str | None = None)

Bases: AutomatonException

exception Breakpoint(msg: str, state: Message | None = None, result: str | None = None)

Bases: AutomatonStopped

exception CommandMessage(msg: str, state: Message | None = None, result: str | None = None)

Bases: AutomatonException

exception ErrorState(msg: str, state: Message | None = None, result: str | None = None)

Bases: AutomatonException

exception InterceptionPoint(msg: str, state: Message | None = None, result: str | None = None, packet: Packet | None = None)

Bases: AutomatonStopped

exception Singlestep(msg: str, state: Message | None = None, result: str | None = None)

Bases: AutomatonStopped

exception Stuck(msg: str, state: Message | None = None, result: str | None = None)

Bases: AutomatonException

accept_packet(pkt: Packet | None = None, wait: bool | None = False) Any
actions: Dict[str, List[_StateWrapper]] = {}
add_breakpoints(*bps: Any) None
add_interception_points(*ipts: Any) None
conditions: Dict[str, List[_StateWrapper]] = {}
debug(lvl: int, msg: str) None
destroy() None

Destroys a stopped Automaton: this cleans up all open file descriptors. Required on PyPy, for instance, where the garbage collector behaves differently.

forcestop(wait: bool = True) None
initial_states: List[_StateWrapper] = []
ioevents: Dict[str, List[_StateWrapper]] = {}
ionames: List[str] = []
iosupersockets: List[SuperSocket] = []
master_filter(pkt: Packet) bool
my_send(pkt: Packet) None
parse_args(debug: int = 0, store: int = 1, **kargs: Any) None
recv_conditions: Dict[str, List[_StateWrapper]] = {}
reject_packet(wait: bool | None = False) Any
remove_breakpoints(*bps: Any) None
remove_interception_points(*ipts: Any) None
restart(*args: Any, **kargs: Any) None
run(resume: Message | None = None, wait: bool | None = True) Any
runbg(resume: Message | None = None, wait: bool | None = False) None
send(pkt: Packet) None
start(*args: Any, **kargs: Any) None
state: NewStateRequested = None
states: Dict[str, _StateWrapper] = {}
stop(wait: bool = True) None
stop_states: List[_StateWrapper] = []
timeout: Dict[str, _TimerList] = {}
timer_by_name(name: str) Timer | None
class scapy.automaton.Automaton_metaclass(name: str, bases: Tuple[Any], dct: Dict[str, Any])

Bases: type

build_graph() str
graph(**kargs: Any) str | None
class scapy.automaton.Message(**args: Any)

Bases: object

exc_info: Tuple[None, None, None] | Tuple[BaseException, Exception, TracebackType] = None
pkt: Packet = None
result: str = None
state: Message = None
type: str = None
class scapy.automaton.ObjectPipe(name: str | None = None)

Bases: Generic[_T]

clear() None
close() None
empty() bool
fileno() int
flush() None
read(n: int | None = 0) _T | None
recv(n: int | None = 0) _T | None
static select(sockets: List[SuperSocket], remain: float | None = 0.05) List[SuperSocket]
send(obj: _T) int
write(obj: _T) None
class scapy.automaton.Timer(time: int | float, prio: int = 0, autoreload: bool = False)

Bases: object

get() float
set(val: float) None
scapy.automaton.select_objects(inputs: Iterable[Any], remain: float | int | None) List[Any]

Select objects. Same as: select.select(inputs, [], [], remain)

It also works on Windows, but only on objects whose fileno() returns a Windows event. For simplicity, just use ObjectPipe() as a queue that you can select on, whatever the platform is.

If you want an object to be always included in the output of select_objects (i.e. it's not selectable), just make fileno() return a strictly negative value.

Example

>>> a, b = ObjectPipe("a"), ObjectPipe("b")
>>> b.send("test")
>>> select_objects([a, b], 1)
[b]
Parameters:
  • inputs -- objects to process

  • remain -- timeout. If 0, poll.
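
The semantics described for select_objects can be sketched with the standard library alone. The following is a minimal POSIX-only illustration, not Scapy's code: `SelectableQueue` (a stand-in for ObjectPipe built on `os.pipe`), `AlwaysReady` and `select_sketch` are hypothetical names, and switching to a plain poll when an always-included object is present is an assumption of the sketch.

```python
import os
import select
from collections import deque


class SelectableQueue:
    """Hypothetical stand-in for ObjectPipe: a FIFO you can select on (POSIX)."""

    def __init__(self, name):
        self.name = name
        self._rd, self._wr = os.pipe()  # read end turns readable when data is queued
        self._queue = deque()

    def fileno(self):
        return self._rd

    def send(self, obj):
        self._queue.append(obj)
        os.write(self._wr, b"X")        # one marker byte per queued object

    def recv(self):
        if not self._queue:
            return None
        os.read(self._rd, 1)            # consume the matching marker byte
        return self._queue.popleft()

    def close(self):
        os.close(self._rd)
        os.close(self._wr)

    def __repr__(self):
        return self.name


class AlwaysReady:
    """Hypothetical non-selectable object: fileno() is strictly negative."""

    def fileno(self):
        return -1

    def __repr__(self):
        return "always"


def select_sketch(inputs, remain):
    """Sketch of the documented behaviour; not Scapy's implementation."""
    inputs = list(inputs)
    forced = [obj for obj in inputs if obj.fileno() < 0]
    selectable = [obj for obj in inputs if obj.fileno() >= 0]
    if forced:
        remain = 0  # assumption: something is already "ready", so only poll
    ready, _, _ = select.select(selectable, [], [], remain)
    return forced + ready


a, b = SelectableQueue("a"), SelectableQueue("b")
always = AlwaysReady()
b.send("test")
ready = select_sketch([a, always, b], 1)
print(ready)
```

Here `a` has nothing queued, so only the always-included object and `b` come back. Note that the sketch returns the forced objects first rather than preserving input order.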