Python-CAN
Examples using the python-can package to control the CAN bus, including transmission and reception of CAN frames.
Kvaser interface
Examples demonstrating the use of Kvaser CAN interfaces as hardware devices for sending and receiving CAN messages.
Synchronous implementation
Message handling
Send UDS message:
"""Send a message using Diagnostic on CAN protocol (ISO 15765)."""
from can import Bus
from uds.addressing import AddressingType
from uds.can import CanAddressingFormat, CanAddressingInformation, CanVersion, PythonCanTransportInterface
from uds.message import UdsMessage
def main():
    """Send one physically addressed UDS message over Classic CAN and print its record.

    The CAN bus is shut down on every exit path, including when configuration
    or transmission raises an exception.
    """
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=False)
    try:
        # configure addresses for Diagnostics on CAN communication
        # CAN Addressing Formats explanation:
        # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
            rx_physical_params={"can_id": 0x611},
            tx_physical_params={"can_id": 0x612},
            rx_functional_params={"can_id": 0x6FF},
            tx_functional_params={"can_id": 0x6FE})
        # create Transport Interface object for Diagnostics on CAN communication
        can_ti = PythonCanTransportInterface(
            network_manager=can_interface,
            addressing_information=addressing_information,
            can_version=CanVersion.CLASSIC_CAN)  # send all diagnostic packets as Classic CAN frames
        try:
            # define UDS Message to send
            message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])
            # send UDS message
            sent_message_record = can_ti.send_message(message=message)
            # show sent message
            print(sent_message_record)
        finally:
            # drop the Transport Interface before the bus it uses is shut down
            del can_ti
    finally:
        # close connection with CAN interface, also when an error occurred above
        can_interface.shutdown()


if __name__ == "__main__":
    main()
Receive UDS message:
"""Receive a message using Diagnostic on CAN protocol (ISO 15765)."""
from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation, PythonCanTransportInterface
def main():
    """Wait for one UDS message on a CAN FD capable bus and print its record.

    The CAN bus is shut down on every exit path, including when configuration
    or reception raises an exception.
    """
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)
    try:
        # configure addresses for Diagnostics on CAN communication
        # CAN Addressing Formats explanation:
        # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
            rx_physical_params={"can_id": 0x611},
            tx_physical_params={"can_id": 0x612},
            rx_functional_params={"can_id": 0x6FF},
            tx_functional_params={"can_id": 0x6FE})
        # create Transport Interface object for Diagnostics on CAN communication
        can_ti = PythonCanTransportInterface(
            network_manager=can_interface,
            addressing_information=addressing_information)
        try:
            # receive UDS message
            received_message_record = can_ti.receive_message(start_timeout=1000)  # start_timeout=1000 [ms]
            # show received message
            print(received_message_record)
        finally:
            # drop the Transport Interface before the bus it uses is shut down
            del can_ti
    finally:
        # close connection with CAN interface, also when an error occurred above
        can_interface.shutdown()


if __name__ == "__main__":
    main()
Send UDS message on one interface, receive on the other:
"""Send (on one interface) and receive (on the second) a message using Diagnostic on CAN protocol (ISO 15765)."""
from threading import Timer
from time import sleep
from can import Bus
from uds.addressing import AddressingType
from uds.can import CanAddressingFormat, CanAddressingInformation, CanVersion, PythonCanTransportInterface
from uds.message import UdsMessage
def main():
    """Send a multi-packet UDS message on one CAN FD bus and receive it on a second one.

    Transmission is scheduled on a background ``Timer`` thread so that the main
    thread can already be waiting for the message on the second interface.
    Both buses are shut down on every exit path, including on errors.
    """
    # configure the first CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface_1 = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)
    try:
        # configure the second CAN interface
        can_interface_2 = Bus(
            # provide configuration for your CAN interface
            interface="kvaser",  # replace with your CAN interface name
            channel=1,
            receive_own_messages=True,  # mandatory setting if you use Kvaser
            # configure your CAN bus
            bitrate=500_000,
            fd=True,
            data_bitrate=4_000_000)
        try:
            # configure addresses for Diagnostics on CAN communication
            # CAN Addressing Formats explanation:
            # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
            ai_send = CanAddressingInformation(
                addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
                tx_physical_params={"can_id": 0x611},
                rx_physical_params={"can_id": 0x612},
                tx_functional_params={"can_id": 0x6FF},
                rx_functional_params={"can_id": 0x6FE})
            ai_receive = ai_send.get_other_end()
            # create Transport Interface objects for Diagnostics on CAN communication
            can_ti_1 = PythonCanTransportInterface(
                network_manager=can_interface_1,
                addressing_information=ai_send,
                can_version=CanVersion.CAN_FD)  # send all diagnostic packets as CAN FD frames
            can_ti_2 = PythonCanTransportInterface(
                network_manager=can_interface_2,
                addressing_information=ai_receive)
            try:
                # define UDS Message to send
                message = UdsMessage(addressing_type=AddressingType.PHYSICAL,
                                     payload=[0x62, 0x10, 0x00, *range(100)])
                # prepare code for scheduling transmission
                sent_message_record = None

                def _send_message():
                    nonlocal sent_message_record
                    sent_message_record = can_ti_1.send_message(message)

                timer = Timer(interval=0.01,  # delay after which message will be sent, now 0.01 [s]
                              function=_send_message)
                # send and receive message
                timer.start()
                received_message_record = can_ti_2.receive_message(start_timeout=1000)  # start_timeout=1000 [ms]
                # wait till the sender thread is done; unlike polling `timer.finished`,
                # join() also returns when sending raised, so the example cannot hang
                timer.join()
                # show results
                print(sent_message_record)
                print(received_message_record)
            finally:
                # drop the Transport Interfaces before the buses they use are shut down
                del can_ti_1
                del can_ti_2
        finally:
            can_interface_2.shutdown()
    finally:
        # close connections with CAN interfaces, also when an error occurred above
        can_interface_1.shutdown()


