uds.transport_interface.can_transport_interface.python_can
Implementation of UDS Transport Interface for CAN bus using python-can as bus manager.
Classes
PyCanTransportInterface – Transport Interface for managing UDS on CAN with python-can package as bus handler.
Module Contents
- class uds.transport_interface.can_transport_interface.python_can.PyCanTransportInterface(can_bus_manager, addressing_information, **kwargs)
Bases:
uds.transport_interface.can_transport_interface.common.AbstractCanTransportInterface
Transport Interface for managing UDS on CAN with python-can package as bus handler.
Note
Documentation for python-can package: https://python-can.readthedocs.io/
Create python-can Transport Interface.
- Parameters:
can_bus_manager (can.BusABC) –
Python-can bus object for handling CAN.
Warning
Bus must be able to receive the frames it transmits (receive_own_messages=True must be set).
addressing_information (uds.can.AbstractCanAddressingInformation) – Addressing Information of CAN Transport Interface.
kwargs (Any) –
Optional arguments that are specific for CAN bus.
- parameter n_as_timeout:
Timeout value for N_As time parameter.
- parameter n_ar_timeout:
Timeout value for N_Ar time parameter.
- parameter n_bs_timeout:
Timeout value for N_Bs time parameter.
- parameter n_br:
Value of N_Br time parameter to use in communication.
- parameter n_cs:
Value of N_Cs time parameter to use in communication.
- parameter n_cr_timeout:
Timeout value for N_Cr time parameter.
- parameter dlc:
Base CAN DLC value to use for CAN packets.
- parameter use_data_optimization:
Information whether to use CAN Frame Data Optimization.
- parameter filler_byte:
Filler byte value to use for CAN Frame Data Padding.
- parameter flow_control_parameters_generator:
Generator with Flow Control parameters to use.
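The optional keyword arguments listed above can be collected in a plain dictionary before the interface is created. The sketch below is illustrative only: the values are example choices (not defaults confirmed by this page), the timeouts are assumed to be given in milliseconds, and check_kwargs is a hypothetical helper that only verifies the names against the list above.

```python
# Names of the optional keyword arguments documented above.
DOCUMENTED_KWARGS = frozenset({
    "n_as_timeout", "n_ar_timeout", "n_bs_timeout", "n_br", "n_cs",
    "n_cr_timeout", "dlc", "use_data_optimization", "filler_byte",
    "flow_control_parameters_generator",
})


def check_kwargs(kwargs):
    """Hypothetical helper: reject names that are not documented above."""
    unknown = sorted(set(kwargs) - DOCUMENTED_KWARGS)
    if unknown:
        raise ValueError(f"Unknown transport interface arguments: {unknown}")
    return dict(kwargs)


# Illustrative values only (assumptions, not library defaults).
example_kwargs = check_kwargs({
    "n_as_timeout": 1000,            # assumed unit: milliseconds
    "n_bs_timeout": 1000,            # assumed unit: milliseconds
    "dlc": 8,                        # classic CAN frame with 8 data bytes
    "use_data_optimization": False,
    "filler_byte": 0xCC,
})
```

Following the constructor signature shown above, such a dictionary would be passed as PyCanTransportInterface(can_bus_manager, addressing_information, **example_kwargs).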
- _MAX_LISTENER_TIMEOUT: float = 4280000.0
Maximal timeout value accepted by python-can listeners.
- _MIN_NOTIFIER_TIMEOUT: float = 0.001
Minimal timeout for notifiers that does not cause malfunctioning of listeners.
- __n_as_measured: uds.utilities.TimeMillisecondsAlias | None = None
- __n_ar_measured: uds.utilities.TimeMillisecondsAlias | None = None
- __frames_buffer
- __notifier: can.Notifier | None = None
- __async_frames_buffer
- __async_notifier: can.Notifier | None = None
- property n_as_measured: uds.utilities.TimeMillisecondsAlias | None
Get the last measured value of N_As time parameter.
Note
The last measurement comes from the last transmission of a Single Frame or First Frame CAN Packet using either the send_packet() or async_send_packet() method.
- Returns:
Time in milliseconds or None if the value was never measured.
- Return type:
Optional[uds.utilities.TimeMillisecondsAlias]
- property n_ar_measured: uds.utilities.TimeMillisecondsAlias | None
Get the last measured value of N_Ar time parameter.
Note
The last measurement comes from the last transmission of a Flow Control CAN Packet using either the send_packet() or async_send_packet() method.
- Returns:
Time in milliseconds or None if the value was never measured.
- Return type:
Optional[uds.utilities.TimeMillisecondsAlias]
- __teardown_notifier(suppress_warning=False)
Stop and remove CAN frame notifier for synchronous communication.
- Parameters:
suppress_warning (bool) – Do not warn about mixing Synchronous and Asynchronous implementation.
- Return type:
None
- __teardown_async_notifier(suppress_warning=False)
Stop and remove CAN frame notifier for asynchronous communication.
- Parameters:
suppress_warning (bool) – Do not warn about mixing Synchronous and Asynchronous implementation.
- Return type:
None
- __setup_notifier()
Configure CAN frame notifier for synchronous communication.
- Return type:
None
- __setup_async_notifier(loop)
Configure CAN frame notifier for asynchronous communication.
- Parameters:
loop (asyncio.AbstractEventLoop) – An asyncio event loop to use.
- Return type:
None
- _send_cf_packets_block(cf_packets_block, delay)
Send block of Consecutive Frame CAN packets.
- Parameters:
cf_packets_block (List[uds.packet.CanPacket]) – Consecutive Frame CAN packets to send.
delay (uds.utilities.TimeMillisecondsAlias) – Minimal delay between successive Consecutive Frames [ms].
- Raises:
TransmissionInterruptionError – A new UDS message transmission was started while sending this message.
- Returns:
Records with historic information about transmitted Consecutive Frame CAN packets.
- Return type:
Tuple[uds.packet.CanPacketRecord, ...]
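The idea of sending a block with a minimal delay between frames can be sketched without any CAN hardware. This is not the library's implementation: send_block, the byte-string frames and the injected send and sleep callables are hypothetical stand-ins chosen for illustration.

```python
import time


def send_block(frames, delay_ms, send, sleep=time.sleep):
    """Send frames in order, pausing delay_ms before every frame but the first.

    `send` and `sleep` are injected callables (hypothetical stand-ins).
    """
    sent = []
    for index, frame in enumerate(frames):
        if index > 0:
            sleep(delay_ms / 1000.0)  # convert milliseconds to seconds
        send(frame)
        sent.append(frame)
    return tuple(sent)


# Usage with stand-ins: frames are raw bytes, sending appends to a list.
bus_log = []
pauses = []
records = send_block(
    frames=[b"\x21\x01", b"\x22\x02", b"\x23\x03"],  # illustrative payloads
    delay_ms=5,
    send=bus_log.append,
    sleep=pauses.append,  # record the pauses instead of actually sleeping
)
```

Injecting the sleep function keeps the sketch deterministic; real code would rely on the default time.sleep or on a scheduler.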
- async _async_send_cf_packets_block(cf_packets_block, delay, loop=None)
Send block of Consecutive Frame CAN packets asynchronously.
- Parameters:
cf_packets_block (List[uds.packet.CanPacket]) – Consecutive Frame CAN packets to send.
delay (uds.utilities.TimeMillisecondsAlias) – Minimal delay between successive Consecutive Frames [ms].
loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop to use for scheduling this task.
- Raises:
TransmissionInterruptionError – A new UDS message transmission was started while sending this message.
- Returns:
Records with historic information about transmitted Consecutive Frame CAN packets.
- Return type:
Tuple[uds.packet.CanPacketRecord, ...]
- _message_receive_start(initial_packet)
Continue to receive message after receiving initial packet.
- Parameters:
initial_packet (uds.packet.CanPacketRecord) – Record of a packet initiating UDS message reception.
- Raises:
NotImplementedError – Unhandled CAN packet starting a new CAN message transmission was received.
- Returns:
Record of UDS message received.
- Return type:
uds.message.UdsMessageRecord
- async _async_message_receive_start(initial_packet, loop=None)
Continue to receive message asynchronously after receiving initial packet.
- Parameters:
initial_packet (uds.packet.CanPacketRecord) – Record of a packet initiating UDS message reception.
loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop used for observing messages.
- Raises:
NotImplementedError – Unhandled CAN packet starting a new CAN message transmission was received.
- Returns:
Record of UDS message received.
- Return type:
uds.message.UdsMessageRecord
- _receive_cf_packets_block(sequence_number, block_size, remaining_data_length)
Receive block of Consecutive Frames.
- Parameters:
sequence_number (int) – Current Sequence Number (next Consecutive Frame shall have this value set).
block_size (int) – Block Size value sent in the last Flow Control CAN packet.
remaining_data_length (int) – Number of remaining data bytes to receive in UDS message.
- Returns:
Either a record of the UDS message (if reception was interrupted by a new UDS message transmission) or a tuple with records of the received Consecutive Frames.
- Return type:
Union[uds.message.UdsMessageRecord, Tuple[uds.packet.CanPacketRecord, ...]]
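How the three parameters above relate to each other can be illustrated with a small calculation. The sketch below is not taken from the library: expected_sequence_numbers is a hypothetical helper, and the default of 7 payload bytes per Consecutive Frame is an assumption (classic CAN with DLC 8 and a single PCI byte).

```python
import math


def expected_sequence_numbers(sequence_number, block_size,
                              remaining_data_length, payload_per_frame=7):
    """List the Sequence Numbers expected in the next block.

    payload_per_frame=7 is an assumption (classic CAN, DLC 8, one PCI byte).
    """
    frames_left = math.ceil(remaining_data_length / payload_per_frame)
    # Block Size 0 means that no further Flow Control is sent,
    # so all remaining Consecutive Frames belong to this block.
    count = frames_left if block_size == 0 else min(block_size, frames_left)
    # Sequence Number is a 4-bit counter, so it wraps from 15 back to 0.
    return [(sequence_number + offset) % 16 for offset in range(count)]


wrapping_block = expected_sequence_numbers(14, 4, 100)
unlimited_block = expected_sequence_numbers(1, 0, 14)
short_block = expected_sequence_numbers(3, 8, 10)
```

With these inputs the first call crosses the wrap-around point, the second shows Block Size 0, and the third shows a block cut short because only two frames of data remain.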
- async _async_receive_cf_packets_block(sequence_number, block_size, remaining_data_length, loop=None)
Asynchronously receive a block of Consecutive Frames.
- Parameters:
sequence_number (int) – Current Sequence Number (next Consecutive Frame shall have this value set).
block_size (int) – Block Size value sent in the last Flow Control CAN packet.
remaining_data_length (int) – Number of remaining data bytes to receive in UDS message.
loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop used for observing messages.
- Returns:
Either a record of the UDS message (if reception was interrupted by a new UDS message transmission) or a tuple with records of the received Consecutive Frames.
- Return type:
Union[uds.message.UdsMessageRecord, Tuple[uds.packet.CanPacketRecord, ...]]
- _receive_consecutive_frames(first_frame)
Receive Consecutive Frames after reception of First Frame.
- Parameters:
first_frame (uds.packet.CanPacketRecord) – First Frame that was received.
- Raises:
TimeoutError – N_Cr timeout was reached.
OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was sent.
- Returns:
Record of the UDS message formed from the provided First Frame and the received Consecutive Frames.
- Return type:
uds.message.UdsMessageRecord
- async _async_receive_consecutive_frames(first_frame, loop=None)
Asynchronously receive Consecutive Frames after reception of First Frame.
- Parameters:
first_frame (uds.packet.CanPacketRecord) – First Frame that was received.
loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop used for observing messages.
- Raises:
TimeoutError – N_Cr timeout was reached.
OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was sent.
NotImplementedError – Unhandled CAN packet starting a new CAN message transmission was received.
- Returns:
Record of the UDS message formed from the provided First Frame and the received Consecutive Frames.
- Return type:
uds.message.UdsMessageRecord
- clear_frames_buffers()
Clear buffers with transmitted and received frames.
Warning
After this call, all CAN packets received in the past are no longer accessible.
- Return type:
None
- static is_supported_bus_manager(bus_manager)
Check whether provided value is a bus manager that is supported by this Transport Interface.
- Parameters:
bus_manager (Any) – Value to check.
- Returns:
True if provided bus object is compatible with this Transport Interface, False otherwise.
- Return type:
bool
- send_packet(packet)
Transmit CAN packet.
Warning
Must not be called within an asynchronous function.
- Parameters:
packet (uds.packet.CanPacket) – CAN packet to send.
- Raises:
TypeError – Provided packet is not CAN packet.
TimeoutError – Timeout was reached before a CAN packet could be transmitted.
- Returns:
Record with historic information about transmitted CAN packet.
- Return type:
uds.packet.CanPacketRecord
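The warning above implies that synchronous methods such as send_packet() belong in ordinary code, while coroutines should await async_send_packet() instead. Code that can be reached from both kinds of callers may want to check which situation it is in. The sketch below uses only the standard asyncio module; called_from_async_context is a hypothetical helper and is not part of this library.

```python
import asyncio


def called_from_async_context():
    """Return True if an asyncio event loop is running in this thread."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop: we are in plain synchronous code.
        return False
    return True


async def _probe():
    # Executed inside a coroutine, so an event loop is running.
    return called_from_async_context()


outside = called_from_async_context()  # plain synchronous code
inside = asyncio.run(_probe())         # inside a coroutine
```

A caller could use such a check to pick the synchronous or the asynchronous method before transmitting.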
- async async_send_packet(packet, loop=None)
Asynchronously transmit a CAN packet.
- Parameters:
packet (uds.packet.CanPacket) – CAN packet to send.
loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop used for observing messages.
- Raises:
TypeError – Provided packet is not CAN packet.
TimeoutError – Timeout was reached before a CAN packet could be transmitted.
- Returns:
Record with historic information about transmitted CAN packet.
- Return type:
uds.packet.CanPacketRecord
- receive_packet(timeout=None)
Receive CAN packet.
Warning
Must not be called within an asynchronous function.
- Parameters:
timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait.
- Raises:
TypeError – Provided timeout value is neither None nor of int or float type.
ValueError – Provided timeout value is less than or equal to 0.
TimeoutError – Timeout was reached before a CAN packet was received.
- Returns:
Record with historic information about received CAN packet.
- Return type:
uds.packet.CanPacketRecord
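The TypeError and ValueError conditions listed above can be mirrored by a small validation step. This is a sketch of the documented behaviour, not the library's code: validate_timeout is a hypothetical helper, rejecting bool is a choice made here, and the conversion to seconds is included only because python-can expresses its receive timeout in seconds.

```python
def validate_timeout(timeout):
    """Hypothetical check mirroring the documented TypeError/ValueError cases.

    Returns the timeout converted to seconds, or None if no timeout was given.
    """
    if timeout is None:
        return None  # passed through unchanged
    # bool is a subclass of int; rejecting it is a choice of this sketch.
    if isinstance(timeout, bool) or not isinstance(timeout, (int, float)):
        raise TypeError("timeout must be None, int or float")
    if timeout <= 0:
        raise ValueError("timeout must be greater than 0")
    return timeout / 1000.0  # milliseconds -> seconds


seconds = validate_timeout(250)
```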
- async async_receive_packet(timeout=None, loop=None)
Asynchronously receive a CAN packet.
- Parameters:
timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait.
loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop used for observing messages.
- Raises:
TypeError – Provided timeout value is neither None nor of int or float type.
ValueError – Provided timeout value is less than or equal to 0.
TimeoutError – Timeout was reached before a CAN packet was received.
- Returns:
Record with historic information about received CAN packet.
- Return type:
uds.packet.CanPacketRecord
- send_message(message)
Transmit UDS message over CAN.
Warning
Must not be called within an asynchronous function.
- Parameters:
message (uds.message.UdsMessage) – A message to send.
- Raises:
OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was received.
TransmissionInterruptionError – A new UDS message transmission was started while sending this message.
NotImplementedError – Flow Control CAN packet with unknown Flow Status was received.
- Returns:
Record with historic information about transmitted UDS message.
- Return type:
uds.message.UdsMessageRecord
- async async_send_message(message, loop=None)
Asynchronously transmit a UDS message over CAN.
- Parameters:
message (uds.message.UdsMessage) – A message to send.
loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop to use for scheduling this task.
- Raises:
OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was received.
TransmissionInterruptionError – A new UDS message transmission was started while sending this message.
NotImplementedError – Flow Control CAN packet with unknown Flow Status was received.
- Returns:
Record with historic information about transmitted UDS message.
- Return type:
uds.message.UdsMessageRecord
- receive_message(timeout=None)
Receive UDS message over CAN.
Warning
Must not be called within an asynchronous function.
- Parameters:
timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait for UDS message transmission to start. This means that receiving might last longer if First Frame was received within provided time.
- Raises:
TypeError – Provided timeout value is neither None nor of int or float type.
ValueError – Provided timeout value is less than or equal to 0.
TimeoutError – Timeout was reached. Either no Single Frame / First Frame was received within [timeout] ms, or an N_As, N_Ar, N_Bs or N_Cr timeout was reached.
- Returns:
Record with historic information about received UDS message.
- Return type:
uds.message.UdsMessageRecord
- async async_receive_message(timeout=None, loop=None)
Asynchronously receive a UDS message over CAN.
- Parameters:
timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait for UDS message transmission to start. This means that receiving might last longer if First Frame was received within provided time.
loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop to use for scheduling this task.
- Raises:
TypeError – Provided timeout value is neither None nor of int or float type.
ValueError – Provided timeout value is less than or equal to 0.
TimeoutError – Timeout was reached. Either no Single Frame / First Frame was received within [timeout] ms, or an N_As, N_Ar, N_Bs or N_Cr timeout was reached.
- Returns:
Record with historic information about received UDS message.
- Return type:
uds.message.UdsMessageRecord