Python-CAN

Examples using the python-can package to control the CAN bus, including transmission and reception of CAN frames.

Kvaser interface

Examples demonstrating the use of Kvaser CAN interfaces as hardware devices for sending and receiving CAN messages.

Synchronous implementation

Message handling

  • Send UDS message:

"""Send a message using Diagnostic on CAN protocol (ISO 15765)."""

from can import Bus
from uds.addressing import AddressingType
from uds.can import CanAddressingFormat, CanAddressingInformation, CanVersion, PythonCanTransportInterface
from uds.message import UdsMessage


def main():
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=False)

    # configure addresses for Diagnostics on CAN communication
    # CAN Addressing Formats explanation:
    # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
    addressing_information = CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
                                                      rx_physical_params={"can_id": 0x611},
                                                      tx_physical_params={"can_id": 0x612},
                                                      rx_functional_params={"can_id": 0x6FF},
                                                      tx_functional_params={"can_id": 0x6FE})

    # create Transport Interface object for Diagnostics on CAN communication
    can_ti = PythonCanTransportInterface(
        network_manager=can_interface,
        addressing_information=addressing_information,
        can_version=CanVersion.CLASSIC_CAN)  # send all diagnostic packets as Classic CAN frames

    # define UDS Messages to send
    message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])

    # send UDS message
    sent_message_record = can_ti.send_message(message=message)

    # show sent message
    print(sent_message_record)

    # close connections with CAN interface
    del can_ti
    can_interface.shutdown()


if __name__ == "__main__":
    main()
  • Receive UDS message:

"""Receive a message using Diagnostic on CAN protocol (ISO 15765)."""

from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation, PythonCanTransportInterface


def main():
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)

    # configure addresses for Diagnostics on CAN communication
    # CAN Addressing Formats explanation:
    # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
    addressing_information = CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
                                                      rx_physical_params={"can_id": 0x611},
                                                      tx_physical_params={"can_id": 0x612},
                                                      rx_functional_params={"can_id": 0x6FF},
                                                      tx_functional_params={"can_id": 0x6FE})

    # create Transport Interface object for Diagnostics on CAN communication
    can_ti = PythonCanTransportInterface(
        network_manager=can_interface,
        addressing_information=addressing_information)

    # receive UDS message
    received_message_record = can_ti.receive_message(start_timeout=1000)  # timeout=1000 [ms]

    # show received message
    print(received_message_record)

    # close connections with CAN interface
    del can_ti
    can_interface.shutdown()


if __name__ == "__main__":
    main()
  • Send UDS message on one interface, receive on the other:

"""Send (on one interface) and received (on the second) a message using Diagnostic on CAN protocol (ISO 15765)."""

from threading import Timer
from time import sleep

from can import Bus
from uds.addressing import AddressingType
from uds.can import CanAddressingFormat, CanAddressingInformation, CanVersion, PythonCanTransportInterface
from uds.message import UdsMessage


def main():
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface_1 = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface_2 = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=1,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)

    # configure addresses for Diagnostics on CAN communication
    # CAN Addressing Formats explanation:
    # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
    ai_send = CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
                                          tx_physical_params={"can_id": 0x611},
                                          rx_physical_params={"can_id": 0x612},
                                          tx_functional_params={"can_id": 0x6FF},
                                          rx_functional_params={"can_id": 0x6FE})
    ai_receive = ai_send.get_other_end()

    # create Transport Interface object for Diagnostics on CAN communication
    can_ti_1 = PythonCanTransportInterface(
        network_manager=can_interface_1,
        addressing_information=ai_send,
        can_version=CanVersion.CAN_FD)  # send all diagnostic packets as CAN FD frames
    can_ti_2 = PythonCanTransportInterface(
        network_manager=can_interface_2,
        addressing_information=ai_receive)

    # define UDS Message to send
    message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x62, 0x10, 0x00, *range(100)])

    # prepare code for scheduling transmission
    sent_message_record = None
    def _send_message():
        nonlocal sent_message_record
        sent_message_record = can_ti_1.send_message(message)
    timer = Timer(interval=0.01,  # delay after which message will be sent, now 0.01 [s]
                  function=_send_message)

    # send and receive message
    timer.start()
    received_message_record = can_ti_2.receive_message(start_timeout=1000)  # timeout=1000 [ms]

    # wait till message is received
    while not timer.finished.is_set():
        sleep(0.01)

    # show results
    print(sent_message_record)
    print(received_message_record)

    # close connections with CAN interfaces
    del can_ti_1
    del can_ti_2
    can_interface_1.shutdown()
    can_interface_2.shutdown()