if __name__ == "__main__":
    main()
Packet handling
Send CAN packets:
"""Send a packet defined by Diagnostic on CAN protocol (ISO 15765)."""
from can import Bus
from uds.addressing import AddressingType
from uds.can import CanAddressingFormat, CanAddressingInformation, CanVersion, PythonCanTransportInterface
from uds.message import UdsMessage
def main():
    """Send the first CAN packet of a segmented UDS message and print its record.

    The CAN bus is shut down on every exit path, including when configuration
    or transmission raises an exception.
    """
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)
    try:
        # configure addresses for Diagnostics on CAN communication
        # CAN Addressing Formats explanation:
        # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
            rx_physical_params={"can_id": 0x611},
            tx_physical_params={"can_id": 0x612},
            rx_functional_params={"can_id": 0x6FF},
            tx_functional_params={"can_id": 0x6FE})
        # create Transport Interface object for Diagnostics on CAN communication
        can_ti = PythonCanTransportInterface(
            network_manager=can_interface,
            addressing_information=addressing_information,
            can_version=CanVersion.CLASSIC_CAN)  # send all diagnostic packets as Classic CAN frames
        try:
            # define UDS Message
            message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])
            # pick one CAN packet from UDS Message segmentation
            packet = can_ti.segmenter.segmentation(message)[0]
            # send CAN Packet
            sent_packet_record = can_ti.send_packet(packet)
            # show sent packet
            print(sent_packet_record)
        finally:
            # drop the Transport Interface before the bus it uses is shut down
            del can_ti
    finally:
        # close connection with CAN interface, also when an error occurred above
        can_interface.shutdown()


if __name__ == "__main__":
    main()
Receive CAN packets:
"""Receive a packet defined by Diagnostic on CAN protocol (ISO 15765)."""
from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation, PythonCanTransportInterface
def main():
    """Wait for one diagnostic CAN packet on a CAN FD capable bus and print its record.

    The CAN bus is shut down on every exit path, including when configuration
    or reception raises an exception.
    """
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)
    try:
        # configure addresses for Diagnostics on CAN communication
        # CAN Addressing Formats explanation:
        # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
            rx_physical_params={"can_id": 0x611},
            tx_physical_params={"can_id": 0x612},
            rx_functional_params={"can_id": 0x6FF},
            tx_functional_params={"can_id": 0x6FE})
        # create Transport Interface object for Diagnostics on CAN communication
        can_ti = PythonCanTransportInterface(
            network_manager=can_interface,
            addressing_information=addressing_information)
        try:
            # receive CAN packet
            received_packet_record = can_ti.receive_packet(timeout=1000)  # timeout=1000 [ms]
            # show received packet
            print(received_packet_record)
        finally:
            # drop the Transport Interface before the bus it uses is shut down
            del can_ti
    finally:
        # close connection with CAN interface, also when an error occurred above
        can_interface.shutdown()


if __name__ == "__main__":
    main()
