Python-CAN
Examples that use the python-can package to control a CAN bus (transmitting and receiving CAN frames).
Kvaser interface
Examples that use Kvaser CAN interfaces as the hardware devices for sending and receiving CAN messages.
Synchronous implementation
Message handling
Send UDS message:
from pprint import pprint
from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface
def main():
    """Send a functionally addressed UDS message over a Kvaser CAN interface and print its record."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)
    # second interface is only used to acknowledge CAN frames sent by `kvaser_interface_1`,
    # you might comment it out (together with its shutdown below) if you have another device to do that
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)
    try:
        # configure Addressing Information of a CAN Node (example values)
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})
        # create Transport Interface object for UDS communication
        can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                         addressing_information=addressing_information)
        # define UDS Message to send
        message = UdsMessage(addressing_type=AddressingType.FUNCTIONAL, payload=[0x10, 0x03])
        # send UDS Message and show the record of the transmission
        message_record = can_ti.send_message(message)
        pprint(message_record.__dict__)
        # release the Transport Interface before the CAN interfaces are shut down
        del can_ti
    finally:
        # close connections with CAN interfaces, also when an error was raised above
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    main()
Receive UDS message:
from pprint import pprint
from threading import Timer
from can import Bus, Message
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.transport_interface import PyCanTransportInterface
def main():
    """Receive a UDS message carried by two CAN frames sent from the second Kvaser interface."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)  # receiving
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)  # sending
    try:
        # configure Addressing Information of a CAN Node (example values)
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})
        # create Transport Interface object for UDS communication
        can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                         addressing_information=addressing_information)
        # define frames carrying a UDS message (to be received later on)
        frame_1 = Message(arbitration_id=0x612, data=[0x10, 0x0b, 0x62, 0x10, 0x00, 0x00, 0x01, 0x02])
        frame_2 = Message(arbitration_id=0x612, data=[0x21, 0x03, 0x04, 0x05, 0x06, 0x07, 0xCC, 0xCC])
        # schedule transmission of frame_1 (after 0.01 [s]) and frame_2 (after 0.5 [s]) in background threads
        Timer(interval=0.01, function=kvaser_interface_2.send, args=(frame_1, )).start()
        Timer(interval=0.5, function=kvaser_interface_2.send, args=(frame_2, )).start()
        # receive message
        received_message_record = can_ti.receive_message(timeout=1000)  # 1000 [ms]
        pprint(received_message_record.__dict__)
        # release the Transport Interface before the CAN interfaces are shut down
        del can_ti
    finally:
        # close connections with CAN interfaces, also when an error (e.g. a timeout) was raised above
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    main()
Send UDS message on one interface, receive on the other:
from pprint import pprint
from threading import Timer
from time import sleep
from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface
def main():
    """Send a 103-byte UDS message from one Kvaser interface and receive it on the other one."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)  # receiving
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)  # sending
    try:
        # configure Addressing Information of CAN Nodes (example values);
        # `ai_send` mirrors `ai_receive` (tx and rx CAN IDs are swapped)
        ai_receive = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})
        ai_send = CanAddressingInformation(
            addressing_format=ai_receive.addressing_format,
            tx_physical={"can_id": 0x612},
            rx_physical={"can_id": 0x611},
            tx_functional={"can_id": 0x6FE},
            rx_functional={"can_id": 0x6FF})
        # create Transport Interface objects for UDS communication
        can_ti_1 = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                           addressing_information=ai_receive)
        can_ti_2 = PyCanTransportInterface(can_bus_manager=kvaser_interface_2,
                                           addressing_information=ai_send)
        # define UDS Message to send
        message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x62, 0x10, 0x00, *range(100)])
        # prepare code for scheduling transmission
        sent_message_record = None

        def _send_message():
            """Send the message in the timer thread and store its record for the enclosing function."""
            nonlocal sent_message_record
            sent_message_record = can_ti_2.send_message(message)

        timer = Timer(interval=0.01,  # delay after which message will be sent, now 0.01 [s]
                      function=_send_message)
        # send and receive message
        timer.start()
        received_message_record = can_ti_1.receive_message(timeout=1000)  # 1000 [ms]
        # wait till the sending thread has finished, so `sent_message_record` is assigned
        timer.join()
        # show results
        pprint(received_message_record.__dict__)
        pprint(sent_message_record.__dict__)
        # release the Transport Interfaces before the CAN interfaces are shut down
        del can_ti_1
        del can_ti_2
    finally:
        # close connections with CAN interfaces, also when an error was raised above
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    main()
Packet handling
Send CAN packets:
from pprint import pprint
from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface
def main():
    """Segment a short UDS message, send its first CAN packet and print the packet record."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)
    # second interface is only used to acknowledge CAN frames sent by `kvaser_interface_1`,
    # you might comment it out (together with its shutdown below) if you have another device to do that
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)
    try:
        # configure Addressing Information of a CAN Node (example values set)
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})
        # create Transport Interface object for UDS communication
        can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                         addressing_information=addressing_information)
        # define UDS Message to send
        message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])
        # create CAN packet that carries this UDS Message (first packet produced by the segmenter)
        packet = can_ti.segmenter.segmentation(message)[0]
        # send CAN Packet
        packet_record = can_ti.send_packet(packet)
        pprint(packet_record.__dict__)
        # release the Transport Interface before the CAN interfaces are shut down
        del can_ti
    finally:
        # close connections with CAN interfaces, also when an error was raised above
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    main()
Receive CAN packets:
from pprint import pprint
from threading import Timer
from can import Bus, Message
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.transport_interface import PyCanTransportInterface
def main():
    """Receive two CAN packets on the first Kvaser interface while the second one transmits the frames."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)  # receiving
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)  # sending
    try:
        # configure Addressing Information of a CAN Node (example values)
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})
        # create Transport Interface object for UDS communication
        can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                         addressing_information=addressing_information)
        # some frames to be received later on
        frame_1 = Message(arbitration_id=0x6FE, data=[0x02, 0x10, 0x03])
        frame_2 = Message(arbitration_id=0x611, data=[0x02, 0x10, 0x03])  # shall be ignored, as it is not observed CAN ID
        frame_3 = Message(arbitration_id=0x612, data=[0x02, 0x3E, 0x00, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA])
        # receive CAN packet 1
        # frames are transmitted by the sending interface (`kvaser_interface_2`)
        Timer(interval=0.1, function=kvaser_interface_2.send, args=(frame_1,)).start()  # schedule transmission of frame 1
        record_1 = can_ti.receive_packet(timeout=1000)  # receive CAN packet 1 carried by frame 1
        pprint(record_1.__dict__)  # show attributes of CAN packet record 1
        # receive CAN packet 2
        Timer(interval=0.1, function=kvaser_interface_2.send, args=(frame_2,)).start()  # schedule transmission of frame 2
        Timer(interval=0.2, function=kvaser_interface_2.send, args=(frame_3,)).start()  # schedule transmission of frame 3
        record_2 = can_ti.receive_packet(timeout=1000)  # receive CAN packet 2 carried by frame 3
        pprint(record_2.__dict__)  # show attributes of CAN packet record 2
        # release the Transport Interface before the CAN interfaces are shut down
        del can_ti
    finally:
        # close connections with CAN interfaces, also when an error (e.g. a timeout) was raised above
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    main()
Send CAN packets on one interface, receive on the other:
from pprint import pprint
from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface
def main():
    """Send a CAN packet from one Kvaser interface and receive it on the other one."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)  # receiving
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)  # sending
    try:
        # configure Addressing Information of CAN Nodes (example values);
        # `ai_send` mirrors `ai_receive` (tx and rx CAN IDs are swapped)
        ai_receive = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})
        ai_send = CanAddressingInformation(
            addressing_format=ai_receive.addressing_format,
            tx_physical={"can_id": 0x612},
            rx_physical={"can_id": 0x611},
            tx_functional={"can_id": 0x6FE},
            rx_functional={"can_id": 0x6FF})
        # create Transport Interface objects for UDS communication
        can_ti_1 = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                           addressing_information=ai_receive)
        can_ti_2 = PyCanTransportInterface(can_bus_manager=kvaser_interface_2,
                                           addressing_information=ai_send)
        # define UDS Message to send
        message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])
        # create CAN packet that carries this UDS Message (first packet produced by the segmenter)
        packet = can_ti_2.segmenter.segmentation(message)[0]
        # send and receive CAN packet
        sent_packet_record = can_ti_2.send_packet(packet)
        pprint(sent_packet_record.__dict__)
        received_packet_record = can_ti_1.receive_packet(timeout=100)
        pprint(received_packet_record.__dict__)
        # release the Transport Interfaces before the CAN interfaces are shut down
        del can_ti_1
        del can_ti_2
    finally:
        # close connections with CAN interfaces, also when an error was raised above
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    main()
Asynchronous implementation
Message handling
Send UDS message:
import asyncio
from pprint import pprint
from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface
async def main():
    """Asynchronously send a functionally addressed UDS message and print its record."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)
    # second interface is only used to acknowledge CAN frames sent by `kvaser_interface_1`,
    # you might comment it out (together with its shutdown below) if you have another device to do that
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)
    try:
        # configure Addressing Information of a CAN Node (example values set)
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})
        # create Transport Interface object for UDS communication
        can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                         addressing_information=addressing_information)
        # define UDS Message to send
        message = UdsMessage(addressing_type=AddressingType.FUNCTIONAL, payload=[0x10, 0x03])
        # send UDS Message and show the record of the transmission
        message_record = await can_ti.async_send_message(message)
        pprint(message_record.__dict__)
        # release the Transport Interface before the CAN interfaces are shut down
        del can_ti
    finally:
        # close connections with CAN interfaces, also when an error was raised above
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
Receive UDS message:
import asyncio
from pprint import pprint
from can import Bus, Message
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.transport_interface import PyCanTransportInterface
async def main():
    """Asynchronously receive a UDS message carried by two CAN frames sent from the second Kvaser interface."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)  # receiving
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)  # sending
    try:
        # configure Addressing Information of a CAN Node (example values)
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})
        # create Transport Interface object for UDS communication
        can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                         addressing_information=addressing_information)
        # define frames carrying a UDS message (to be received later on)
        frame_1 = Message(arbitration_id=0x612, data=[0x10, 0x0b, 0x62, 0x10, 0x00, 0x00, 0x01, 0x02])
        frame_2 = Message(arbitration_id=0x612, data=[0x21, 0x03, 0x04, 0x05, 0x06, 0x07, 0xCC, 0xCC])

        # define task that transmits the frames carrying the UDS message
        async def _send_message():
            """Send frame_1 after 0.01 [s] and frame_2 another 0.5 [s] later."""
            await asyncio.sleep(0.01)
            kvaser_interface_2.send(frame_1)
            await asyncio.sleep(0.5)
            kvaser_interface_2.send(frame_2)

        send_message_task = asyncio.create_task(_send_message())
        # receive message
        received_message_record = await can_ti.async_receive_message(timeout=1000)  # 1000 [ms]
        await send_message_task
        pprint(received_message_record.__dict__)
        # release the Transport Interface before the CAN interfaces are shut down
        del can_ti
    finally:
        # close connections with CAN interfaces, also when an error (e.g. a timeout) was raised above
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
Send UDS message on one interface, receive on the other:
import asyncio
from pprint import pprint
from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface
async def main():
    """Asynchronously send a 103-byte UDS message from one Kvaser interface and receive it on the other one."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)  # receiving
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)  # sending
    try:
        # configure Addressing Information of CAN Nodes (example values);
        # `ai_send` mirrors `ai_receive` (tx and rx CAN IDs are swapped)
        ai_receive = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})
        ai_send = CanAddressingInformation(
            addressing_format=ai_receive.addressing_format,
            tx_physical={"can_id": 0x612},
            rx_physical={"can_id": 0x611},
            tx_functional={"can_id": 0x6FE},
            rx_functional={"can_id": 0x6FF})
        # create Transport Interface objects for UDS communication
        can_ti_1 = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                           addressing_information=ai_receive)
        can_ti_2 = PyCanTransportInterface(can_bus_manager=kvaser_interface_2,
                                           addressing_information=ai_send)
        # define UDS Message to send
        message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x62, 0x10, 0x00, *range(100)])
        # send and receive message; the receiving task is created before the transmission is awaited
        receive_message_task = asyncio.create_task(can_ti_1.async_receive_message(timeout=1000))  # 1000 ms
        sent_message_record = await can_ti_2.async_send_message(message)
        received_message_record = await receive_message_task
        # show results
        pprint(received_message_record.__dict__)
        pprint(sent_message_record.__dict__)
        # release the Transport Interfaces before the CAN interfaces are shut down
        del can_ti_1
        del can_ti_2
    finally:
        # close connections with CAN interfaces, also when an error was raised above
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
Packet handling
Send CAN packets:
import asyncio
from pprint import pprint
from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface
async def main():
    """Segment a short UDS message, asynchronously send its first CAN packet and print the packet record."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)
    # second interface is only used to acknowledge CAN frames sent by `kvaser_interface_1`,
    # you might comment it out (together with its shutdown below) if you have another device to do that
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)
    try:
        # configure Addressing Information of a CAN Node (example values set)
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})
        # create Transport Interface object for UDS communication
        can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                         addressing_information=addressing_information)
        # define UDS Message to send
        message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])
        # create CAN packet that carries this UDS Message (first packet produced by the segmenter)
        packet = can_ti.segmenter.segmentation(message)[0]
        # send CAN Packet
        packet_record = await can_ti.async_send_packet(packet)
        pprint(packet_record.__dict__)
        # release the Transport Interface before the CAN interfaces are shut down
        del can_ti
    finally:
        # close connections with CAN interfaces, also when an error was raised above
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
Receive CAN packets:
import asyncio
from pprint import pprint
from can import Bus, Message
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.transport_interface import PyCanTransportInterface
async def main():
    """Asynchronously receive two CAN packets on the first Kvaser interface; the second one transmits the frames."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)  # receiving
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)  # sending
    try:
        # configure Addressing Information of a CAN Node (example values)
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})
        # create Transport Interface object for UDS communication
        can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                         addressing_information=addressing_information)
        # some frames to be received later on
        frame_1 = Message(arbitration_id=0x6FE, data=[0x02, 0x10, 0x03])
        frame_2 = Message(arbitration_id=0x611, data=[0x02, 0x10, 0x03])  # shall be ignored, as it is not observed CAN ID
        frame_3 = Message(arbitration_id=0x612, data=[0x02, 0x3E, 0x00, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA])
        # receive CAN packet 1
        kvaser_interface_2.send(frame_1)  # transmit CAN Frame 1
        record_1 = await can_ti.async_receive_packet(timeout=1000)  # receive CAN packet 1 carried by frame 1
        pprint(record_1.__dict__)  # show attributes of CAN packet record 1
        # receive CAN packet 2
        kvaser_interface_2.send(frame_2)  # transmit CAN Frame 2
        kvaser_interface_2.send(frame_3)  # transmit CAN Frame 3
        record_2 = await can_ti.async_receive_packet(timeout=1000)  # receive CAN packet 2 carried by frame 3
        pprint(record_2.__dict__)  # show attributes of CAN packet record 2
        # release the Transport Interface before the CAN interfaces are shut down
        del can_ti
    finally:
        # close connections with CAN interfaces, also when an error (e.g. a timeout) was raised above
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
Send CAN packets on one interface, receive on the other:
import asyncio
from pprint import pprint
from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface
async def main():
    """Asynchronously send a CAN packet from one Kvaser interface and receive it on the other one."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)  # receiving
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)  # sending
    try:
        # configure Addressing Information of CAN Nodes (example values);
        # `ai_send` mirrors `ai_receive` (tx and rx CAN IDs are swapped)
        ai_receive = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})
        ai_send = CanAddressingInformation(
            addressing_format=ai_receive.addressing_format,
            tx_physical={"can_id": 0x612},
            rx_physical={"can_id": 0x611},
            tx_functional={"can_id": 0x6FE},
            rx_functional={"can_id": 0x6FF})
        # create Transport Interface objects for UDS communication
        can_ti_1 = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                           addressing_information=ai_receive)
        can_ti_2 = PyCanTransportInterface(can_bus_manager=kvaser_interface_2,
                                           addressing_information=ai_send)
        # define UDS Message to send
        message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])
        # create CAN packet that carries this UDS Message (first packet produced by the segmenter)
        packet = can_ti_2.segmenter.segmentation(message)[0]
        # send and receive CAN packet; the receiving task is created before the transmission is awaited
        receive_packet_task = asyncio.create_task(can_ti_1.async_receive_packet(timeout=100))
        sent_packet_record = await can_ti_2.async_send_packet(packet)
        pprint(sent_packet_record.__dict__)
        received_packet_record = await receive_packet_task
        pprint(received_packet_record.__dict__)
        # release the Transport Interfaces before the CAN interfaces are shut down
        del can_ti_1
        del can_ti_2
    finally:
        # close connections with CAN interfaces, also when an error was raised above
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    asyncio.run(main())