if __name__ == "__main__":
    main()

Packet handling

  • Send CAN packets:

"""Send a packet defined by Diagnostic on CAN protocol (ISO 15765)."""

from can import Bus
from uds.addressing import AddressingType
from uds.can import CanAddressingFormat, CanAddressingInformation, CanVersion, PythonCanTransportInterface
from uds.message import UdsMessage


def main():
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)

    # configure addresses for Diagnostics on CAN communication
    # CAN Addressing Formats explanation:
    # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
    addressing_information = CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
                                                      rx_physical_params={"can_id": 0x611},
                                                      tx_physical_params={"can_id": 0x612},
                                                      rx_functional_params={"can_id": 0x6FF},
                                                      tx_functional_params={"can_id": 0x6FE})

    # create Transport Interface object for Diagnostics on CAN communication
    can_ti = PythonCanTransportInterface(
        network_manager=can_interface,
        addressing_information=addressing_information,
        can_version=CanVersion.CLASSIC_CAN)  # send all diagnostic packets as Classic CAN frames

    # define UDS Message
    message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])
    # pick one CAN packet from UDS Message segmentation
    packet = can_ti.segmenter.segmentation(message)[0]

    # send CAN Packet
    sent_packet_record = can_ti.send_packet(packet)

    # show sent packet
    print(sent_packet_record)

    # close connections with CAN interface
    del can_ti
    can_interface.shutdown()


if __name__ == "__main__":
    main()
  • Receive CAN packets:

"""Receive a packet defined by Diagnostic on CAN protocol (ISO 15765)."""

from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation, PythonCanTransportInterface


def main():
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)

    # configure addresses for Diagnostics on CAN communication
    # CAN Addressing Formats explanation:
    # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
    addressing_information = CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
                                                      rx_physical_params={"can_id": 0x611},
                                                      tx_physical_params={"can_id": 0x612},
                                                      rx_functional_params={"can_id": 0x6FF},
                                                      tx_functional_params={"can_id": 0x6FE})

    # create Transport Interface object for Diagnostics on CAN communication
    can_ti = PythonCanTransportInterface(
        network_manager=can_interface,
        addressing_information=addressing_information)

    # receive CAN packet
    received_packet_record = can_ti.receive_packet(timeout=1000)  # timeout=1000 [ms]

    # show received packet
    print(received_packet_record)

    # close connections with CAN interface
    del can_ti
    can_interface.shutdown()


if __name__ == "__main__":
    main()
  • Send CAN packets on one interface, receive on the other:

"""Send (on one interface) and received (on the second) a packet defined by Diagnostic on CAN protocol (ISO 15765)."""

from can import Bus
from uds.addressing import AddressingType
from uds.can import CanAddressingFormat, CanAddressingInformation, CanVersion, PythonCanTransportInterface
from uds.message import UdsMessage


def main():
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface_1 = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface_2 = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=1,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)

    # configure addresses for Diagnostics on CAN communication
    # CAN Addressing Formats explanation:
    # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
    ai_send = CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
                                          tx_physical_params={"can_id": 0x611},
                                          rx_physical_params={"can_id": 0x612},
                                          tx_functional_params={"can_id": 0x6FF},
                                          rx_functional_params={"can_id": 0x6FE})
    ai_receive = ai_send.get_other_end()

    # create Transport Interface object for Diagnostics on CAN communication
    can_ti_1 = PythonCanTransportInterface(
        network_manager=can_interface_1,
        addressing_information=ai_send,
        can_version=CanVersion.CAN_FD)  # send all diagnostic packets as CAN FD frames
    can_ti_2 = PythonCanTransportInterface(
        network_manager=can_interface_2,
        addressing_information=ai_receive)

    # define UDS Message
    message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])
    # pick one CAN packet from UDS Message segmentation
    packet = can_ti_1.segmenter.segmentation(message)[0]

    # send and receive packet CAN packet
    sent_packet_record = can_ti_1.send_packet(packet)
    received_packet_record = can_ti_2.receive_packet(timeout=100)

    # show results
    print(sent_packet_record)
    print(received_packet_record)

    # close connections with CAN interfaces
    del can_ti_1
    del can_ti_2
    can_interface_1.shutdown()
    can_interface_2.shutdown()


if __name__ == "__main__":
    main()

Asynchronous implementation

