scapy.contrib.automotive.ecu

class scapy.contrib.automotive.ecu.Ecu(logging: bool = True, verbose: bool = True, store_supported_responses: bool = True, lookahead: int = 10)[源代码]

基类:object

An Ecu object can be used to
  • track the states of an Ecu.

  • to log all modification to an Ecu.

  • to extract supported responses of a real Ecu.

示例

>>> print("This ecu logs, tracks and creates supported responses")
>>> my_virtual_ecu = Ecu()
>>> my_virtual_ecu.update(PacketList([...]))
>>> my_virtual_ecu.supported_responses
>>> print("Another ecu just tracks")
>>> my_tracking_ecu = Ecu(logging=False, store_supported_responses=False)
>>> my_tracking_ecu.update(PacketList([...]))
>>> print("Another ecu just logs all modifications to it")
>>> my_logging_ecu = Ecu(verbose=False, store_supported_responses=False)
>>> my_logging_ecu.update(PacketList([...]))
>>> my_logging_ecu.log
>>> print("Another ecu just creates supported responses")
>>> my_response_ecu = Ecu(verbose=False, logging=False)
>>> my_response_ecu.update(PacketList([...]))
>>> my_response_ecu.supported_responses

Parameters to initialize an Ecu object

参数:
  • logging -- Turn logging on or off. Default is on.

  • verbose -- Turn tracking on or off. Default is on.

  • store_supported_responses -- Create a list of supported responses if True.

  • lookahead -- Configuration for lookahead when computing supported responses

static extend_pkt_with_logging(cls: Type[Packet]) Callable[[Callable[[Packet], Tuple[str, Any]]], None][源代码]

Decorator to add a function as 'get_log' method to a given class. This allows dynamic modifications and additions to a protocol. :param cls: A packet class to be modified :return: Decorator function

reset() None[源代码]

Resets the internal state to a default EcuState.

static sort_key_func(resp: EcuResponse) Tuple[bool, int, int, int][源代码]

This sorts responses in the following order: 1. Positive responses first 2. Lower ServiceIDs first 3. Less supported states first 4. Longer (more specific) responses first :param resp: EcuResponse to be sorted :return: Tuple as sort key

property supported_responses: List[EcuResponse]

Returns a sorted list of supported responses. The sort is done in a way to provide the best possible results, if this list of supported responses is used to simulate an real world Ecu with the EcuAnsweringMachine object. :return: A sorted list of EcuResponse objects

property unanswered_packets: PacketList

A list of all unanswered packets, which were processed by this Ecu object. :return: PacketList of unanswered packets

update(p: Packet | PacketList) None[源代码]

Processes a Packet or a list of Packets, according to the chosen configuration. :param p: Packet or list of Packets

class scapy.contrib.automotive.ecu.EcuAnsweringMachine(self, supported_responses=None, main_socket=None, broadcast_socket=None, basecls=<class 'scapy.packet.Raw'>, timeout=None, initial_ecu_state=None)[源代码]

基类:AnsweringMachine[PacketList]

AnsweringMachine which emulates the basic behaviour of a real world ECU. Provide a list of EcuResponse objects to configure the behaviour of a AnsweringMachine.

Usage:
>>> resp = EcuResponse(session=range(0,255), security_level=0, responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))
>>> sock = ISOTPSocket(can_iface, tx_id=0x700, rx_id=0x600, basecls=UDS)
>>> answering_machine = EcuAnsweringMachine(supported_responses=[resp], main_socket=sock, basecls=UDS)
>>> sim = threading.Thread(target=answering_machine, kwargs={'count': 4, 'timeout':5})
>>> sim.start()
function_name = 'EcuAnsweringMachine'
is_request(req: Packet) bool[源代码]
make_reply(req: Packet) PacketList[源代码]

Checks if a given request can be answered by the internal list of EcuResponses. First, it's evaluated if the internal EcuState of this AnsweringMachine is supported by an EcuResponse, next it's evaluated if a request answers the key_response of this EcuResponse object. The first fitting EcuResponse is used. If this EcuResponse modified the EcuState, the internal EcuState of this AnsweringMachine is updated, and the list of response Packets of the selected EcuResponse is returned. If no EcuResponse if found, a PacketList with a generic NegativeResponse is returned. :param req: A request packet :return: A list of response packets

