Source code for uds.client

"""Implementation for :ref:`UDS Client <knowledge-base-client>` Simulation."""

__all__ = ["Client"]

from functools import wraps
from queue import Empty, SimpleQueue
from threading import Event, Thread
from time import perf_counter, time
from typing import Any, Callable, List, Optional, Sequence, Tuple, Union
from warnings import warn

from uds.addressing import AddressingType
from uds.message import NRC, RESPONSE_REQUEST_SID_DIFF, RequestSID, ResponseSID, UdsMessage, UdsMessageRecord
from uds.translator import TESTER_PRESENT
from uds.transport_interface import AbstractTransportInterface
from uds.utilities import (
    InconsistencyError,
    MessageTransmissionNotStartedError,
    ReassignmentError,
    TimeMillisecondsAlias,
    ValueWarning,
    bytes_to_hex,
)


def decorator_block_receiving(method: Callable) -> Callable:  # type: ignore
    """
    Decorate a method that blocks receiving task.

    :param method: Method to decorate.

    :return: Decorated method.
    """
    @wraps(method)
    def wrapper(self, *args: Any, **kwargs: Any) -> Any:  # type: ignore
        # pylint: disable=protected-access
        self._Client__receiving_break_event.set()
        self._Client__receiving_not_in_progress.wait(timeout=self.p6_ext_client_timeout)
        try:
            return_value = method(self, *args, **kwargs)
        except Exception as error:
            self._Client__receiving_break_event.clear()
            raise error
        self._Client__receiving_break_event.clear()
        return return_value
    return wrapper