Send CAN packets on one interface, receive on the other:
"""Send (on one interface) and receive (on the second) a packet defined by Diagnostic on CAN protocol (ISO 15765)."""
from can import Bus
from uds.addressing import AddressingType
from uds.can import CanAddressingFormat, CanAddressingInformation, CanVersion, PythonCanTransportInterface
from uds.message import UdsMessage
def main():
    """Send one diagnostic CAN packet on one CAN FD bus and receive it on a second one.

    Both buses are shut down on every exit path, including when configuration,
    transmission or reception raises an exception.
    """
    # configure the first CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface_1 = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)
    try:
        # configure the second CAN interface
        can_interface_2 = Bus(
            # provide configuration for your CAN interface
            interface="kvaser",  # replace with your CAN interface name
            channel=1,
            receive_own_messages=True,  # mandatory setting if you use Kvaser
            # configure your CAN bus
            bitrate=500_000,
            fd=True,
            data_bitrate=4_000_000)
        try:
            # configure addresses for Diagnostics on CAN communication
            # CAN Addressing Formats explanation:
            # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
            ai_send = CanAddressingInformation(
                addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
                tx_physical_params={"can_id": 0x611},
                rx_physical_params={"can_id": 0x612},
                tx_functional_params={"can_id": 0x6FF},
                rx_functional_params={"can_id": 0x6FE})
            ai_receive = ai_send.get_other_end()
            # create Transport Interface objects for Diagnostics on CAN communication
            can_ti_1 = PythonCanTransportInterface(
                network_manager=can_interface_1,
                addressing_information=ai_send,
                can_version=CanVersion.CAN_FD)  # send all diagnostic packets as CAN FD frames
            can_ti_2 = PythonCanTransportInterface(
                network_manager=can_interface_2,
                addressing_information=ai_receive)
            try:
                # define UDS Message
                message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])
                # pick one CAN packet from UDS Message segmentation
                packet = can_ti_1.segmenter.segmentation(message)[0]
                # send the CAN packet, then receive it on the other interface
                sent_packet_record = can_ti_1.send_packet(packet)
                received_packet_record = can_ti_2.receive_packet(timeout=100)  # timeout=100 [ms]
                # show results
                print(sent_packet_record)
                print(received_packet_record)
            finally:
                # drop the Transport Interfaces before the buses they use are shut down
                del can_ti_1
                del can_ti_2
        finally:
            can_interface_2.shutdown()
    finally:
        # close connections with CAN interfaces, also when an error occurred above
        can_interface_1.shutdown()


if __name__ == "__main__":
    main()
Asynchronous implementation
Message handling
Send UDS message:
"""Send asynchronously a message using Diagnostic on CAN protocol (ISO 15765)."""
import asyncio
from can import Bus
from uds.addressing import AddressingType
from uds.can import CanAddressingFormat, CanAddressingInformation, CanVersion, PythonCanTransportInterface
from uds.message import UdsMessage
async def main():
    """Asynchronously send one physically addressed UDS message over Classic CAN and print its record.

    The CAN bus is shut down on every exit path, including when configuration
    or transmission raises an exception.
    """
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=False)
    try:
        # configure addresses for Diagnostics on CAN communication
        # CAN Addressing Formats explanation:
        # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
            rx_physical_params={"can_id": 0x611},
            tx_physical_params={"can_id": 0x612},
            rx_functional_params={"can_id": 0x6FF},
            tx_functional_params={"can_id": 0x6FE})
        # create Transport Interface object for Diagnostics on CAN communication
        can_ti = PythonCanTransportInterface(
            network_manager=can_interface,
            addressing_information=addressing_information,
            can_version=CanVersion.CLASSIC_CAN)  # send all diagnostic packets as Classic CAN frames
        try:
            # define UDS Message to send
            message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])
            # send UDS Message
            sent_message_record = await can_ti.async_send_message(message)
            # show sent message
            print(sent_message_record)
        finally:
            # drop the Transport Interface before the bus it uses is shut down
            del can_ti
    finally:
        # close connection with CAN interface, also when an error occurred above
        can_interface.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
