"""Implementation of diagnostic services data encoding and decoding."""
__all__ = ["Service", "DecodedMessageAlias", "DataRecordsValuesAlias",
"DataRecordValueAlias", "MultipleDataRecordValueAlias", "SingleDataRecordValueAlias"]
from copy import deepcopy
from typing import Collection, Dict, List, Mapping, Optional, Sequence, Set, Tuple, Union
from warnings import warn
from uds.message import NRC, RESPONSE_REQUEST_SID_DIFF, RequestSID, ResponseSID
from uds.utilities import Endianness, InconsistencyError, RawBytesAlias, bytes_to_int, int_to_bytes, validate_raw_bytes
from .data_record import (
AbstractConditionalDataRecord,
AbstractDataRecord,
AliasMessageStructure,
ChildrenValuesAlias,
DataRecordInfoAlias,
SingleOccurrenceInfo,
)
# Value accepted for a Data Record that occurs at most once in a message.
SingleDataRecordValueAlias = Union[int, ChildrenValuesAlias, None]
"""Alias for a single occurrence Data Record. Either:
- int type - a single raw value
- mapping type - children values
- None - no occurrence"""

# Value accepted for a Data Record that may occur many times in a message (one element per occurrence).
MultipleDataRecordValueAlias = Sequence[Union[int, ChildrenValuesAlias]]
"""Alias for a multiple occurrences Data Record. It is a sequence where each element represents a single occurrence.
Each element is either raw value (int type) or children values (mapping type)."""

# Any value that can be assigned to a Data Record name (single or multiple occurrences).
DataRecordValueAlias = Union[SingleDataRecordValueAlias, MultipleDataRecordValueAlias]
"""Alias for a Data Record value that can be used in the Data Records Mapping."""

# Data Record name -> Data Record value(s); this is the input format of the encoding methods.
DataRecordsValuesAlias = Mapping[str, DataRecordValueAlias]
"""Alias for Data Records values mapping.
Mapping keys are Data Records names.
Mapping values are corresponding Data Records values.
"""

