Source code for pykiso.auxiliary

##########################################################################
# Copyright (c) 2010-2020 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Auxiliary Interface Definition
******************************

:module: auxiliary

:synopsis: Implementation of basic auxiliary functionality and
    definition of abstract methods for override

.. currentmodule:: pykiso


"""

import abc
import logging
import queue
import threading
import time
from typing import Any, List, Optional

from pykiso.test_setup.dynamic_loader import PACKAGE

from .types import MsgType

log = logging.getLogger(__name__)


# Ensure lock and queue unique reference: needed because python will do
# a copy of the created object when the unittests are called
[docs]class AuxiliaryInterface(threading.Thread, metaclass=abc.ABCMeta): """Defines the Interface of Auxiliaries. Auxiliaries get configured by the Test Coordinator, get instantiated by the TestCases and in turn use Connectors. """ def __init__( self, name: str = None, is_proxy_capable: bool = False, is_pausable: bool = False, activate_log: List[str] = None, ): """Auxiliary thread initialization. :param name: alias of the auxiliary instance :param is_proxy_capable: notify if the current auxiliary could be (or not) associated to a proxy-auxiliary. :param is_pausable: notify if the current auxiliary could be (or not) paused :param activate_log: loggers to deactivate """ # Initialize thread class super().__init__() self.initialize_loggers(activate_log) # Save the name self.name = name # Define thread control attributes & methods self.lock = threading.RLock() self.queue_in = queue.Queue() self.queue_out = queue.Queue() self.stop_event = threading.Event() self.wait_event = threading.Event() self.is_proxy_capable = is_proxy_capable self.is_pausable = is_pausable # Create state self.is_instance = False # Start thread self.start()
[docs] @staticmethod def initialize_loggers(loggers: Optional[List[str]]): """Deactivate all external loggers except the specified ones. :param loggers: list of logger names to keep activated """ if loggers is None: loggers = list() # keyword 'all' should keep all loggers to the configured level if "all" in loggers: log.warning( "All loggers are activated, this could lead to performance issues." ) return # keep package and auxiliary loggers relevant_loggers = { name: logger for name, logger in logging.root.manager.loggerDict.items() if not (name.startswith(PACKAGE) or name.endswith("auxiliary")) and not isinstance(logger, logging.PlaceHolder) } # keep child loggers childs = [ logger for logger in relevant_loggers.keys() for parent in loggers if (logger.startswith(parent) or parent.startswith(logger)) ] loggers += childs # keep original level for specified loggers loggers_to_deactivate = set(relevant_loggers) - set(loggers) for logger_name in loggers_to_deactivate: logging.getLogger(logger_name).setLevel(logging.WARNING)
def __repr__(self): name = self.name repr_ = super().__repr__() if name: repr_ = repr_[:1] + f"{name} is " + repr_[1:] return repr_
[docs] def lock_it(self, timeout_in_s: float) -> bool: """Lock to ensure exclusivity. :param timeout_in_s: How many second you want to wait for the lock :type timeout_in_s: integer :return: True - Lock done / False - Lock failed """ return self.lock.acquire(timeout=timeout_in_s)
[docs] def unlock_it(self) -> None: """Unlock exclusivity""" self.lock.release()
[docs] def resume(self) -> None: """Resume current auxiliary's run.""" if not self.stop_event.is_set() and not self.is_instance: self.create_instance() else: log.error("Cannot resume auxiliary, error occurred during creation")
[docs] def suspend(self) -> None: """Supend current auxiliary's run.""" if self.is_instance: self.delete_instance() else: log.error("Cannot suspend auxiliary, error occurred during creation")
[docs] def create_instance(self) -> bool: """Create an auxiliary instance and ensure the communication to it. :return: message.Message() - Contain received message """ if self.lock.acquire(): # Trigger the internal requests self.queue_in.put("create_auxiliary_instance") # Wait until the request was processed report = self.queue_out.get() # Release the above lock self.lock.release() # Return the report return report
[docs] def delete_instance(self) -> bool: """Delete an auxiliary instance and its communication to it. :return: message.Message() - Contain received message """ if self.lock.acquire(): # Trigger the internal requests self.queue_in.put("delete_auxiliary_instance") # Wait until the request was processed report = self.queue_out.get() # Release the above lock self.lock.release() # Return the report return report
[docs] def run_command( self, cmd_message: MsgType, cmd_data: Any = None, blocking: bool = True, timeout_in_s: int = 0, ) -> bool: """Send a test request. :param cmd_message: command request to the auxiliary :param cmd_data: data you would like to populate the command with :param blocking: If you want the command request to be blocking or not :param timeout_in_s: Number of time (in s) you want to wait for an answer :return: True - Successfully sent / False - Failed by sending / None """ return_code = False log.debug(f"sending command '{cmd_message}' in {self}") if cmd_data: log.debug(f"command payload data: {repr(cmd_data)}") if self.lock.acquire(): # Trigger the internal requests self.queue_in.put(("command", cmd_message, cmd_data)) log.debug(f"sent command '{cmd_message}' in {self}") # Wait until the test request was received try: log.debug(f"waiting for reply to command '{cmd_message}' in {self}") return_code = self.queue_out.get(blocking, timeout_in_s) log.debug( f"reply to command '{cmd_message}' received: '{return_code}' in {self}" ) except queue.Empty: log.debug("no reply received within time") # Release the above lock self.lock.release() # Return the ack_report if exists return return_code
[docs] def wait_and_get_report( self, blocking: bool = False, timeout_in_s: int = 0 ) -> MsgType: """Wait for the report of the previous sent test request. :param blocking: True: wait for timeout to expire, False: return immediately :param timeout_in_s: if blocking, wait the defined time in seconds :return: a message.Message() - Message received / None - nothing received """ try: return self.queue_out.get(blocking, timeout_in_s) except queue.Empty: return None
[docs] def abort_command(self, blocking: bool = True, timeout_in_s: float = 25) -> bool: """Force test to abort. :param blocking: If you want the command request to be blocking or not :param timeout_in_s: Number of time (in s) you want to wait for an answer :return: True - Abort was a success / False - if not """ return_code = False if self.lock.acquire(): # Trigger the internal requests self.queue_in.put("abort") # Wait until the test request was received try: return_code = self.queue_out.get(blocking, timeout_in_s) except queue.Empty: log.info("no reply received within time") # Release the above lock self.lock.release() # Return the ack_report if exists return return_code
[docs] def stop(self) -> None: """Force the thread to stop itself.""" self.stop_event.set()
[docs] def run(self) -> None: """ Run function of the auxiliary thread.""" while not self.stop_event.is_set(): # Step 1: Check if a request is available & process it request = "" # Check if a request was received if not self.queue_in.empty(): request = self.queue_in.get_nowait() # Process the request if request == "create_auxiliary_instance" and not self.is_instance: # Call the internal instance creation method return_code = self._create_auxiliary_instance() # Based on the result set status: self.is_instance = return_code # Enqueue the result for the request caller self.queue_out.put(return_code) elif request == "delete_auxiliary_instance" and self.is_instance: # Call the internal instance delete method return_code = self._delete_auxiliary_instance() # Based on the result set status: self.is_instance = not return_code # Enqueue the result for the request caller self.queue_out.put(return_code) elif ( isinstance(request, tuple) and self.is_instance and request[0] == "command" ): # If the instance is created, send the requested command # to the instance via the internal method _, cmd, data = request cmd_response = self._run_command(cmd, data) if cmd_response is not None: self.queue_out.put(cmd_response) elif request == "abort" and self.is_instance: self.queue_out.put(self._abort_command()) elif request != "": # A request was received but could not be processed log.warning(f"Unknown request '{request}', will not be processed!") log.warning(f"Aux status: {self.__dict__}") # Step 2: Check if something was received from the aux instance if instance was created if self.is_instance and not self.is_pausable: received_message = self._receive_message(timeout_in_s=0) # If yes, send it via the out queue if received_message is not None: self.queue_out.put(received_message) # Free up cpu usage when auxiliary is suspended if not self.is_instance: time.sleep(0.050) # If auxiliary instance is created and is pausable if self.is_instance and self.is_pausable: self.wait_event.wait() # Thread stop command was set log.info("{} was stopped".format(self)) # Delete auxiliary external instance if not done if self.is_instance: self._delete_auxiliary_instance()
@abc.abstractmethod def _create_auxiliary_instance(self) -> bool: """Create the auxiliary instance with witch we will communicate. :return: True - Successfully created / False - Failed by creation .. note: Errors should be logged via the logging with the right level """ pass @abc.abstractmethod def _delete_auxiliary_instance(self) -> bool: """Delete the auxiliary instance with witch we will communicate. :return: True - Successfully deleted / False - Failed deleting .. note: Errors should be logged via the logging with the right level """ pass @abc.abstractmethod def _run_command(self, cmd_message: MsgType, cmd_data: bytes = None) -> MsgType: """Run a command for the auxiliary. :param cmd_message: command in form of a message to run :param cmd_data: payload data for the command :return: True - Successfully received by the instance / False - Failed sending .. note: Errors should be logged via the logging with the right level """ pass @abc.abstractmethod def _abort_command(self) -> bool: """Abort the sent command.""" pass @abc.abstractmethod def _receive_message(self, timeout_in_s: float) -> MsgType: """Defines what needs to be done as a receive message. Such as, what do I need to do to receive a message. :param timeout_in_s: How much time to block on the receive :return: message.Message - If one received / None - Else """ pass