Receive UDS message:
"""Receive asynchronously a message using Diagnostic on CAN protocol (ISO 15765)."""
import asyncio
from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation, PythonCanTransportInterface
async def main():
    """Asynchronously wait for one UDS message on a CAN FD capable bus and print its record.

    The CAN bus is shut down on every exit path, including when configuration
    or reception raises an exception.
    """
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)
    try:
        # configure addresses for Diagnostics on CAN communication
        # CAN Addressing Formats explanation:
        # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
            rx_physical_params={"can_id": 0x611},
            tx_physical_params={"can_id": 0x612},
            rx_functional_params={"can_id": 0x6FF},
            tx_functional_params={"can_id": 0x6FE})
        # create Transport Interface object for Diagnostics on CAN communication
        can_ti = PythonCanTransportInterface(
            network_manager=can_interface,
            addressing_information=addressing_information)
        try:
            # receive UDS message
            received_message_record = await can_ti.async_receive_message(start_timeout=1000)  # start_timeout=1000 [ms]
            # show received message
            print(received_message_record)
        finally:
            # drop the Transport Interface before the bus it uses is shut down
            del can_ti
    finally:
        # close connection with CAN interface, also when an error occurred above
        can_interface.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
Send UDS message on one interface, receive on the other:
"""Send (on one interface) and receive (on the second) asynchronously a message using Diagnostic on CAN protocol (ISO 15765)."""
import asyncio
from can import Bus
from uds.addressing import AddressingType
from uds.can import CanAddressingFormat, CanAddressingInformation, CanVersion, PythonCanTransportInterface
from uds.message import UdsMessage
async def main():
    """Asynchronously send a multi-packet UDS message on one CAN FD bus and receive it on a second one.

    Reception is started as a task before the transmission is awaited, so both
    run concurrently. Both buses are shut down on every exit path, including on errors.
    """
    # configure the first CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface_1 = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)
    try:
        # configure the second CAN interface
        can_interface_2 = Bus(
            # provide configuration for your CAN interface
            interface="kvaser",  # replace with your CAN interface name
            channel=1,
            receive_own_messages=True,  # mandatory setting if you use Kvaser
            # configure your CAN bus
            bitrate=500_000,
            fd=True,
            data_bitrate=4_000_000)
        try:
            # configure addresses for Diagnostics on CAN communication
            # CAN Addressing Formats explanation:
            # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
            ai_send = CanAddressingInformation(
                addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
                tx_physical_params={"can_id": 0x611},
                rx_physical_params={"can_id": 0x612},
                tx_functional_params={"can_id": 0x6FF},
                rx_functional_params={"can_id": 0x6FE})
            ai_receive = ai_send.get_other_end()
            # create Transport Interface objects for Diagnostics on CAN communication
            can_ti_1 = PythonCanTransportInterface(
                network_manager=can_interface_1,
                addressing_information=ai_send,
                can_version=CanVersion.CAN_FD)  # send all diagnostic packets as CAN FD frames
            can_ti_2 = PythonCanTransportInterface(
                network_manager=can_interface_2,
                addressing_information=ai_receive)
            try:
                # define UDS Message to send
                message = UdsMessage(addressing_type=AddressingType.PHYSICAL,
                                     payload=[0x62, 0x10, 0x00, *range(100)])
                # send and receive message
                receive_message_task = asyncio.create_task(
                    can_ti_2.async_receive_message(start_timeout=1000))  # start_timeout=1000 [ms]
                sent_message_record = await can_ti_1.async_send_message(message)
                received_message_record = await receive_message_task
                # show results
                print(sent_message_record)
                print(received_message_record)
            finally:
                # drop the Transport Interfaces before the buses they use are shut down
                del can_ti_1
                del can_ti_2
        finally:
            can_interface_2.shutdown()
    finally:
        # close connections with CAN interfaces, also when an error occurred above
        can_interface_1.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
Packet handling
Send CAN packets:
"""Send asynchronously a packet defined by Diagnostic on CAN protocol (ISO 15765)."""
import asyncio
from can import Bus
from uds.addressing import AddressingType
from uds.can import CanAddressingFormat, CanAddressingInformation, CanVersion, PythonCanTransportInterface
from uds.message import UdsMessage
async def main():
    """Asynchronously send the first CAN packet of a segmented UDS message and print its record.

    The CAN bus is shut down on every exit path, including when configuration
    or transmission raises an exception.
    """
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)
    try:
        # configure addresses for Diagnostics on CAN communication
        # CAN Addressing Formats explanation:
        # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
            rx_physical_params={"can_id": 0x611},
            tx_physical_params={"can_id": 0x612},
            rx_functional_params={"can_id": 0x6FF},
            tx_functional_params={"can_id": 0x6FE})
        # create Transport Interface object for Diagnostics on CAN communication
        can_ti = PythonCanTransportInterface(
            network_manager=can_interface,
            addressing_information=addressing_information,
            can_version=CanVersion.CLASSIC_CAN)  # send all diagnostic packets as Classic CAN frames
        try:
            # define UDS Message
            message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])
            # pick one CAN packet from UDS Message segmentation
            packet = can_ti.segmenter.segmentation(message)[0]
            # send CAN Packet
            sent_packet_record = await can_ti.async_send_packet(packet)
            # show sent packet
            print(sent_packet_record)
        finally:
            # drop the Transport Interface before the bus it uses is shut down
            del can_ti
    finally:
        # close connection with CAN interface, also when an error occurred above
        can_interface.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
