Python-CAN

Examples that use the python-can package to control a CAN bus (handling transmission and reception of CAN frames).

Kvaser interface

Examples that use Kvaser CAN interfaces as the hardware devices for sending and receiving CAN messages.

Synchronous implementation

Message handling

  • Send UDS message:

from pprint import pprint

from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface


def main():
    """Send a functionally addressed UDS message via a Kvaser CAN interface and print its record."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)
    # second interface is only used to acknowledge CAN frames sent by `kvaser_interface_1`,
    # you might comment it out if you have another device to do that
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)

    can_ti = None  # bound up-front, so the clean-up below is valid even if the set-up fails
    try:
        # configure Addressing Information of a CAN Node (example values)
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})

        # create Transport Interface object for UDS communication
        can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                         addressing_information=addressing_information)

        # define UDS Message to send
        message = UdsMessage(addressing_type=AddressingType.FUNCTIONAL, payload=[0x10, 0x03])

        # send UDS Message
        message_record = can_ti.send_message(message)
        pprint(message_record.__dict__)
    finally:
        # close connections with CAN interfaces (executed also when an error was raised above);
        # the Transport Interface is dropped first, then the buses are shut down
        del can_ti
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    main()
  • Receive UDS message:

from pprint import pprint
from threading import Timer

from can import Bus, Message
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.transport_interface import PyCanTransportInterface


def main():
    """Receive a UDS message carried by two CAN frames sent from the second interface and print its record."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)  # receiving
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)  # sending

    can_ti = None  # bound up-front, so the clean-up below is valid even if the set-up fails
    try:
        # configure Addressing Information of a CAN Node (example values)
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})

        # create Transport Interface object for UDS communication
        can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                         addressing_information=addressing_information)

        # define frames carrying a UDS message (to be received later on)
        frame_1 = Message(arbitration_id=0x612, data=[0x10, 0x0b, 0x62, 0x10, 0x00, 0x00, 0x01, 0x02])
        frame_2 = Message(arbitration_id=0x612, data=[0x21, 0x03, 0x04, 0x05, 0x06, 0x07, 0xCC, 0xCC])

        # schedule transmission of frame_1 and frame_2 (they carry the UDS message); intervals are in seconds
        Timer(interval=0.01, function=kvaser_interface_2.send, args=(frame_1, )).start()
        Timer(interval=0.5, function=kvaser_interface_2.send, args=(frame_2, )).start()

        # receive message
        received_message_record = can_ti.receive_message(timeout=1000)  # 1000 [ms]
        pprint(received_message_record.__dict__)
    finally:
        # close connections with CAN interfaces (executed also when an error was raised above);
        # the Transport Interface is dropped first, then the buses are shut down
        del can_ti
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    main()
  • Send UDS message on one interface, receive on the other:

from pprint import pprint
from threading import Timer
from time import sleep

from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface


def main():
    """Send a UDS message on one Kvaser interface, receive it on the other one and print both records."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)  # receiving
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)  # sending

    # bound up-front, so the clean-up below is valid even if the set-up fails
    can_ti_1 = None
    can_ti_2 = None
    try:
        # configure Addressing Information of CAN Nodes (example values);
        # Tx and Rx CAN IDs of the sending node mirror those of the receiving node
        ai_receive = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})
        ai_send = CanAddressingInformation(
            addressing_format=ai_receive.addressing_format,
            tx_physical={"can_id": 0x612},
            rx_physical={"can_id": 0x611},
            tx_functional={"can_id": 0x6FE},
            rx_functional={"can_id": 0x6FF})

        # create Transport Interface objects for UDS communication
        can_ti_1 = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                           addressing_information=ai_receive)
        can_ti_2 = PyCanTransportInterface(can_bus_manager=kvaser_interface_2,
                                           addressing_information=ai_send)

        # define UDS Message to send
        message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x62, 0x10, 0x00, *range(100)])

        # prepare code for scheduling transmission
        sent_message_record = None

        def _send_message():
            """Send the message from the timer thread and store the record in the enclosing scope."""
            nonlocal sent_message_record
            sent_message_record = can_ti_2.send_message(message)

        timer = Timer(interval=0.01,  # delay after which message will be sent, now 0.01 [s]
                      function=_send_message)

        # send and receive message
        timer.start()
        received_message_record = can_ti_1.receive_message(timeout=1000)  # 1000 [ms]

        # wait till the sending thread is done; `join` also returns if `_send_message` raised,
        # whereas polling `timer.finished` would never end in that case
        timer.join()

        # show results
        pprint(received_message_record.__dict__)
        pprint(sent_message_record.__dict__)
    finally:
        # close connections with CAN interfaces (executed also when an error was raised above);
        # Transport Interfaces are dropped first, then the buses are shut down
        del can_ti_1
        del can_ti_2
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    main()

Packet handling

  • Send CAN packets:

from pprint import pprint

from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface


def main():
    """Segment a UDS message, send its first CAN packet via a Kvaser interface and print the packet record."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)
    # second interface is only used to acknowledge CAN frames sent by `kvaser_interface_1`,
    # you might comment it out if you have another device to do that
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)

    can_ti = None  # bound up-front, so the clean-up below is valid even if the set-up fails
    try:
        # configure Addressing Information of a CAN Node (example values set)
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})

        # create Transport Interface object for UDS communication
        can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                         addressing_information=addressing_information)

        # define UDS Message to send
        message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])

        # create CAN packet that carries the UDS Message (the first packet produced by segmentation)
        packet = can_ti.segmenter.segmentation(message)[0]

        # send CAN Packet
        packet_record = can_ti.send_packet(packet)
        pprint(packet_record.__dict__)
    finally:
        # close connections with CAN interfaces (executed also when an error was raised above);
        # the Transport Interface is dropped first, then the buses are shut down
        del can_ti
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    main()
  • Receive CAN packets:

from pprint import pprint
from threading import Timer

from can import Bus, Message
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.transport_interface import PyCanTransportInterface


def main():
    """Receive two CAN packets carried by frames sent from the second interface and print their records."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)  # receiving
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)  # sending

    can_ti = None  # bound up-front, so the clean-up below is valid even if the set-up fails
    try:
        # configure Addressing Information of a CAN Node (example values)
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})

        # create Transport Interface object for UDS communication
        can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                         addressing_information=addressing_information)

        # some frames to be received later on
        frame_1 = Message(arbitration_id=0x6FE, data=[0x02, 0x10, 0x03])
        frame_2 = Message(arbitration_id=0x611, data=[0x02, 0x10, 0x03])  # shall be ignored, as it is not observed CAN ID
        frame_3 = Message(arbitration_id=0x612, data=[0x02, 0x3E, 0x00, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA])

        # receive CAN packet 1
        # (frames are transmitted by the sending interface, so the receiving one gets them from the bus)
        Timer(interval=0.1, function=kvaser_interface_2.send, args=(frame_1,)).start()  # schedule transmission of frame 1
        record_1 = can_ti.receive_packet(timeout=1000)  # receive CAN packet 1 carried by frame 1
        pprint(record_1.__dict__)  # show attributes of CAN packet record 1

        # receive CAN packet 2
        Timer(interval=0.1, function=kvaser_interface_2.send, args=(frame_2,)).start()  # schedule transmission of frame 2
        Timer(interval=0.2, function=kvaser_interface_2.send, args=(frame_3,)).start()  # schedule transmission of frame 3
        record_2 = can_ti.receive_packet(timeout=1000)  # receive CAN packet 2 carried by frame 3
        pprint(record_2.__dict__)  # show attributes of CAN packet record 2
    finally:
        # close connections with CAN interfaces (executed also when an error was raised above);
        # the Transport Interface is dropped first, then the buses are shut down
        del can_ti
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    main()
  • Send CAN packets on one interface, receive on the other:

from pprint import pprint

from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface


def main():
    """Send a CAN packet on one Kvaser interface, receive it on the other one and print both records."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)  # receiving
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)  # sending

    # bound up-front, so the clean-up below is valid even if the set-up fails
    can_ti_1 = None
    can_ti_2 = None
    try:
        # configure Addressing Information of CAN Nodes (example values);
        # Tx and Rx CAN IDs of the sending node mirror those of the receiving node
        ai_receive = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})
        ai_send = CanAddressingInformation(
            addressing_format=ai_receive.addressing_format,
            tx_physical={"can_id": 0x612},
            rx_physical={"can_id": 0x611},
            tx_functional={"can_id": 0x6FE},
            rx_functional={"can_id": 0x6FF})

        # create Transport Interface objects for UDS communication
        can_ti_1 = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                           addressing_information=ai_receive)
        can_ti_2 = PyCanTransportInterface(can_bus_manager=kvaser_interface_2,
                                           addressing_information=ai_send)

        # define UDS Message to send
        message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])

        # create CAN packet that carries the UDS Message (the first packet produced by segmentation)
        packet = can_ti_2.segmenter.segmentation(message)[0]

        # send and receive CAN packet
        sent_packet_record = can_ti_2.send_packet(packet)
        pprint(sent_packet_record.__dict__)
        received_packet_record = can_ti_1.receive_packet(timeout=100)
        pprint(received_packet_record.__dict__)
    finally:
        # close connections with CAN interfaces (executed also when an error was raised above);
        # Transport Interfaces are dropped first, then the buses are shut down
        del can_ti_1
        del can_ti_2
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    main()

