Source code for uds.message.uds_message

"""
Module with common implementation of all diagnostic messages (requests and responses).

:ref:`Diagnostic message <knowledge-base-diagnostic-message>` are defined on upper layers of UDS OSI Model.
"""

__all__ = ["AbstractUdsMessageContainer", "UdsMessage", "UdsMessageRecord"]

from typing import Sequence
from abc import ABC, abstractmethod
from datetime import datetime

from uds.utilities import RawBytesAlias, RawBytesTupleAlias, RawBytesListAlias, validate_raw_bytes, ReassignmentError
from uds.transmission_attributes import TransmissionDirection, AddressingType
from uds.packet import AbstractUdsPacketRecord, PacketsRecordsTuple, PacketsRecordsSequence


[docs] class AbstractUdsMessageContainer(ABC): """Abstract definition of a container with a diagnostic message information."""
[docs] @abstractmethod def __eq__(self, other: object) -> bool: """ Compare with other object. :param other: Object to compare. :return: True if other object has the same type and carries the same diagnostic message, otherwise False. """
@property @abstractmethod def payload(self) -> RawBytesTupleAlias: """Raw payload bytes carried by this diagnostic message.""" @property @abstractmethod def addressing_type(self) -> AddressingType: """Addressing for which this diagnostic message is relevant."""
[docs] class UdsMessage(AbstractUdsMessageContainer): """ Definition of a diagnostic message. Objects of this class act as a storage for all relevant attributes of a :ref:`diagnostic message <knowledge-base-diagnostic-message>`. Later on, such object might be used in a segmentation process or to transmit the message. Once a message is transmitted, its historic data would be stored in :class:`~uds.message.uds_message.UdsMessageRecord`. """ def __init__(self, payload: RawBytesAlias, addressing_type: AddressingType) -> None: """ Create a storage for a single diagnostic message. :param payload: Raw payload bytes carried by this diagnostic message. :param addressing_type: Addressing for which this diagnostic message is relevant. """ self.payload = payload # type: ignore self.addressing_type = addressing_type
[docs] def __eq__(self, other: object) -> bool: """ Compare with other object. :param other: Object to compare. :return: True if other object has the same type and carries the same diagnostic message, otherwise False. """ if not isinstance(other, self.__class__): raise TypeError("UDS Message can only be compared with another UDS Message") return self.addressing_type == other.addressing_type and self.payload == other.payload
@property def payload(self) -> RawBytesTupleAlias: """Raw payload bytes carried by this diagnostic message.""" return self.__payload @payload.setter def payload(self, value: RawBytesAlias): """ Set value of raw payload bytes that this diagnostic message carries. :param value: Payload value to set. """ validate_raw_bytes(value) self.__payload = tuple(value) @property def addressing_type(self) -> AddressingType: """Addressing for which this diagnostic message is relevant.""" return self.__addressing_type @addressing_type.setter def addressing_type(self, value: AddressingType): """ Set value of addressing for this diagnostic message. :param value: Addressing value to set. """ self.__addressing_type = AddressingType.validate_member(value)
[docs] class UdsMessageRecord(AbstractUdsMessageContainer): """Storage for historic information about a diagnostic message that was either received or transmitted.""" def __init__(self, packets_records: PacketsRecordsSequence) -> None: """ Create a record of historic information about a diagnostic message that was either received or transmitted. :param packets_records: Sequence (in transmission order) of UDS packets records that carried this diagnostic message. """ self.packets_records = packets_records # type: ignore
[docs] def __eq__(self, other: object) -> bool: """ Compare with other object. :param other: Object to compare. :return: True if other object has the same type and carries the same diagnostic message, otherwise False. """ if not isinstance(other, self.__class__): raise TypeError("UDS Message Record can only be compared with another UDS Message Record") return self.addressing_type == other.addressing_type \ and self.payload == other.payload \ and self.direction == other.direction
@staticmethod def __validate_packets_records(value: PacketsRecordsSequence) -> None: """ Validate whether the argument contains UDS Packets records. :param value: Value to validate. :raise TypeError: UDS Packet Records sequence is not list or tuple type. :raise ValueError: At least one of UDS Packet Records sequence elements is not an object of :class:`~uds.message.uds_packet.AbstractUdsPacketRecord` class. """ if not isinstance(value, Sequence): raise TypeError(f"Provided value is not a sequence. Actual type: {type(value)}") if not value or any(not isinstance(element, AbstractUdsPacketRecord) for element in value): raise ValueError(f"Provided value must contain only instances of AbstractUdsPacketRecord class. " f"Actual value: {value}") @property def packets_records(self) -> PacketsRecordsTuple: """ Sequence (in transmission order) of UDS packets records that carried this diagnostic message. :ref:`UDS packets <knowledge-base-uds-packet>` sequence is a complete sequence of packets that was exchanged during this diagnostic message transmission. """ return self.__packets_records @packets_records.setter def packets_records(self, value: PacketsRecordsSequence): """ Assign records value of UDS Packets that carried this diagnostic message . Provided :ref:`UDS packets <knowledge-base-uds-packet>` sequence must be a complete sequence of packets that was exchanged during this diagnostic message transmission. Sequence must not contain any packets that are unrelated to transmission of this message. :param value: UDS Packet Records sequence value to set. :raise ReassignmentError: There is a call to change the value after the initial assignment (in __init__). """ try: getattr(self, "_UdsMessageRecord__packets_records") except AttributeError: self.__validate_packets_records(value) self.__packets_records = tuple(value) else: raise ReassignmentError("You cannot change value of 'packets_records' attribute once it is assigned.") @property def payload(self) -> RawBytesTupleAlias: """Raw payload bytes carried by this diagnostic message.""" number_of_bytes = self.packets_records[0].data_length message_payload: RawBytesListAlias = [] for packet in self.packets_records: if packet.payload is not None: message_payload.extend(packet.payload) return tuple(message_payload[:number_of_bytes]) @property def addressing_type(self) -> AddressingType: """Addressing which was used to transmit this diagnostic message.""" return self.packets_records[0].addressing_type @property def direction(self) -> TransmissionDirection: """Information whether this message was received or sent.""" return self.packets_records[0].direction @property # noqa: F841 def transmission_start(self) -> datetime: """ Time stamp when transmission of this message was initiated. It is determined by a moment of time when the first packet (that carried this message) was published to a bus (either received or transmitted). :return: Time stamp when transmission of this message was initiated. """ return self.packets_records[0].transmission_time @property # noqa: F841 def transmission_end(self) -> datetime: """ Time stamp when transmission of this message was completed. It is determined by a moment of time when the last packet (that carried this message) was published to a bus (either received or transmitted). :return: Time stamp when transmission of this message was completed. """ return self.packets_records[-1].transmission_time