Receive CAN packets:
"""Receive asynchronously a packet defined by Diagnostic on CAN protocol (ISO 15765)."""
import asyncio
from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation, PythonCanTransportInterface
async def main():
    """Asynchronously wait for one diagnostic CAN packet on a CAN FD capable bus and print its record.

    The CAN bus is shut down on every exit path, including when configuration
    or reception raises an exception.
    """
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)
    try:
        # configure addresses for Diagnostics on CAN communication
        # CAN Addressing Formats explanation:
        # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
            rx_physical_params={"can_id": 0x611},
            tx_physical_params={"can_id": 0x612},
            rx_functional_params={"can_id": 0x6FF},
            tx_functional_params={"can_id": 0x6FE})
        # create Transport Interface object for Diagnostics on CAN communication
        can_ti = PythonCanTransportInterface(
            network_manager=can_interface,
            addressing_information=addressing_information)
        try:
            # receive CAN packet
            received_packet_record = await can_ti.async_receive_packet(timeout=1000)  # timeout=1000 [ms]
            # show received packet
            print(received_packet_record)
        finally:
            # drop the Transport Interface before the bus it uses is shut down
            del can_ti
    finally:
        # close connection with CAN interface, also when an error occurred above
        can_interface.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
Send CAN packets on one interface, receive on the other:
"""Send (on one interface) and receive (on the second) asynchronously a packet defined by Diagnostic on CAN protocol (ISO 15765)."""
import asyncio
from can import Bus
from uds.addressing import AddressingType
from uds.can import CanAddressingFormat, CanAddressingInformation, CanVersion, PythonCanTransportInterface
from uds.message import UdsMessage
async def main():
    """Asynchronously send one diagnostic CAN packet on one CAN FD bus and receive it on a second one.

    Reception is started as a task before the transmission is awaited, so both
    run concurrently. Both buses are shut down on every exit path, including on errors.
    """
    # configure the first CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface_1 = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)
    try:
        # configure the second CAN interface
        can_interface_2 = Bus(
            # provide configuration for your CAN interface
            interface="kvaser",  # replace with your CAN interface name
            channel=1,
            receive_own_messages=True,  # mandatory setting if you use Kvaser
            # configure your CAN bus
            bitrate=500_000,
            fd=True,
            data_bitrate=4_000_000)
        try:
            # configure addresses for Diagnostics on CAN communication
            # CAN Addressing Formats explanation:
            # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
            ai_send = CanAddressingInformation(
                addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
                tx_physical_params={"can_id": 0x611},
                rx_physical_params={"can_id": 0x612},
                tx_functional_params={"can_id": 0x6FF},
                rx_functional_params={"can_id": 0x6FE})
            ai_receive = ai_send.get_other_end()
            # create Transport Interface objects for Diagnostics on CAN communication
            can_ti_1 = PythonCanTransportInterface(
                network_manager=can_interface_1,
                addressing_information=ai_send,
                can_version=CanVersion.CAN_FD)  # send all diagnostic packets as CAN FD frames
            can_ti_2 = PythonCanTransportInterface(
                network_manager=can_interface_2,
                addressing_information=ai_receive)
            try:
                # define UDS Message
                message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])
                # pick one CAN packet from UDS Message segmentation
                packet = can_ti_1.segmenter.segmentation(message)[0]
                # send the CAN packet and receive it on the other interface
                receive_packet_task = asyncio.create_task(
                    can_ti_2.async_receive_packet(timeout=100))  # timeout=100 [ms]
                sent_packet_record = await can_ti_1.async_send_packet(packet)
                received_packet_record = await receive_packet_task
                # show results
                print(sent_packet_record)
                print(received_packet_record)
            finally:
                # drop the Transport Interfaces before the buses they use are shut down
                del can_ti_1
                del can_ti_2
        finally:
            can_interface_2.shutdown()
    finally:
        # close connections with CAN interfaces, also when an error occurred above
        can_interface_1.shutdown()


if __name__ == "__main__":
    asyncio.run(main())