Asynchronous implementation

Message handling

  • Send UDS message:

import asyncio
from pprint import pprint

from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface


async def main():
    """Asynchronously send a functionally addressed UDS message via a Kvaser interface and print its record."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)
    # second interface is only used to acknowledge CAN frames sent by `kvaser_interface_1`,
    # you might comment it out if you have another device to do that
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)

    can_ti = None  # bound up-front, so the clean-up below is valid even if the set-up fails
    try:
        # configure Addressing Information of a CAN Node (example values set)
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})

        # create Transport Interface object for UDS communication
        can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                         addressing_information=addressing_information)

        # define UDS Message to send
        message = UdsMessage(addressing_type=AddressingType.FUNCTIONAL, payload=[0x10, 0x03])

        # send UDS Message
        message_record = await can_ti.async_send_message(message)
        pprint(message_record.__dict__)
    finally:
        # close connections with CAN interfaces (executed also when an error was raised above);
        # the Transport Interface is dropped first, then the buses are shut down
        del can_ti
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
  • Receive UDS message:

import asyncio
from pprint import pprint

from can import Bus, Message
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.transport_interface import PyCanTransportInterface


async def main():
    """Asynchronously receive a UDS message carried by two CAN frames sent from the second interface."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)  # receiving
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)  # sending

    can_ti = None  # bound up-front, so the clean-up below is valid even if the set-up fails
    try:
        # configure Addressing Information of a CAN Node (example values)
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})

        # create Transport Interface object for UDS communication
        can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                         addressing_information=addressing_information)

        # define frames carrying a UDS message (to be received later on)
        frame_1 = Message(arbitration_id=0x612, data=[0x10, 0x0b, 0x62, 0x10, 0x00, 0x00, 0x01, 0x02])
        frame_2 = Message(arbitration_id=0x612, data=[0x21, 0x03, 0x04, 0x05, 0x06, 0x07, 0xCC, 0xCC])

        # define task that transmits the frames carrying the UDS message
        async def _send_message():
            """Send frame_1 after 0.01 [s] and frame_2 0.5 [s] later."""
            await asyncio.sleep(0.01)
            kvaser_interface_2.send(frame_1)
            await asyncio.sleep(0.5)
            kvaser_interface_2.send(frame_2)

        send_message_task = asyncio.create_task(_send_message())

        # receive message
        received_message_record = await can_ti.async_receive_message(timeout=1000)  # 1000 [ms]
        await send_message_task
        pprint(received_message_record.__dict__)
    finally:
        # close connections with CAN interfaces (executed also when an error was raised above);
        # the Transport Interface is dropped first, then the buses are shut down
        del can_ti
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
  • Send UDS message on one interface, receive on the other:

import asyncio
from pprint import pprint

from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface


async def main():
    """Asynchronously send a UDS message on one Kvaser interface, receive it on the other and print both records."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)  # receiving
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)  # sending

    # bound up-front, so the clean-up below is valid even if the set-up fails
    can_ti_1 = None
    can_ti_2 = None
    try:
        # configure Addressing Information of CAN Nodes (example values);
        # Tx and Rx CAN IDs of the sending node mirror those of the receiving node
        ai_receive = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})
        ai_send = CanAddressingInformation(
            addressing_format=ai_receive.addressing_format,
            tx_physical={"can_id": 0x612},
            rx_physical={"can_id": 0x611},
            tx_functional={"can_id": 0x6FE},
            rx_functional={"can_id": 0x6FF})

        # create Transport Interface objects for UDS communication
        can_ti_1 = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                           addressing_information=ai_receive)
        can_ti_2 = PyCanTransportInterface(can_bus_manager=kvaser_interface_2,
                                           addressing_information=ai_send)

        # define UDS Message to send
        message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x62, 0x10, 0x00, *range(100)])

        # send and receive message (reception is scheduled as a task before the transmission is awaited)
        receive_message_task = asyncio.create_task(can_ti_1.async_receive_message(timeout=1000))  # 1000 ms
        sent_message_record = await can_ti_2.async_send_message(message)
        received_message_record = await receive_message_task

        # show results
        pprint(received_message_record.__dict__)
        pprint(sent_message_record.__dict__)
    finally:
        # close connections with CAN interfaces (executed also when an error was raised above);
        # Transport Interfaces are dropped first, then the buses are shut down
        del can_ti_1
        del can_ti_2
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Packet handling

  • Send CAN packets:

import asyncio
from pprint import pprint

from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface


async def main():
    """Segment a UDS message, asynchronously send its first CAN packet and print the packet record."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)
    # second interface is only used to acknowledge CAN frames sent by `kvaser_interface_1`,
    # you might comment it out if you have another device to do that
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)

    can_ti = None  # bound up-front, so the clean-up below is valid even if the set-up fails
    try:
        # configure Addressing Information of a CAN Node (example values set)
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})

        # create Transport Interface object for UDS communication
        can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                         addressing_information=addressing_information)

        # define UDS Message to send
        message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])

        # create CAN packet that carries the UDS Message (the first packet produced by segmentation)
        packet = can_ti.segmenter.segmentation(message)[0]

        # send CAN Packet
        packet_record = await can_ti.async_send_packet(packet)
        pprint(packet_record.__dict__)
    finally:
        # close connections with CAN interfaces (executed also when an error was raised above);
        # the Transport Interface is dropped first, then the buses are shut down
        del can_ti
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
  • Receive CAN packets:

import asyncio
from pprint import pprint

from can import Bus, Message
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.transport_interface import PyCanTransportInterface


async def main():
    """Asynchronously receive two CAN packets carried by frames sent from the second interface."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)  # receiving
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)  # sending

    can_ti = None  # bound up-front, so the clean-up below is valid even if the set-up fails
    try:
        # configure Addressing Information of a CAN Node (example values)
        addressing_information = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})

        # create Transport Interface object for UDS communication
        can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                         addressing_information=addressing_information)

        # some frames to be received later on
        frame_1 = Message(arbitration_id=0x6FE, data=[0x02, 0x10, 0x03])
        frame_2 = Message(arbitration_id=0x611, data=[0x02, 0x10, 0x03])  # shall be ignored, as it is not observed CAN ID
        frame_3 = Message(arbitration_id=0x612, data=[0x02, 0x3E, 0x00, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA])

        # receive CAN packet 1
        kvaser_interface_2.send(frame_1)  # transmit CAN Frame 1
        record_1 = await can_ti.async_receive_packet(timeout=1000)   # receive CAN packet 1 carried by frame 1
        pprint(record_1.__dict__)  # show attributes of CAN packet record 1

        # receive CAN packet 2
        kvaser_interface_2.send(frame_2)  # transmit CAN Frame 2
        kvaser_interface_2.send(frame_3)  # transmit CAN Frame 3
        record_2 = await can_ti.async_receive_packet(timeout=1000)  # receive CAN packet 2 carried by frame 3
        pprint(record_2.__dict__)  # show attributes of CAN packet record 2
    finally:
        # close connections with CAN interfaces (executed also when an error was raised above);
        # the Transport Interface is dropped first, then the buses are shut down
        del can_ti
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
  • Send CAN packets on one interface, receive on the other:

import asyncio
from pprint import pprint

from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface


async def main():
    """Asynchronously send a CAN packet on one Kvaser interface, receive it on the other and print both records."""
    # configure CAN interfaces
    kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)  # receiving
    kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)  # sending

    # bound up-front, so the clean-up below is valid even if the set-up fails
    can_ti_1 = None
    can_ti_2 = None
    try:
        # configure Addressing Information of CAN Nodes (example values);
        # Tx and Rx CAN IDs of the sending node mirror those of the receiving node
        ai_receive = CanAddressingInformation(
            addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
            tx_physical={"can_id": 0x611},
            rx_physical={"can_id": 0x612},
            tx_functional={"can_id": 0x6FF},
            rx_functional={"can_id": 0x6FE})
        ai_send = CanAddressingInformation(
            addressing_format=ai_receive.addressing_format,
            tx_physical={"can_id": 0x612},
            rx_physical={"can_id": 0x611},
            tx_functional={"can_id": 0x6FE},
            rx_functional={"can_id": 0x6FF})

        # create Transport Interface objects for UDS communication
        can_ti_1 = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
                                           addressing_information=ai_receive)
        can_ti_2 = PyCanTransportInterface(can_bus_manager=kvaser_interface_2,
                                           addressing_information=ai_send)

        # define UDS Message to send
        message = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x10, 0x03])

        # create CAN packet that carries the UDS Message (the first packet produced by segmentation)
        packet = can_ti_2.segmenter.segmentation(message)[0]

        # send and receive CAN packet (reception is scheduled as a task before the transmission is awaited)
        receive_packet_task = asyncio.create_task(can_ti_1.async_receive_packet(timeout=100))
        sent_packet_record = await can_ti_2.async_send_packet(packet)
        pprint(sent_packet_record.__dict__)
        received_packet_record = await receive_packet_task
        pprint(received_packet_record.__dict__)
    finally:
        # close connections with CAN interfaces (executed also when an error was raised above);
        # Transport Interfaces are dropped first, then the buses are shut down
        del can_ti_1
        del can_ti_2
        kvaser_interface_1.shutdown()
        kvaser_interface_2.shutdown()


if __name__ == "__main__":
    asyncio.run(main())