[docs] class Client: """Simulation for UDS Client entity.""" DEFAULT_P2_CLIENT_TIMEOUT: TimeMillisecondsAlias = 100 # P2Client_max > P2Server_max (default: 50 ms) """Default value of :ref:`P2Client <knowledge-base-p2-client>` timeout.""" DEFAULT_P6_CLIENT_TIMEOUT: TimeMillisecondsAlias = 10000 # P6Client_max > P2Client_max """Default value of :ref:`P6Client <knowledge-base-p6-client>` timeout.""" DEFAULT_P2_EXT_CLIENT_TIMEOUT: TimeMillisecondsAlias = 5050 # P2*Client_max > P2*Server_max (default: 5000 ms) """Default value of :ref:`P2*Client <knowledge-base-p2*-client>` timeout.""" DEFAULT_P6_EXT_CLIENT_TIMEOUT: TimeMillisecondsAlias = 50000 # P6*Client_max > P2*Client_max """Default value of :ref:`P6*Client <knowledge-base-p6*-client>` timeout.""" DEFAULT_S3_CLIENT: TimeMillisecondsAlias = 2000 """Default value of :ref:`S3Client <knowledge-base-s3-client>` time parameter.""" DEFAULT_RECEIVING_TASK_CYCLE: TimeMillisecondsAlias = 20 """Default value of receiving task cycle.""" def __init__(self, transport_interface: AbstractTransportInterface, p2_client_timeout: TimeMillisecondsAlias = DEFAULT_P2_CLIENT_TIMEOUT, p2_ext_client_timeout: TimeMillisecondsAlias = DEFAULT_P2_EXT_CLIENT_TIMEOUT, p6_client_timeout: TimeMillisecondsAlias = DEFAULT_P6_CLIENT_TIMEOUT, p6_ext_client_timeout: TimeMillisecondsAlias = DEFAULT_P6_EXT_CLIENT_TIMEOUT, s3_client: TimeMillisecondsAlias = DEFAULT_S3_CLIENT) -> None: """ Configure Client for UDS communication. :param transport_interface: Transport Interface object for managing UDS communication. :param p2_client_timeout: Timeout value for P2Client parameter. :param p2_ext_client_timeout: Timeout value for P2*Client parameter. :param p6_client_timeout: Timeout value for P6Client parameter. :param p6_ext_client_timeout: Timeout value for P*Client parameter. :param s3_client: Value of S3Client time parameter. """ self.transport_interface = transport_interface self.p2_client_timeout = p2_client_timeout self.p2_ext_client_timeout = p2_ext_client_timeout self.p6_client_timeout = p6_client_timeout self.p6_ext_client_timeout = p6_ext_client_timeout self.s3_client = s3_client self.__p2_client_measured: Optional[TimeMillisecondsAlias] = None self.__p2_ext_client_measured: Optional[Tuple[TimeMillisecondsAlias, ...]] = None self.__p6_client_measured: Optional[TimeMillisecondsAlias] = None self.__p6_ext_client_measured: Optional[TimeMillisecondsAlias] = None self.__response_queue: SimpleQueue[UdsMessageRecord] = SimpleQueue() self.__receiving_thread: Optional[Thread] = None self.__receiving_stop_event: Event = Event() self.__receiving_break_event: Event = Event() self.__receiving_not_in_progress: Event = Event() self.__tester_present_thread: Optional[Thread] = None self.__tester_present_stop_event: Event = Event() self.__receiving_stop_event.set() self.__receiving_break_event.clear() self.__receiving_not_in_progress.set() self.__tester_present_stop_event.set()
[docs] def __del__(self) -> None: """Safely finish all tasks.""" if self.is_tester_present_sent: self.stop_tester_present() if self.is_receiving: self.stop_receiving()
@property def transport_interface(self) -> AbstractTransportInterface: """Get Transport Interface used.""" return self.__transport_interface @transport_interface.setter def transport_interface(self, value: AbstractTransportInterface) -> None: """ Set Transport Interface for UDS communication. :param value: Value to set. :raise TypeError: Provided value is not an instance of AbstractTransportInterface class. :raise ReassignmentError: An attempt to change the value after object creation. """ if not isinstance(value, AbstractTransportInterface): raise TypeError("Provided value is not an instance of AbstractTransportInterface class.") if hasattr(self, "_Client__transport_interface"): raise ReassignmentError("Value of 'transport_interface' attribute cannot be changed once assigned.") self.__transport_interface = value @property def p2_client_timeout(self) -> TimeMillisecondsAlias: """Get timeout value for :ref:`P2Client <knowledge-base-p2-client>` parameter.""" return self.__p2_client_timeout @p2_client_timeout.setter def p2_client_timeout(self, value: TimeMillisecondsAlias) -> None: """ Set timeout value for P2Client parameter. :param value: Value to set. :raise TypeError: Provided value is not int or float type. :raise ValueError: Provided time value must be a positive number. """ if not isinstance(value, (int, float)): raise TypeError("Provided time parameter value must be int or float type.") if value <= 0: raise ValueError("Provided timeout parameter value must be greater than 0.") self.__p2_client_timeout = value @property # noqa: vulture def p2_client_measured(self) -> Optional[TimeMillisecondsAlias]: """ Get last measured value of P2Client parameter. :return: The last measured value or None if measurement was not performed. """ return self.__p2_client_measured @property def p2_ext_client_timeout(self) -> TimeMillisecondsAlias: """Get timeout value for :ref:`P2*Client <knowledge-base-p2*-client>` parameter.""" return self.__p2_ext_client_timeout @p2_ext_client_timeout.setter def p2_ext_client_timeout(self, value: TimeMillisecondsAlias) -> None: """ Set timeout value for P2*Client parameter. :param value: value to set. :raise TypeError: Provided value is not int or float type. :raise ValueError: Provided time value must be a positive number. """ if not isinstance(value, (int, float)): raise TypeError("Provided time parameter value must be int or float type.") if value <= 0: raise ValueError("Provided timeout parameter value must be greater than 0.") self.__p2_ext_client_timeout = value @property # noqa: vulture def p2_ext_client_measured(self) -> Optional[Tuple[TimeMillisecondsAlias, ...]]: """ Get last measured values of P2*Client parameter. :return: The last measured values or None if measurement was not performed. """ return self.__p2_ext_client_measured @property def p6_client_timeout(self) -> TimeMillisecondsAlias: """Get timeout value for :ref:`P6Client <knowledge-base-p6-client>` parameter.""" return self.__p6_client_timeout @p6_client_timeout.setter def p6_client_timeout(self, value: TimeMillisecondsAlias) -> None: """ Set timeout value for P6Client parameter. :param value: Value to set. :raise TypeError: Provided value is not int or float type. :raise ValueError: Provided time value must be a positive number. :raise InconsistencyError: P6Client timeout value must be greater or equal than P2Client timeout. """ if not isinstance(value, (int, float)): raise TypeError("Provided time parameter value must be int or float type.") if value <= 0: raise ValueError("Provided timeout parameter value must be greater than 0.") if value < self.p2_client_timeout: raise InconsistencyError("P6Client timeout value must be greater or equal than " f"P2Client timeout ({self.p2_client_timeout} ms).") self.__p6_client_timeout = value @property # noqa: vulture def p6_client_measured(self) -> Optional[TimeMillisecondsAlias]: """ Get last measured value of P6Client parameter. :return: The last measured value or None if measurement was not performed. """ return self.__p6_client_measured @property def p6_ext_client_timeout(self) -> TimeMillisecondsAlias: """Get timeout value for :ref:`P6*Client <knowledge-base-p6*-client>` parameter.""" return self.__p6_ext_client_timeout @p6_ext_client_timeout.setter def p6_ext_client_timeout(self, value: TimeMillisecondsAlias) -> None: """ Set timeout value for P6*Client parameter. :param value: value to set. :raise TypeError: Provided value is not int or float type. :raise ValueError: Provided time value must be a positive number. :raise InconsistencyError: P6*Client timeout value must be greater or equal than P2*Client timeout and P6Client timeout. """ if not isinstance(value, (int, float)): raise TypeError("Provided time parameter value must be int or float type.") if value <= 0: raise ValueError("Provided timeout parameter value must be greater than 0.") if value < self.p6_client_timeout or value < self.p2_ext_client_timeout: raise InconsistencyError("P6*Client timeout value must be greater or equal than " f"P2*Client timeout ({self.p2_ext_client_timeout} ms) and " f"P6Client timeout ({self.p6_client_timeout} ms).") self.__p6_ext_client_timeout = value @property # noqa: vulture def p6_ext_client_measured(self) -> Optional[TimeMillisecondsAlias]: """ Get last measured value of P6*Client parameter. :return: The last measured value or None if measurement was not performed. """ return self.__p6_ext_client_measured @property def s3_client(self) -> TimeMillisecondsAlias: """Get value of :ref:`S3Client <knowledge-base-s3-client>` parameter.""" return self.__s3_client @s3_client.setter def s3_client(self, value: TimeMillisecondsAlias) -> None: """ Set value of S3Client parameter. :param value: value to set. :raise TypeError: Provided value is not int or float type. :raise ValueError: Provided time value must be a positive number. :raise InconsistencyError: S3Client value must be greater or equal than P6Client timeout. """ if not isinstance(value, (int, float)): raise TypeError("Provided time parameter value must be int or float type.") if value <= 0: raise ValueError("Provided timeout parameter value must be greater than 0.") if value < self.p2_client_timeout: raise InconsistencyError("S3Client value must be greater or equal than " f"P2Client timeout ({self.p2_client_timeout} ms).") self.__s3_client = value @property def is_receiving(self) -> bool: """Get flag whether receiving thread is running.""" return self.__receiving_thread is not None @property def is_tester_present_sent(self) -> bool: """Get flag whether Tester Present thread is running periodic sending.""" return self.__tester_present_thread is not None
[docs] def _update_p2_client_measured(self, value: TimeMillisecondsAlias) -> None: """ Update measured values of P2Client parameter. :param value: Value to set. :raise TypeError: Provided value is not int or float type. :raise ValueError: Provided time value must be a positive number. """ if not isinstance(value, (int, float)): raise TypeError("Provided value is not int or float type.") if value <= 0: raise ValueError("P2Client parameter value must be a positive number.") if value > self.p2_client_timeout: warn("Measured value of P2Client was greater than P2Client timeout.", category=ValueWarning) self.__p2_client_measured = value
[docs] def _update_p2_ext_client_measured(self, *values: TimeMillisecondsAlias) -> None: """ Update measured values of P2*Client parameter. :param values: Values to set. :raise RuntimeError: At least one P2*Client value must be provided. :raise TypeError: One of provided values is not int or float type. :raise ValueError: One of provided values is out of range. """ if len(values) == 0: raise RuntimeError("At least one P2*Client value must be provided.") for value in values: if not isinstance(value, (int, float)): raise TypeError("One of provided values is not int or float type.") if value <= 0: raise ValueError("P2*Client parameter value must be a positive number.") if value > self.p2_ext_client_timeout: warn("Measured value of P2*Client was greater than P2*Client timeout.", category=ValueWarning) self.__p2_ext_client_measured = tuple(values)
[docs] def _update_p6_client_measured(self, value: TimeMillisecondsAlias) -> None: """ Update measured values of P6Client parameter. :param value: Value to set. :raise TypeError: Provided value is not int or float type. :raise ValueError: Provided time value must be a positive number. """ if not isinstance(value, (int, float)): raise TypeError("Provided value is not int or float type.") if value <= 0: raise ValueError("P6Client parameter value must be a positive number.") if value > self.p6_client_timeout: warn("Measured value of P6Client was greater than P6Client timeout.", category=ValueWarning) self.__p6_client_measured = value
[docs] def _update_p6_ext_client_measured(self, value: TimeMillisecondsAlias) -> None: """ Update measured values of P6*Client parameter. :param value: Value to set. :raise TypeError: Provided value is not int or float type. :raise ValueError: Provided time value must be a positive number. """ if not isinstance(value, (int, float)): raise TypeError("Provided value is not int or float type.") if value <= 0: raise ValueError("P6*Client parameter value must be a positive number.") if value > self.p6_ext_client_timeout: warn("Measured value of P6*Client was greater than P6*Client timeout.", category=ValueWarning) self.__p6_ext_client_measured = value
[docs] def _update_measured_client_values(self, request_record: UdsMessageRecord, response_records: Sequence[UdsMessageRecord]) -> None: """ Update measured timing parameters on Client side (P2Client, P2*Client, P6Client and P6*Client). :param request_record: Record of the last transmitted request message. :param response_records: Records of received responses to provided message. """ p2_measured = response_records[0].transmission_start - request_record.transmission_end if p2_measured.total_seconds() > 0: self._update_p2_client_measured(p2_measured.total_seconds() * 1000.) else: warn("Measured P2Client value is negative. Check Transport Interface accuracy.", category=RuntimeWarning) if len(response_records) > 1: p2_ext_measured_list = [] for i, response_record in enumerate(response_records[1:]): _p2_ext_measured = response_record.transmission_end - response_records[i].transmission_end p2_ext_measured_list.append(_p2_ext_measured.total_seconds() * 1000.) p6_ext_measured = response_records[-1].transmission_end - request_record.transmission_end self._update_p2_ext_client_measured(*p2_ext_measured_list) self._update_p6_ext_client_measured(p6_ext_measured.total_seconds() * 1000.) else: p6_measured = response_records[-1].transmission_end - request_record.transmission_end if p6_measured.total_seconds() > 0: self._update_p6_client_measured(p6_measured.total_seconds() * 1000.) else: warn("Measured P6Client value is negative. Check Transport Interface accuracy.", category=RuntimeWarning)
[docs] def _receive_response(self, sid: RequestSID, start_timeout: TimeMillisecondsAlias, end_timeout: TimeMillisecondsAlias) -> Optional[UdsMessageRecord]: """ Receive UDS response message to previously sent request. :param sid: SID of the last sent request message. :param start_timeout: Maximal time (in milliseconds) to wait. :return: Record with response message received to the last UDS request message sent. None if a timeout was reached. """ timestamp_start = perf_counter() remaining_start_timeout_ms = start_timeout # either P2Client or P2*Client remaining_end_timeout_ms = end_timeout # either P6Client or P6*Client while remaining_start_timeout_ms > 0 and remaining_end_timeout_ms > 0: # try to receive a message try: response_record = self.transport_interface.receive_message( start_timeout=min(remaining_start_timeout_ms, remaining_end_timeout_ms), end_timeout=remaining_end_timeout_ms) except MessageTransmissionNotStartedError: return None # positive response message received if response_record.payload[0] == sid + RESPONSE_REQUEST_SID_DIFF: return response_record # negative response message received if response_record.payload[0] == ResponseSID.NegativeResponse and response_record.payload[1] == sid: return response_record # other response message received self.__response_queue.put_nowait(response_record) # update time parameters time_elapsed_ms = (perf_counter() - timestamp_start) * 1000. remaining_start_timeout_ms = start_timeout - time_elapsed_ms remaining_end_timeout_ms = end_timeout - time_elapsed_ms return None
[docs] def _receive_task(self, cycle: TimeMillisecondsAlias) -> None: """ Schedule reception of a UDS message for a cyclic response collecting. :param cycle: Time (in milliseconds) used for this task cycle. """ while not self.__receiving_stop_event.is_set(): if self.__receiving_break_event.wait(cycle / 1000.): continue self.__receiving_not_in_progress.clear() try: response = self.transport_interface.receive_message(start_timeout=cycle, end_timeout=self.p6_ext_client_timeout) except TimeoutError: pass else: self.__response_queue.put_nowait(response) self.__receiving_not_in_progress.set()
[docs] def _send_tester_present_task(self, tester_present_message: UdsMessage) -> None: """ Schedule a single Tester Present message transmission for a cyclic sending. :param tester_present_message: Tester Present message to send. """ period = self.s3_client / 1000.0 next_call = perf_counter() while not self.__tester_present_stop_event.is_set(): self.transport_interface.send_message(tester_present_message) next_call += period remaining_wait = next_call - perf_counter() if self.__tester_present_stop_event.wait(remaining_wait): break
[docs] @staticmethod def is_response_pending_message(message: Union[UdsMessage, UdsMessageRecord], request_sid: RequestSID) -> bool: """ Check if provided UDS message record contains Negative Response with Response Pending NRC. :param message: UDS Message Record to check. :param request_sid: Request SID value sent in the proceeding UDS request message. :raise TypeError: Provided message value is not an instance of UdsMessageRecord class. :return: True if provided UDS message record contains Negative Response with Response Pending NRC, False otherwise. """ if not isinstance(message, (UdsMessage, UdsMessageRecord)): raise TypeError("Provided message value is not an instance of UdsMessageRecord class.") request_sid = RequestSID.validate_member(request_sid) if len(message.payload) != 3: return False return (message.payload[0] == ResponseSID.NegativeResponse and message.payload[1] == request_sid and message.payload[2] == NRC.RequestCorrectlyReceived_ResponsePending)
[docs] def get_response(self, timeout: Optional[TimeMillisecondsAlias] = None) -> Optional[UdsMessageRecord]: """ Wait for the first received response message. .. note:: This method can be used for fetching responses messages that were not direct responses to request messages sent via :meth:`~uds.client.Client.send_request_receive_responses`. This includes responses to cyclically sent Tester Present. Typically used for fetching following :ref:`Response on Event (RSID 0xC6) <knowledge-base-service-response-on-event>` responses. :param timeout: Maximal time to wait for a response message. Leave None to wait forever. :raise TypeError: Provided value is not int or float type. :raise ValueError: Provided value is out of range. :return: Record with the first response message received or None if no message was received. """ if timeout is not None: if not isinstance(timeout, (int, float)): raise TypeError("Timeout value must be None, int or float type.") if timeout <= 0: raise ValueError(f"Provided timeout value is less or equal to 0. Actual value: {timeout}") try: return self.__response_queue.get(timeout=None if timeout is None else timeout / 1000.) except Empty: return None
[docs] def get_response_no_wait(self) -> Optional[UdsMessageRecord]: """ Get the first received response message, but do not wait for its arrival. .. note:: This method can be used for fetching responses messages that were not direct responses to request messages sent via :meth:`~uds.client.Client.send_request_receive_responses`. This includes responses to cyclically sent Tester Present. Typically used for fetching following :ref:`Response on Event (RSID 0xC6) <knowledge-base-service-response-on-event>` responses. :return: Record with the first response message received or None if no message was received. """ try: return self.__response_queue.get_nowait() except Empty: return None
[docs] def clear_response_queue(self) -> None: """Clear all response messages that are currently stored in the queue.""" for _ in range(self.__response_queue.qsize()): self.__response_queue.get_nowait()
[docs] def start_receiving(self, cycle: TimeMillisecondsAlias = DEFAULT_RECEIVING_TASK_CYCLE) -> None: """ Start receiving task in the background. All response messages sent to this Client while receiving is active, will be collected and accessible via :meth:`~uds.client.Client.get_response` and :meth:`~uds.client.Client.get_response_no_wait` methods. .. warning:: Cycle value would be overwritten with default value if request message is sent while receiving. """ if self.is_receiving: warn("Receiving is already active.", category=UserWarning) else: self.__receiving_stop_event.clear() self.__receiving_thread = Thread(target=self._receive_task, kwargs={"cycle": cycle}, daemon=True) self.__receiving_thread.start()
[docs] def stop_receiving(self) -> None: """Stop receiving task.""" if self.is_receiving: self.__receiving_stop_event.set() self.__receiving_thread.join() # type: ignore self.__receiving_thread = None else: warn("Receiving is already stopped.", category=UserWarning)
[docs] def start_tester_present(self, addressing_type: AddressingType = AddressingType.FUNCTIONAL, sprmib: bool = True) -> None: """ Start sending Tester Present cyclically. :param addressing_type: Addressing Type to use for cyclical messages. :param sprmib: Whether to use Suppress Positive Response Message Indication Bit. """ if self.is_tester_present_sent: warn("Tester Present is already transmitted cyclically.", category=UserWarning) else: self.__tester_present_stop_event.clear() payload = TESTER_PRESENT.encode_request({ "SubFunction": { "suppressPosRspMsgIndicationBit": sprmib, "zeroSubFunction": 0} }) tester_present_message = UdsMessage(payload=payload, addressing_type=AddressingType.validate_member(addressing_type)) self.__tester_present_thread = Thread(target=self._send_tester_present_task, args=(tester_present_message, ), daemon=True) self.__tester_present_thread.start()
[docs] def stop_tester_present(self) -> None: """Stop sending Tester Present cyclically.""" if self.is_tester_present_sent: self.__tester_present_stop_event.set() self.__tester_present_thread.join(timeout=self.s3_client / 1000.) # type: ignore self.__tester_present_thread = None else: warn("Cyclical sending of Tester Present is already stopped.", category=UserWarning)
[docs] @decorator_block_receiving def send_request_receive_responses(self, request: UdsMessage) -> Tuple[UdsMessageRecord, Tuple[UdsMessageRecord, ...]]: """ Send diagnostic request and receive all responses (till the final one). :param request: Request message to send. :raise TypeError: Provided value is not an instance of UdsMessage class. :raise TimeoutError: Response was initiated with Response Pending message, but never finalized. :return: Tuple with two elements: - record of diagnostic request message that was sent - tuple with diagnostic response messages that were received in the response """ if not isinstance(request, UdsMessage): raise TypeError("Provided request value is not an instance of UdsMessage class.") request_record = self.transport_interface.send_message(request) time_request_sent = request_record.transmission_end.timestamp() sid = RequestSID(request_record.payload[0]) response_records: List[UdsMessageRecord] = [] time_elapsed_ms = (time() - time_request_sent) * 1000. # get the first response (either final response or negative response with response pending nrc) try: response_record = self._receive_response(sid=sid, start_timeout=self.p2_client_timeout - time_elapsed_ms, end_timeout=self.p6_client_timeout - time_elapsed_ms) except TimeoutError as exception: raise TimeoutError("P6Client timeout reached.") from exception if response_record is None: # timeout achieved - no response return request_record, tuple() response_records.append(response_record) timestamp_p6_ext_timeout = time_request_sent + self.p6_ext_client_timeout / 1000. while self.is_response_pending_message(message=response_records[-1], request_sid=sid): timestamp_now = time() timestamp_p2_ext_timeout = (response_records[-1].transmission_end.timestamp() + self.p2_ext_client_timeout / 1000.) remaining_p2_ext_timeout = (timestamp_p2_ext_timeout - timestamp_now) * 1000. remaining_p6_ext_timeout = (timestamp_p6_ext_timeout - timestamp_now) * 1000. try: response_record = self._receive_response(sid=sid, start_timeout=remaining_p2_ext_timeout, end_timeout=remaining_p6_ext_timeout) except TimeoutError as exception: raise TimeoutError("P6*Client timeout reached.") from exception if response_record is None: # timeout achieved - no following response raise TimeoutError(f"P2*Client timeout ({self.p2_ext_client_timeout} ms) reached after receiving " f"{len(response_records)} response pending messages " f"({bytes_to_hex(response_records[-1].payload)}).") response_records.append(response_record) self._update_measured_client_values(request_record=request_record, response_records=response_records) return request_record, tuple(response_records)