optam0: Dict[str, Any]
optam1: Dict[str, Any]
optam2: Dict[str, Any]
optsend: Dict[str, Any]
optsniff: Dict[str, Any]
parse_options(supported_responses: ~typing.List[~scapy.contrib.automotive.ecu.EcuResponse] | None = None, main_socket: ~scapy.supersocket.SuperSocket | None = None, broadcast_socket: ~scapy.supersocket.SuperSocket | None = None, basecls: ~typing.Type[~scapy.packet.Packet] = <class 'scapy.packet.Raw'>, timeout: int | float | None = None, initial_ecu_state: ~scapy.contrib.automotive.ecu.EcuState | None = None) None[源代码]
参数:
  • supported_responses -- List of EcuResponse objects to define the behaviour. The default response is generalReject.

  • main_socket -- Defines the object of the socket to send and receive packets.

  • broadcast_socket -- Defines the object of the broadcast socket. Listen-only, responds with the main_socket. None to disable broadcast capabilities.

  • basecls -- Provide a basecls of the used protocol

  • timeout -- Specifies the timeout for sniffing in seconds.

reset_state() None[源代码]
send_reply(reply: PacketList, send_function: Any | None = None) None[源代码]

Sends all Packets of a EcuResponse object. This allows to send multiple packets up on a request. If the list contains more than one packet, a random time between each packet is waited until the next packet will be sent. :param reply: List of packets to be sent.

sniff_options_list = ['store', 'opened_socket', 'count', 'filter', 'prn', 'stop_filter', 'timeout']
property state: EcuState
class scapy.contrib.automotive.ecu.EcuResponse(state: ~scapy.contrib.automotive.ecu.EcuState | ~typing.Iterable[~scapy.contrib.automotive.ecu.EcuState] | None = None, responses: ~typing.Iterable[~scapy.packet.Packet] | ~scapy.plist.PacketList | ~scapy.packet.Packet = <Raw  load=b'\x7f\x10' |>, answers: ~typing.Callable[[~scapy.packet.Packet, ~scapy.packet.Packet], bool] | None = None)[源代码]

基类:object

Encapsulates responses and the according EcuStates. A list of this objects can be used to configure an EcuAnsweringMachine. This is useful, if you want to clone the behaviour of a real Ecu.

示例

>>> EcuResponse(EcuState(session=2, security_level=2), responses=UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"deadbeef1"))
>>> EcuResponse([EcuState(session=range(2, 5), security_level=2), EcuState(session=3, security_level=5)], responses=UDS()/UDS_RDBIPR(dataIdentifier=9)/Raw(b"deadbeef4"))

Initialize an EcuResponse capsule

参数:
  • state -- EcuState or list of EcuStates in which this response is allowed to be sent. If no state provided, the response packet will always be send.

  • responses -- A Packet or a list of Packet objects. By default the last packet is asked if it answers an incoming packet. This allows to send for example requestCorrectlyReceived-ResponsePending packets.

  • answers -- Optional argument to provide a custom answer here: lambda resp, req: return resp.answers(req) This allows the modification of a response depending on a request. Custom SecurityAccess mechanisms can be implemented in this way or generic NegativeResponse messages which answers to everything can be realized in this way.

answers(other: Packet) int | bool[源代码]
command() str[源代码]
property key_response: Packet
property responses: PacketList
property states: List[EcuState] | None
supports_state(state: EcuState) bool[源代码]
class scapy.contrib.automotive.ecu.EcuSession(*args: Any, **kwargs: Any)[源代码]

基类:DefaultSession

Tracks modification to an Ecu object 'on-the-flow'.

The parameters for the internal Ecu object are obtained from the kwargs dict.

logging: Turn logging on or off. Default is on. verbose: Turn tracking on or off. Default is on. store_supported_responses: Create a list of supported responses, if True.

示例

>>> sniff(session=EcuSession)
on_packet_received(pkt: Packet | None) None[源代码]
class scapy.contrib.automotive.ecu.EcuState(**kwargs: Any)[源代码]

基类:object

Stores the state of an Ecu. The state is defined by a protocol, for example UDS or GMLAN. A EcuState supports comparison and serialization (command()).

command() str[源代码]
static extend_pkt_with_modifier(cls: Type[Packet]) Callable[[Callable[[Packet, Packet, EcuState], None]], None][源代码]

Decorator to add a function as 'modify_ecu_state' method to a given class. This allows dynamic modifications and additions to a protocol. :param cls: A packet class to be modified :return: Decorator function

static get_modified_ecu_state(response: Packet, request: Packet, state: EcuState, modify_in_place: bool = False) EcuState[源代码]

Helper function to get a modified EcuState from a Packet and a previous EcuState. An EcuState is always modified after a response Packet is received. In some protocols, the belonging request packet is necessary to determine the precise state of the Ecu

参数:
  • response -- Response packet that supports modify_ecu_state

  • request -- Belonging request of the response that modifies Ecu

  • state -- The previous/current EcuState

  • modify_in_place -- If True, the given EcuState will be modified

返回:

The modified EcuState or a modified copy

static is_modifier_pkt(pkt: Packet) bool[源代码]

Helper function to determine if a Packet contains a layer that modifies the EcuState. :param pkt: Packet to be analyzed :return: True if pkt contains layer that implements modify_ecu_state

reset() None[源代码]