# One information entry per decoded Data Record; this is the output format of the decoding methods.
DecodedMessageAlias = Tuple[DataRecordInfoAlias, ...]
"""Alias for decoded information about a Diagnostic Message."""
class Service:
    """
    Translator for a diagnostic service.

    Interactions via UDS protocol with servers (ECUs) are possible via
    :ref:`diagnostic services <knowledge-base-service>` which are basically functions that you can request as a client.

    Features:
     - contains structures of diagnostic messages (both request and response) for a single diagnostic service
     - provides tools for decoding meaningful information (physical values) from diagnostic messages
     - provides tools for creating diagnostic messages out of meaningful information (physical values)
    """

    # A negative response message consists of exactly three bytes: Negative Response SID, request SID and NRC.
    NEGATIVE_RESPONSE_LENGTH = 3

    def __init__(self,
                 request_sid: RequestSID,
                 request_structure: AliasMessageStructure,
                 response_structure: AliasMessageStructure,
                 supported_nrc: Collection[NRC] = tuple(NRC)) -> None:
        """
        Define a translator for a single diagnostic service.

        :param request_sid: Service Identifier for request message.
        :param request_structure: Data Records that contain translation for request message continuation.
        :param response_structure: Data Records that contain translation for positive response message
            continuation.
        :param supported_nrc: NRC codes that are supported by this service.

        .. warning:: Arguments `request_structure` and `response_structure` must not contain Data Records for the first
            byte of respectively request message (SID) and response message (RSID) as those values are passed via
            other parameters.
        """
        self.request_sid = request_sid
        self.request_structure = request_structure
        self.response_structure = response_structure
        self.supported_nrc = supported_nrc

    @property
    def request_sid(self) -> RequestSID:
        """Get Service Identifier (SID) value for this diagnostic service."""
        return self.__request_sid

    @request_sid.setter
    def request_sid(self, request_sid: RequestSID) -> None:
        """
        Set Service Identifier (SID) value for this diagnostic service.

        The matching Response SID is derived by adding `RESPONSE_REQUEST_SID_DIFF` to the provided value.

        :param request_sid: SID value to set.

        :raise ValueError: Request SID and Response SID values are incorrectly defined for given value.
        """
        self.__request_sid: RequestSID = RequestSID.validate_member(request_sid)
        self.__response_sid: ResponseSID = ResponseSID.validate_member(request_sid + RESPONSE_REQUEST_SID_DIFF)
        # both enum members have to carry the same name, otherwise they describe two different services
        if self.__request_sid.name != self.__response_sid.name:
            raise ValueError("Request and Response SID values are not defined for the same Service.")

    @property
    def response_sid(self) -> ResponseSID:
        """Get Response Service Identifier (RSID) value for this diagnostic service."""
        return self.__response_sid

    @property
    def request_structure(self) -> AliasMessageStructure:
        """Get Data Records used for translating request messages for this diagnostic service."""
        return self.__request_structure

    @request_structure.setter
    def request_structure(self, request_structure: AliasMessageStructure) -> None:
        """
        Set Data Records to use for translating request messages for this diagnostic service.

        The structure is validated and stored as a tuple.

        :param request_structure: Data Records sequence to set.
        """
        self.validate_message_structure(request_structure)
        self.__request_structure = tuple(request_structure)

    @property
    def response_structure(self) -> AliasMessageStructure:
        """Get Data Records used for translating positive response messages for this diagnostic service."""
        return self.__response_structure

    @response_structure.setter
    def response_structure(self, response_structure: AliasMessageStructure) -> None:
        """
        Set Data Records used for translating positive response messages for this diagnostic service.

        The structure is validated and stored as a tuple.

        :param response_structure: Data Records sequence to set.
        """
        self.validate_message_structure(response_structure)
        self.__response_structure = tuple(response_structure)

    @property
    def supported_nrc(self) -> Set[NRC]:
        """Get NRC codes that are supported by this diagnostic service."""
        return self.__supported_nrc

    @supported_nrc.setter
    def supported_nrc(self, nrc_container: Collection[NRC]) -> None:
        """
        Set NRC codes that are supported by this diagnostic service.

        Every element is validated with `NRC.validate_member` before the collection is stored as a set.

        :param nrc_container: NRC codes to set as supported.
        """
        for nrc in nrc_container:
            NRC.validate_member(nrc)
        self.__supported_nrc = set(nrc_container)

    @property
    def name(self) -> str:
        """Get name of this service."""
        return self.request_sid.name  # type: ignore

    def _get_rsid_info(self, positive: bool = True) -> SingleOccurrenceInfo:
        """
        Get detailed information about Response Service Identifier.

        :param positive: RSID is for positive or negative response message.

        :return: Detailed information about RSID value.
        """
        rsid = self.response_sid if positive else ResponseSID.NegativeResponse
        return SingleOccurrenceInfo(name="RSID",
                                    length=8,
                                    raw_value=rsid.value,
                                    physical_value=rsid.name,
                                    children=tuple(),
                                    unit=None)

    def _get_sid_info(self) -> SingleOccurrenceInfo:
        """Get detailed information about Service Identifier."""
        return SingleOccurrenceInfo(name="SID",
                                    length=8,
                                    raw_value=self.request_sid.value,
                                    physical_value=self.request_sid.name,
                                    children=tuple(),
                                    unit=None)

    @staticmethod
    def _get_nrc_info(nrc: NRC) -> SingleOccurrenceInfo:
        """
        Get detailed information about Negative Response Code.

        :param nrc: The value of NRC.

        :return: Detailed information for single occurrence of NRC Data Record.
        """
        nrc = NRC.validate_member(nrc)
        return SingleOccurrenceInfo(name="NRC",
                                    length=8,
                                    raw_value=nrc.value,
                                    physical_value=nrc.name,
                                    children=tuple(),
                                    unit=None)

    @staticmethod
    def _get_single_data_record_occurrence(data_record: AbstractDataRecord,
                                           value: SingleDataRecordValueAlias) -> List[int]:
        """
        Get occurrence value for a single occurrence Data Record.

        :param data_record: Data Record object.
        :param value: Data Record value. Either:

            - None - no value (valid for Data Records with min_occurrences=0)
            - int type - raw value
            - mapping type - children values

        :raise TypeError: Provided value has incorrect type that cannot be handled for the provided Data Record
            (this includes None for a Data Record with min_occurrences other than 0).
        :raise ValueError: Provided raw value is out of the Data Record raw values range.

        :return: List with either 1 or 0 raw values for this Data Record.
        """
        if value is None and data_record.min_occurrences == 0:
            return []
        if isinstance(value, int):
            if not data_record.min_raw_value <= value <= data_record.max_raw_value:
                raise ValueError("Provided occurrence value is out of range. "
                                 f"Data Record name = {data_record.name!r}. "
                                 f"Data Record min raw value = {data_record.min_raw_value}. "
                                 f"Data Record max raw value = {data_record.max_raw_value}. "
                                 f"Provided value = {value}.")
            return [value]
        if isinstance(value, Mapping):
            # children values are converted to a raw value by the Data Record itself
            return [data_record.get_raw_value_from_children(value)]
        raise TypeError(f"Incorrect value was provided for a Single Occurrence Data Record. "
                        f"Data Record name = {data_record.name!r}. Provided value = {value}.")

    @staticmethod
    def _get_reoccurring_data_record_occurrences(data_record: AbstractDataRecord,
                                                 value: MultipleDataRecordValueAlias) -> List[int]:
        """
        Get occurrences values for multiple occurrences Data Record.

        :param data_record: Data Record object.
        :param value: Sequence with Data Record values (either int or mapping type).

        :raise TypeError: Provided value is not a sequence.
        :raise ValueError: Provided sequence contains incorrect number of occurrences, a raw value that is out
            of range or an occurrence that is neither int nor mapping type.

        :return: List with raw values for this Data Record.
        """
        if not isinstance(value, Sequence):
            raise TypeError("A sequence of values has to be provided for a reoccurring Data Record. "
                            f"Data Record name = {data_record.name!r}.")
        # falsy `max_occurrences` (e.g. None) is treated as no upper limit
        if len(value) < data_record.min_occurrences or len(value) > (data_record.max_occurrences or float("inf")):
            raise ValueError("A sequence of values has to contain proper number of Data Record occurrences. "
                             f"Data Record name = {data_record.name!r}. "
                             f"Data Record min occurrences number = {data_record.min_occurrences}. "
                             f"Data Record max occurrences number = {data_record.max_occurrences}. "
                             f"Provided sequence = {value}.")
        raw_values: List[int] = []
        for occurrence_value in value:
            if isinstance(occurrence_value, int):
                if not data_record.min_raw_value <= occurrence_value <= data_record.max_raw_value:
                    raise ValueError("Provided occurrence value is out of range. "
                                     f"Data Record name = {data_record.name!r}. "
                                     f"Data Record min raw value = {data_record.min_raw_value}. "
                                     f"Data Record max raw value = {data_record.max_raw_value}. "
                                     f"Provided sequence = {value}. Occurrence value = {occurrence_value}.")
                raw_values.append(occurrence_value)
            elif isinstance(occurrence_value, Mapping):
                raw_values.append(data_record.get_raw_value_from_children(occurrence_value))
            else:
                raise ValueError("Incorrect value was provided for at least one occurrence of a Multi Occurrences "
                                 f"Data Record. Data Record name = {data_record.name!r}. "
                                 f"Provided values = {value}. Incorrect occurrence = {occurrence_value}.")
        return raw_values

    @classmethod
    def _get_data_record_occurrences(cls,
                                     data_record: AbstractDataRecord,
                                     value: DataRecordValueAlias) -> List[int]:
        """
        Get raw values of all occurrences provided as value.

        :param data_record: Data Record object.
        :param value: Data Record values. Either for a single occurrence or multiple occurrences.
            Each occurrence might be a raw value or mapping with children values.

        :return: Raw values for following Data Record occurrences.
        """
        if data_record.is_reoccurring:
            return cls._get_reoccurring_data_record_occurrences(data_record=data_record, value=value)  # type: ignore
        return cls._get_single_data_record_occurrence(data_record=data_record, value=value)  # type: ignore

    @staticmethod
    def _get_remaining_length(message_structure: AliasMessageStructure) -> int:
        """
        Get minimal remaining length for the provided message structure.

        Lengths of Data Records are summed (length of a single occurrence multiplied by the minimal number of
        occurrences). The summing stops at the first Data Record that is optional and not reoccurring.

        :param message_structure: Message structure to check.

        :raise TypeError: All elements of the message structure must be instances of `AbstractDataRecord` class.

        :return: Minimal length of the provided messages structure.
        """
        min_length = 0
        for data_record in message_structure:
            if isinstance(data_record, AbstractDataRecord):
                min_length += data_record.length * data_record.min_occurrences
                if data_record.min_occurrences == 0 and not data_record.is_reoccurring:
                    break
            else:
                raise TypeError("Minimal length can only be assessed for instances of `AbstractDataRecord` class.")
        return min_length

    @classmethod
    def _decode_payload(cls,  # pylint: disable=too-many-branches
                        payload: RawBytesAlias,
                        message_structure: AliasMessageStructure,
                        check_remaining_length: bool = True) -> DecodedMessageAlias:
        """
        Decode information for given message structure and payload.

        The payload is converted to a single big-endian integer and following Data Records occurrences are
        extracted from it (starting from the most significant bits) with bit shifts and masks.

        :param payload: Payload to decode.
        :param message_structure: Defined structure of a diagnostic message.
        :param check_remaining_length: Whether to raise an exception when only part of the message was decoded.

        :raise ValueError: Provided message payload was too short or no raw value was decoded before
            a conditional Data Record was reached.
        :raise RuntimeError: An error occurred which was caused by incorrect message structure.
        :raise NotImplementedError: There is missing implementation for at least one Data Record in the provided
            message structure.

        :return: Decoded information from the provided payload.
        """
        decoded_message_continuation = []
        remaining_length = 8 * len(payload)  # number of bits that were not decoded yet
        payload_int = bytes_to_int(bytes_list=payload, endianness=Endianness.BIG_ENDIAN) if payload else 0
        raw_values: List[int] = []  # raw values of the most recently decoded Data Record
        for i, data_record in enumerate(message_structure):
            if isinstance(data_record, AbstractDataRecord):
                if data_record.is_reoccurring and not data_record.fixed_total_length:
                    # leave enough bits for the Data Records that follow
                    try:
                        additional_required_length = cls._get_remaining_length(message_structure[i + 1:])
                    except TypeError:
                        # the rest of the structure contains a conditional Data Record - cannot be assessed
                        additional_required_length = 0
                    max_occurrences_number = (remaining_length - additional_required_length) // data_record.length
                else:
                    max_occurrences_number = remaining_length // data_record.length
                # falsy `max_occurrences` (e.g. None) is treated as no upper limit
                occurrences_number = int(min(max_occurrences_number, data_record.max_occurrences or float("inf")))
                if occurrences_number < data_record.min_occurrences:
                    raise ValueError("Too short payload was provided.")
                raw_values = []
                for _ in range(occurrences_number):
                    remaining_length -= data_record.length
                    mask = (1 << data_record.length) - 1
                    occurrence_value = (payload_int >> remaining_length) & mask
                    raw_values.append(occurrence_value)
                if data_record.min_occurrences == 0 and not raw_values:
                    # optional Data Record is missing - finish if the whole payload was consumed
                    if remaining_length == 0:
                        break
                else:
                    decoded_message_continuation.append(data_record.get_occurrence_info(*raw_values))
            elif isinstance(data_record, AbstractConditionalDataRecord):
                if remaining_length % 8 != 0:
                    raise RuntimeError("Incorrect Data Records structure.")
                if not raw_values:
                    # without this guard `raw_values[-1]` would fail with a bare IndexError
                    raise ValueError("No raw value was decoded before a conditional Data Record. "
                                     "Either too short payload was provided or the message structure is incorrect.")
                bytes_number = remaining_length // 8
                conditional_message_continuation = data_record.get_message_continuation(raw_value=raw_values[-1])
                remaining_payload = int_to_bytes(int_value=payload_int & ((1 << remaining_length) - 1),
                                                 endianness=Endianness.BIG_ENDIAN,
                                                 size=bytes_number)
                # the nested call must not complain about leftovers as more Data Records might follow here
                decoded_conditional_message_continuation = cls._decode_payload(
                    payload=remaining_payload,
                    message_structure=conditional_message_continuation,
                    check_remaining_length=False)
                for data_record_info in decoded_conditional_message_continuation:
                    # account for the bits consumed by the nested call
                    occurrences_number = 1 if isinstance(data_record_info["raw_value"], int) \
                        else len(data_record_info["raw_value"])
                    remaining_length -= occurrences_number * data_record_info["length"]
                    decoded_message_continuation.append(data_record_info)
                    # keep track of the last raw value in case another conditional Data Record follows
                    raw_values.append(data_record_info["raw_value"]
                                      if isinstance(data_record_info["raw_value"], int) else
                                      data_record_info["raw_value"][-1])
            else:
                raise NotImplementedError("Unexpected Data Record type found in the structure.")
        if check_remaining_length and remaining_length != 0:
            raise RuntimeError("Incorrect message structure was defined.")
        return tuple(decoded_message_continuation)

    @classmethod
    def _encode_message(cls,  # pylint: disable=too-many-branches
                        data_records_values: Dict[str, DataRecordValueAlias],
                        message_structure: AliasMessageStructure,
                        check_unused_data_record_values: bool = True) -> bytearray:
        """
        Encode payload of a diagnostic message.

        .. warning:: Used entries are removed (popped) from `data_records_values`, therefore a copy shall be
            passed when the original mapping must stay untouched.

        :param data_records_values: Mapping with Data Records values that are part of the message.
            Mapping keys are Data Records names.
            Mapping values are either a single occurrence or multiple occurrences values. Each occurrence can be
            a raw value or a mapping with children names and its corresponding values.
        :param message_structure: Data Records that form the remaining structure of the diagnostic message.
        :param check_unused_data_record_values: Whether to raise an exception when unused Data Record value found.

        :raise RuntimeError: An error occurred which was caused by incorrect message structure.
        :raise InconsistencyError: Value for a mandatory Data Record was not provided.
        :raise ValueError: Value for at least one Data Record that is no part of the message, was provided.
        :raise NotImplementedError: There is missing implementation for at least one Data Record in the provided
            message structure.

        :return: Payload of a diagnostic message created from provided data records values.
        """
        total_raw_value = 0
        total_length = 0
        # raw value of the most recently encoded occurrence (selects continuation of conditional Data Records)
        raw_value: Optional[int] = None
        for data_record in message_structure:
            if isinstance(data_record, AbstractDataRecord):
                if data_record.name in data_records_values:
                    data_record_value = data_records_values.pop(data_record.name)
                    occurrences = cls._get_data_record_occurrences(data_record=data_record,
                                                                   value=data_record_value)
                elif data_record.min_occurrences == 0:
                    occurrences = []
                else:
                    raise InconsistencyError(f"Value for Data Record {data_record.name!r} was not provided.")
                # nothing to encode for this Data Record and no more values left - message ends here
                if len(occurrences) == 0 and not data_records_values:
                    break
                for raw_value in occurrences:
                    total_raw_value = (total_raw_value << data_record.length) + raw_value
                    total_length += data_record.length
            elif isinstance(data_record, AbstractConditionalDataRecord):
                if raw_value is None:
                    # without this guard an UnboundLocalError would be raised
                    raise RuntimeError("Incorrect message structure was provided. Conditional Data Record is not "
                                       "preceded by any encoded Data Record occurrence.")
                message_continuation = data_record.get_message_continuation(raw_value=raw_value)
                if message_continuation:
                    payload_continuation = cls._encode_message(data_records_values=data_records_values,
                                                               message_structure=message_continuation,
                                                               check_unused_data_record_values=False)
                    _length = 8 * len(payload_continuation)
                    total_length += _length
                    continuation_raw_value = bytes_to_int(payload_continuation, endianness=Endianness.BIG_ENDIAN)
                    total_raw_value = (total_raw_value << _length) + continuation_raw_value
                    # calculate raw_value of the last Data Record (in the message_continuation)
                    # in case it is followed by another ConditionalDataRecord
                    if isinstance(message_continuation[-1], AbstractDataRecord):
                        last_data_record_mask = (1 << message_continuation[-1].length) - 1
                        raw_value = continuation_raw_value & last_data_record_mask
            else:
                raise NotImplementedError("Unexpected Data Record type found in the structure.")
        if total_length % 8 != 0:
            raise RuntimeError("Incorrect message structure was provided.")
        if check_unused_data_record_values and data_records_values:
            raise ValueError(f"Unused Data Record values were provided: {data_records_values}.")
        return bytearray(int_to_bytes(int_value=total_raw_value,
                                      size=total_length // 8,
                                      endianness=Endianness.BIG_ENDIAN))

    @staticmethod
    def validate_message_structure(value: AliasMessageStructure) -> None:
        """
        Validate whether the provided value is a structure of diagnostic message.

        The check is delegated to `AbstractConditionalDataRecord.validate_message_continuation`.

        :param value: Value to check.
        """
        AbstractConditionalDataRecord.validate_message_continuation(value)

    def decode_request(self, payload: RawBytesAlias) -> DecodedMessageAlias:
        """
        Decode information carried by a request message for this diagnostic service.

        :param payload: Payload of a request message.

        :raise ValueError: Provided payload does not carry a request to this diagnostic service.

        :return: Decoded information from the provided payload.
        """
        validate_raw_bytes(payload, allow_empty=False)
        if payload[0] != self.request_sid:
            raise ValueError("Provided payload does not start from SID value for this service.")
        decoded_message_continuation = self._decode_payload(payload=payload[1:],
                                                            message_structure=self.request_structure)
        return self._get_sid_info(), *decoded_message_continuation

    def decode_positive_response(self, payload: RawBytesAlias) -> DecodedMessageAlias:
        """
        Decode information carried by a positive response message for this diagnostic service.

        :param payload: Payload of a positive response message.

        :raise ValueError: Provided payload does not carry a positive response to this diagnostic service.

        :return: Decoded information from the provided payload.
        """
        validate_raw_bytes(payload, allow_empty=False)
        if payload[0] != self.response_sid:
            raise ValueError("Provided payload does not start from RSID value for this service.")
        decoded_message_continuation = self._decode_payload(payload=payload[1:],
                                                            message_structure=self.response_structure)
        return self._get_rsid_info(), *decoded_message_continuation

    def decode_negative_response(self, payload: RawBytesAlias) -> DecodedMessageAlias:
        """
        Decode information carried by a negative response message for this diagnostic service.

        A warning is issued (no exception) when the received NRC is not among the NRC codes supported
        by this service.

        :param payload: Payload of a negative response message.

        :raise ValueError: Provided payload does not carry a negative response to this diagnostic service.

        :return: Decoded information from the provided payload.
        """
        validate_raw_bytes(payload, allow_empty=False)
        if len(payload) != self.NEGATIVE_RESPONSE_LENGTH:
            raise ValueError(f"Negative Response Message must be exactly {self.NEGATIVE_RESPONSE_LENGTH}-bytes long.")
        rsid = payload[0]
        sid = payload[1]
        nrc = payload[2]
        if rsid != ResponseSID.NegativeResponse:
            raise ValueError("Provided payload does not start from Negative Response SID value.")
        if sid != self.request_sid:
            raise ValueError(f"Provided payload contains Negative Response for another service with SID=0x{sid:02X}.")
        if nrc not in self.supported_nrc:
            warn(message=f"Received NRC code `0x{nrc:02X}` that is not supported by {self.name!r} service.",
                 category=UserWarning)
        return self._get_rsid_info(positive=False), self._get_sid_info(), self._get_nrc_info(NRC(nrc))

    def decode(self, payload: RawBytesAlias) -> DecodedMessageAlias:
        """
        Decode information carried by a diagnostic message for this diagnostic service.

        The first byte of the payload selects the decoder: request, positive response or negative response.
        The payload is validated with `validate_raw_bytes` (empty payload is not allowed) before its first
        byte is read.

        :param payload: Payload of a diagnostic message.

        :raise ValueError: Provided message payload does not start from a SID value for this service.

        :return: Decoded information from the provided payload.
        """
        # validate first, so an empty payload is reported the same way as in the other decode methods
        validate_raw_bytes(payload, allow_empty=False)
        first_byte = payload[0]
        if first_byte == self.request_sid:
            return self.decode_request(payload)
        if first_byte == self.response_sid:
            return self.decode_positive_response(payload)
        if first_byte == ResponseSID.NegativeResponse:
            return self.decode_negative_response(payload)
        raise ValueError("Provided message does not belong to this diagnostic service.")

    def encode_request(self, data_records_values: DataRecordsValuesAlias) -> bytearray:
        """
        Encode request message payload for this service.

        :param data_records_values: Mapping with Data Records values that are part of the message.
            Mapping keys are Data Records names.
            Mapping values are either a single occurrence or multiple occurrences values. Each occurrence can be
            a raw value or a mapping with children names and its corresponding values.

        :return: Payload of a request message.
        """
        # a deep copy is passed, because the encoding removes used entries from the mapping
        return (bytearray([self.request_sid])
                + self._encode_message(data_records_values=deepcopy(dict(data_records_values)),
                                       message_structure=self.request_structure))

    def encode_positive_response(self, data_records_values: DataRecordsValuesAlias) -> bytearray:
        """
        Encode positive response message payload for this service.

        :param data_records_values: Mapping with Data Records values that are part of the message.
            Mapping keys are Data Records names.
            Mapping values are either a single occurrence or multiple occurrences values. Each occurrence can be
            a raw value or a mapping with children names and its corresponding values.

        :return: Payload of a positive response message.
        """
        # a deep copy is passed, because the encoding removes used entries from the mapping
        return (bytearray([self.response_sid])
                + self._encode_message(data_records_values=deepcopy(dict(data_records_values)),
                                       message_structure=self.response_structure))

    def encode_negative_response(self, nrc: NRC) -> bytearray:
        """
        Encode negative response message payload for this service.

        A warning is issued (no exception) when the NRC is not among the NRC codes supported by this service.

        :param nrc: NRC value to use.

        :return: Payload of a negative response message for this service.
        """
        NRC.validate_member(nrc)
        if nrc not in self.supported_nrc:
            warn(message=f"NRC code {nrc} is not supported by service {self.name!r}.",
                 category=UserWarning)
        return bytearray([ResponseSID.NegativeResponse, self.request_sid, nrc])

    def encode(self,
               data_records_values: DataRecordsValuesAlias,
               sid: Optional[RequestSID] = None,
               rsid: Optional[ResponseSID] = None) -> bytearray:
        """
        Encode diagnostic message payload for this service.

        Message type is selected by `sid` and `rsid` values:
         - `rsid` equal to Negative Response SID and `sid` either None or SID of this service - negative response
         - `rsid` equal to RSID of this service and `sid` equal to None - positive response
         - `sid` equal to SID of this service and `rsid` equal to None - request

        :param data_records_values: Mapping with Data Records values that are part of the message.
            Mapping keys are Data Records names.
            Mapping values are either a single occurrence or multiple occurrences values. Each occurrence can be
            a raw value or a mapping with children names and its corresponding values.
        :param sid: Request SID value.
            Used by request message (first byte) and negative response message (second byte).
        :param rsid: Response SID value.
            Used by response messages only (first byte).

        :raise ValueError: Missing or provided SID/RSID value cannot be handled by this service.
        :raise InconsistencyError: Negative response message was requested, but `data_records_values` does not
            contain a value for `NRC` Data Record only.

        :return: Payload of a diagnostic message created from provided data records values.
        """
        if rsid == ResponseSID.NegativeResponse and sid in {None, self.request_sid}:
            if set(data_records_values.keys()) != {"NRC"}:
                raise InconsistencyError("Value only for `NRC` Data Record shall be provided in case of "
                                         "negative response message. "
                                         f"Actual values: {data_records_values}")
            return self.encode_negative_response(nrc=data_records_values["NRC"])  # type: ignore
        if rsid == self.response_sid and sid is None:
            return self.encode_positive_response(data_records_values=data_records_values)
        if sid == self.request_sid and rsid is None:
            return self.encode_request(data_records_values=data_records_values)
        raise ValueError("Either SID or RSID value is missing or incorrect. Provided values: "
                         f"SID = {sid}. RSID = {rsid}.")