Source code for pykiso.lib.auxiliaries.instrument_control_auxiliary.instrument_control_auxiliary

##########################################################################
# Copyright (c) 2010-2020 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Instrument Control Auxiliary
****************************

:module: instrument_control_auxiliary

:synopsis: Auxiliary used to communicate via a VISA connector using the
    SCPI protocol.

.. currentmodule:: instrument_control_auxiliary

"""
import logging
import queue
import re
import time
from typing import List, Tuple, Union

from pykiso import AuxiliaryInterface, CChannel

from .lib_scpi_commands import LibSCPI

log = logging.getLogger(__name__)


[docs]class InstrumentControlAuxiliary(AuxiliaryInterface): """Auxiliary used to communicate via a VISA connector using the SCPI protocol. """ def __init__( self, com: CChannel, instrument="", write_termination="\n", output_channel: int = None, **kwargs, ): """Constructor. :param com: VISAChannel that supports VISA communication :param instrument: name of the instrument currently in use (will be used to adapt the SCPI commands) :param write_termination: write termination character :param output_channel: output channel to use on the instrument currently in use (if more than one) """ super().__init__(is_pausable=True, **kwargs) self.channel = com self.instrument = instrument self.write_termination = write_termination self.output_channel = output_channel self.helpers = LibSCPI(self, self.instrument)
[docs] def write( self, write_command: str, validation: Tuple[str, Union[str, List[str]]] = None ) -> str: """Send a write request to the instrument. :param write_command: command to send :param validation: contain validation criteria apply on the response """ log.debug(f"Sending a write request in {self} for {write_command}") return self.run_command( cmd_message="write", cmd_data=(write_command, validation), blocking=True, timeout_in_s=5, )
[docs] def handle_write( self, write_command: str, validation: Tuple[str, Union[str, List[str]]] = None ) -> str: """Send a write request to the instrument and then returns if the value was successfully written. A query is sent immediately after the writing and the answer is compared to the expected one. :param write_command: write command to send :param validation: tuple of the form (validation command (str), expected output (str or list of str)) :return: status message depending on the command validation: SUCCESS, FAILURE or NO_VALIDATION """ log.debug(f"Sending a write request in {self} for {write_command}") # Send the message with the termination character self.channel.cc_send(msg=write_command + self.write_termination, raw=False) if validation is not None: # Check that the writing request was successfully performed on the instrument validation_query, validation_expected_output = validation if isinstance(validation_expected_output, str): validation_expected_output = [validation_expected_output] # Send the validation query with a delay between the previous write and this query request # to make sure that the instrument has had sufficient time to complete the operation time.sleep(0.1) validation_query_response = self.handle_query(validation_query) # Check that the responses matches the expected response if ( isinstance(validation_query_response, str) and validation_query_response != "" ): # Check the tag VALUE{} in the expected output and adapt the validation process match = re.compile(r"VALUE{([+-]?\d+[\.\d]*)}").findall( validation_expected_output[0] ) if match: # tag "VALUE{value}" detected. This tag can be used when we want to ensure that # a numerical parameter was correctly set on the instrument if float(match[0]) == float(validation_query_response.split()[0]): write_success = True else: write_success = False else: # compare the expected output with the result of the validation query command if validation_query_response in validation_expected_output: write_success = True else: write_success = False if write_success is True: log.debug( f"Write request {write_command} successful after verification" ) return "SUCCESS" else: log.warning( f"Write request {write_command} failed! Validation query response was different than expected." ) return "FAILURE" else: log.warning( f"Write request {write_command} failed! No response received for the validation query." ) return "FAILURE" else: log.debug(f"Write request {write_command} processed without validation") return "NO_VALIDATION"
[docs] def read(self) -> Union[str, bool]: """Send a read request to the instrument. :return: received response from instrument otherwise empty string """ log.debug(f"Sending a read request in {self}") return self.run_command( cmd_message="read", cmd_data=None, blocking=True, timeout_in_s=5 )
[docs] def handle_read(self) -> str: """Handle read command by calling associated connector cc_receive. :return: received response from instrument otherwise empty string """ return self.channel.cc_receive(raw=False)
[docs] def query(self, query_command: str) -> Union[bytes, str]: """Send a query request to the instrument. Uses the 'query' method of the channel if available, uses 'cc_send' and 'cc_receive' otherwise. :param query_command: query command to send :return: Response message, None if the request expired with a timeout. """ log.debug(f"Sending a query request in {self}) for {query_command}") return self.run_command( cmd_message="query", cmd_data=query_command, blocking=True, timeout_in_s=5 )
[docs] def handle_query(self, query_command: str) -> str: """Send a query request to the instrument. Uses the 'query' method of the channel if available, uses 'cc_send' and 'cc_receive' otherwise. :param query_command: query command to send :return: Response message, None if the request expired with a timeout. """ if hasattr(self.channel, "query"): return self.channel.query(query_command + self.write_termination) else: self.channel.cc_send(msg=query_command + self.write_termination, raw=False) time.sleep(0.05) return self.channel.cc_receive(raw=False)
[docs] def stop(self) -> None: """Stop the auxiliary thread""" self.wait_event.set() self.stop_event.set()
def _create_auxiliary_instance(self) -> bool: """Open the connector. :return: always True """ log.info("Create auxiliary instance") try: # Open instrument log.info("Open instrument") self.channel.open() # Enable remote control log.info("Enable remote control") command, validation = self.helpers.get_command( cmd_tag="REMOTE_CONTROL", cmd_type="write", cmd_validation=["REMOTE", "ON", "1"], ) self.handle_write(f"{command} ON".strip(), validation) # Select channel if needed if self.output_channel is not None: log.info(f"Select channel {self.output_channel}") command, validation = self.helpers.get_command( cmd_tag="OUTPUT_CHANNEL", cmd_type="write", cmd_validation=f"{self.output_channel}", ) self.handle_write( f"{command} {self.output_channel}".strip(), validation ) else: log.info("Using default output channel.") except Exception: log.exception("Unable to safely open the instrument.") return True def _delete_auxiliary_instance(self) -> bool: """Close the connector. :return: always True """ log.info("Delete auxiliary instance") try: self.channel.close() except Exception: log.exception("Unable to close the instrument.") return True
[docs] def run_command( self, cmd_message: str, cmd_data: str = None, blocking: bool = True, timeout_in_s: int = 0, ) -> Union[str, bool]: """Put a command to execute in thread queue in. :param cmd_message: command request to the auxiliary :param cmd_data: payload to send over associated connector. :param blocking: If you want the command request to be blocking or not :param timeout_in_s: Number of time (in s) you want to wait for an answer :return: received response from connected instrument or False if an error occurred. """ response_received = None if self.lock.acquire(): self.wait_event.set() # Trigger the internal requests self.queue_in.put(("command", cmd_message, cmd_data)) # Wait until the test request was received try: log.debug(f"message to encode '{cmd_data}' in {self}") response_received = self.queue_out.get(blocking, timeout_in_s) except queue.Empty: log.error("no reply received or command not executed within time") # Release the above lock self.wait_event.clear() self.lock.release() return response_received
def _run_command( self, cmd_message: str, cmd_data: Union[tuple, str, None] ) -> Union[str, bool]: """Run a command on thread level and return the results using queue out. :param cmd_message: type of command to execute (read, write, or query) :param cmd_data: data to send using the associated connector. :return: received response from connected instrument or False if an error occurred. """ try: if cmd_message == "write": write_req, validation = cmd_data return self.handle_write(write_req, validation) elif cmd_message == "query": return self.handle_query(query_command=cmd_data) else: return self.handle_read() except Exception: log.exception("An error occurred during command run") return False
[docs] def resume(self) -> None: """Not used.""" log.error(f"resume method is not implemented for {self}")
[docs] def suspend(self) -> None: """Not used.""" log.error(f"suspend method is not implemented for {self}")
def _abort_command(self) -> None: """Not used""" pass def _receive_message(self, timeout_in_s: float) -> None: """Not used""" pass