Message handling

  • Send UDS message:

"""Send asynchronously a message using Diagnostic on CAN protocol (ISO 15765)."""

import asyncio

from can import Bus
from uds.addressing import AddressingType
from uds.can import CanAddressingFormat, CanAddressingInformation, CanVersion, PythonCanTransportInterface
from uds.message import UdsMessage


async def main():
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=False)

    # configure addresses for Diagnostics on CAN communication
    # CAN Addressing Formats explanation:
    # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
    addressing_information = CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
                                                      rx_physical_params={"can_id": 0x611},
                                                      tx_physical_params={"can_id": 0x612},
                                                      rx_functional_params={"can_id": 0x6FF},
                                                      tx_functional_params={"can_id": 0x6FE})

    # create Transport Interface object for Diagnostics on CAN communication
    can_ti = PythonCanTransportInterface(
        network_manager=can_interface,
        addressing_information=addressing_information,
        can_version=CanVersion.CLASSIC_CAN)  # send all diagnostic packets as Classic CAN frames

    # define UDS Messages to send
    message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])

    # send UDS Message
    sent_message_record = await can_ti.async_send_message(message)

    # show sent message
    print(sent_message_record)

    # close connections with CAN interface
    del can_ti
    can_interface.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
  • Receive UDS message:

"""Receive asynchronously a message using Diagnostic on CAN protocol (ISO 15765)."""

import asyncio

from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation, PythonCanTransportInterface


async def main():
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)

    # configure addresses for Diagnostics on CAN communication
    # CAN Addressing Formats explanation:
    # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
    addressing_information = CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
                                                      rx_physical_params={"can_id": 0x611},
                                                      tx_physical_params={"can_id": 0x612},
                                                      rx_functional_params={"can_id": 0x6FF},
                                                      tx_functional_params={"can_id": 0x6FE})

    # create Transport Interface object for Diagnostics on CAN communication
    can_ti = PythonCanTransportInterface(
        network_manager=can_interface,
        addressing_information=addressing_information)

    # receive UDS message
    received_message_record = await can_ti.async_receive_message(start_timeout=1000)  # timeout=1000 [ms]

    # show received message
    print(received_message_record)

    # close connections with CAN interface
    del can_ti
    can_interface.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
  • Send UDS message on one interface, receive on the other:

"""Send (on one interface) and received (on the second) asynchronously a message using Diagnostic on CAN protocol (ISO 15765)."""

import asyncio

from can import Bus
from uds.addressing import AddressingType
from uds.can import CanAddressingFormat, CanAddressingInformation, CanVersion, PythonCanTransportInterface
from uds.message import UdsMessage


async def main():
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface_1 = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface_2 = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=1,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)

    # configure addresses for Diagnostics on CAN communication
    # CAN Addressing Formats explanation:
    # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
    ai_send = CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
                                          tx_physical_params={"can_id": 0x611},
                                          rx_physical_params={"can_id": 0x612},
                                          tx_functional_params={"can_id": 0x6FF},
                                          rx_functional_params={"can_id": 0x6FE})
    ai_receive = ai_send.get_other_end()

    # create Transport Interface object for Diagnostics on CAN communication
    can_ti_1 = PythonCanTransportInterface(
        network_manager=can_interface_1,
        addressing_information=ai_send,
        can_version=CanVersion.CAN_FD)  # send all diagnostic packets as CAN FD frames
    can_ti_2 = PythonCanTransportInterface(
        network_manager=can_interface_2,
        addressing_information=ai_receive)

    # define UDS Message to send
    message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x62, 0x10, 0x00, *range(100)])

    # send and receive message
    receive_message_task = asyncio.create_task(can_ti_2.async_receive_message(start_timeout=1000))  # timeout=1000 ms
    sent_message_record = await can_ti_1.async_send_message(message)
    received_message_record = await receive_message_task

    # show results
    print(sent_message_record)
    print(received_message_record)

    # close connections with CAN interfaces
    del can_ti_1
    del can_ti_2
    can_interface_1.shutdown()
    can_interface_2.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Packet handling

  • Send CAN packets:

"""Send asynchronously a packet defined by Diagnostic on CAN protocol (ISO 15765)."""

import asyncio

from can import Bus
from uds.addressing import AddressingType
from uds.can import CanAddressingFormat, CanAddressingInformation, CanVersion, PythonCanTransportInterface
from uds.message import UdsMessage


async def main():
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)

    # configure addresses for Diagnostics on CAN communication
    # CAN Addressing Formats explanation:
    # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
    addressing_information = CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
                                                      rx_physical_params={"can_id": 0x611},
                                                      tx_physical_params={"can_id": 0x612},
                                                      rx_functional_params={"can_id": 0x6FF},
                                                      tx_functional_params={"can_id": 0x6FE})

    # create Transport Interface object for Diagnostics on CAN communication
    can_ti = PythonCanTransportInterface(
        network_manager=can_interface,
        addressing_information=addressing_information,
        can_version=CanVersion.CLASSIC_CAN)  # send all diagnostic packets as Classic CAN frames

    # define UDS Message
    message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])
    # pick one CAN packet from UDS Message segmentation
    packet = can_ti.segmenter.segmentation(message)[0]

    # send CAN Packet
    sent_packet_record = await can_ti.async_send_packet(packet)

    # show sent packet
    print(sent_packet_record)

    # close connections with CAN interface
    del can_ti
    can_interface.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
  • Receive CAN packets:

"""Receive asynchronously a packet defined by Diagnostic on CAN protocol (ISO 15765)."""

import asyncio

from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation, PythonCanTransportInterface


async def main():
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)

    # configure addresses for Diagnostics on CAN communication
    # CAN Addressing Formats explanation:
    # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
    addressing_information = CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
                                                      rx_physical_params={"can_id": 0x611},
                                                      tx_physical_params={"can_id": 0x612},
                                                      rx_functional_params={"can_id": 0x6FF},
                                                      tx_functional_params={"can_id": 0x6FE})

    # create Transport Interface object for Diagnostics on CAN communication
    can_ti = PythonCanTransportInterface(
        network_manager=can_interface,
        addressing_information=addressing_information)

    # receive CAN packet
    received_packet_record = await can_ti.async_receive_packet(timeout=1000)  # timeout=1000 [ms]

    # show received packet
    print(received_packet_record)

    # close connections with CAN interface
    del can_ti
    can_interface.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
  • Send CAN packets on one interface, receive on the other:

"""Send (on one interface) and received (on the second) asynchronously a packet defined by Diagnostic on CAN protocol (ISO 15765)."""

import asyncio

from can import Bus
from uds.addressing import AddressingType
from uds.can import CanAddressingFormat, CanAddressingInformation, CanVersion, PythonCanTransportInterface
from uds.message import UdsMessage


async def main():
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface_1 = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=0,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)
    # configure CAN interface - https://python-can.readthedocs.io/en/stable/interfaces.html
    can_interface_2 = Bus(
        # provide configuration for your CAN interface
        interface="kvaser",  # replace with your CAN interface name
        channel=1,
        receive_own_messages=True,  # mandatory setting if you use Kvaser
        # configure your CAN bus
        bitrate=500_000,
        fd=True,
        data_bitrate=4_000_000)

    # configure addresses for Diagnostics on CAN communication
    # CAN Addressing Formats explanation:
    # https://uds.readthedocs.io/en/stable/pages/knowledge_base/packet.html#can-packet-addressing-formats
    ai_send = CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING,
                                          tx_physical_params={"can_id": 0x611},
                                          rx_physical_params={"can_id": 0x612},
                                          tx_functional_params={"can_id": 0x6FF},
                                          rx_functional_params={"can_id": 0x6FE})
    ai_receive = ai_send.get_other_end()

    # create Transport Interface object for Diagnostics on CAN communication
    can_ti_1 = PythonCanTransportInterface(
        network_manager=can_interface_1,
        addressing_information=ai_send,
        can_version=CanVersion.CAN_FD)  # send all diagnostic packets as CAN FD frames
    can_ti_2 = PythonCanTransportInterface(
        network_manager=can_interface_2,
        addressing_information=ai_receive)

    # define UDS Message
    message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])
    # pick one CAN packet from UDS Message segmentation
    packet = can_ti_1.segmenter.segmentation(message)[0]

    # send and receive packet CAN packet
    receive_packet_task = asyncio.create_task(can_ti_2.async_receive_packet(timeout=100))  # timeout=100 ms
    sent_packet_record = await can_ti_1.async_send_packet(packet)
    received_packet_record = await receive_packet_task

    # show results
    print(sent_packet_record)
    print(received_packet_record)

    # close connections with CAN interfaces
    del can_ti_1
    del can_ti_2
    can_interface_1.shutdown()
    can_interface_2.shutdown()


if __name__ == "__main__":
    asyncio.run(main())