Source code for pykiso.lib.connectors.cc_rtt_segger

##########################################################################
# Copyright (c) 2010-2020 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Communication Channel Via segger j-link
***************************************

:module: cc_rtt_segger

:synopsis: channel used to enable RTT communication using Segger J-Link debugger.
    Additionally, RTT logs can be captured by setting the rtt_log_path parameter
    on the specified channel.

.. currentmodule:: cc_rtt_segger

"""

import logging
import threading
import time
from pathlib import Path
from typing import Optional, Union

import pylink

from pykiso import connector
from pykiso.message import Message

log = logging.getLogger(__name__)


[docs]class CCRttSegger(connector.CChannel): """Channel using RTT to communicate through Segger J-Link debugger.""" def __init__( self, serial_number: int = None, chip_name: str = "STM32L562QE", speed: int = 4000, block_address: int = 0x2003F800, verbose: bool = False, tx_buffer_idx: int = 3, rx_buffer_idx: int = 0, rtt_log_path: Optional[str] = None, rtt_log_buffer_idx: int = 0, connection_timeout: int = 5, **kwargs, ): """Initialize attributes. :param serial_number: optional segger debugger serial number (required if many connected) :param chip_name: microcontoller name (STM....) :param speed: communication speed in Hz :param block_address: start address to start RTT communication :param tx_buffer_idx: buffer index used for transmission :param rx_buffer_idx: buffer index used for reception :param verbose: boolean indicating if J-Link connection should be verbose in logging :param rtt_log_path: path to the folder where the RTT log file should be stored :param rtt_log_buffer_idx: buffer index used for RTT logging :param connection_timeout: available time (in seconds) to open the connection """ super().__init__(**kwargs) self.serial_number = serial_number self.chip_name = chip_name self.speed = speed self.block_address = block_address self.tx_buffer_idx = tx_buffer_idx self.rx_buffer_idx = rx_buffer_idx self.verbose = verbose self.jlink = None self.connection_timeout = connection_timeout self.rtt_log_buffer_idx = rtt_log_buffer_idx # initialize rtt logging specific parameters self._is_running = False self.rtt_log_thread = threading.Thread(target=self.receive_log) self.rtt_log_path = rtt_log_path if self.rtt_log_path is not None: self.rtt_log_buffer_size = 0 self.rtt_log_path = Path(rtt_log_path) rtt_fh = logging.FileHandler(self.rtt_log_path / "rtt.log") rtt_fh.setLevel(logging.DEBUG) self.rtt_log = logging.getLogger(f"{__name__}.RTT") self.rtt_log.setLevel(logging.DEBUG) self.rtt_log.addHandler(rtt_fh) def _cc_open(self) -> None: """Connect debugger/microcontroller. This method proceed to the following actions : - create a JLink class instance - connect to the debugger(using open method) - set debugger interface to SWD - connect debugger to the specified chip - start RTT communication - start RTT Logging the specified channel if activated :raise JLinkRTTException: if connection timeout occurred. """ self.jlink = pylink.JLink() # connect to J-Link debugger if not self.jlink.opened(): self.jlink.open(self.serial_number) log.info(f"connection made with J-Link debugger {self.serial_number}") else: log.debug("connection to J-Link already started") # set target interface to SWD self.jlink.set_tif(pylink.enums.JLinkInterfaces.SWD) # connect debugger to the specified target self.jlink.connect( chip_name=self.chip_name, speed=self.speed, verbose=self.verbose ) log.debug( f"connection to chip {self.chip_name} performed at speed {self.speed} Hz" ) # start rtt at the specified address self.jlink.rtt_start(self.block_address) log.info(f"RTT communication started at address {self.block_address}") t_start = time.perf_counter() while True: try: num_up = self.jlink.rtt_get_num_up_buffers() num_down = self.jlink.rtt_get_num_down_buffers() log.debug( f"RTT started. Found {num_up} up and {num_down} down channels." ) break except pylink.errors.JLinkRTTException: time.sleep(0.1) # Exit the while loop once timeout is reached if time.perf_counter() > (t_start + self.connection_timeout): raise # start rtt logging thread on buffer index rtt_log_buffer_idx if self.rtt_log_path is not None: try: rtt_log_buffer = self.jlink.rtt_get_buf_descriptor( self.rtt_log_buffer_idx, True ) self.rtt_log_buffer_size = rtt_log_buffer.SizeOfBuffer if self.rtt_log_buffer_size == 0: raise ValueError log.debug(f"RTT log buffer size is {self.rtt_log_buffer_size} bytes") except ValueError: log.debug("Read RTT log buffer size is 0, defaulting to 1kB") self.rtt_log_buffer_size = 1024 except pylink.errors.JLinkRTTException as e: log.error(f"Could not get RTT log buffer size: {e}") if self.rtt_log_buffer_idx not in range(num_up + 1): log.error(f"Buffer index must be at most {num_up}") self.rtt_log_buffer_idx = 0 self.rtt_log_buffer_size = 1024 finally: self._is_running = True self.rtt_log_thread.start() log.info("RTT logging started") def _cc_close(self) -> None: """Close current RTT communication in use.""" if self.jlink is not None: self._is_running = False self.jlink.rtt_stop() self.jlink.close() log.info("RTT communication closed") def _cc_send(self, msg: Message or bytes, raw: bool = False) -> None: """Send message using the corresponding RTT buffer. :param msg: message to send, should be Message type or bytes. :param raw: if raw is True simply send it as it is, otherwise apply serialization """ try: if not raw: msg = msg.serialize() msg = list(msg) bytes_written = self.jlink.rtt_write(self.tx_buffer_idx, msg) log.debug( f"===> message sent (RTT): on buffer {self.tx_buffer_idx}:{msg}, number of bytes written : {bytes_written}" ) except Exception: log.exception( f"ERROR occurred while sending {len(msg)} bytes on buffer number {self.tx_buffer_idx}" ) def _cc_receive( self, timeout: float = 0.1, raw: bool = False ) -> Union[Message, bytes, None]: """Read message from the corresponding RTT buffer. :param timeout: timeout applied on receive event :param raw: if raw is True return raw bytes, otherwise Message type like :return: Message or raw bytes if successful, otherwise None """ is_timeout = False t_start = time.perf_counter() # rtt_read is not a blocking method due to this fact a while loop is used # to act like a blocking ones. while not is_timeout: try: # Read the message header msg_received = self.jlink.rtt_read( self.rx_buffer_idx, Message().header_size ) # if a message is received if msg_received: # Read the payload and CRC msg_received += self.jlink.rtt_read( self.rx_buffer_idx, msg_received[-1] + Message().crc_byte_size, ) # Parse the bytes list into bytes string msg_received = bytes(msg_received) log.debug( f"<=== message received (RTT) on buffer {self.rx_buffer_idx}:{msg_received}, number of bytes read : {len(msg_received)}" ) if not raw: msg_received = Message.parse_packet(msg_received) break except Exception: log.exception( f"encountered error while receiving message via {self} on buffer {self.rx_buffer_idx}" ) return None # Exit the while loop once timeout is reached if time.perf_counter() > (t_start + timeout): is_timeout = True msg_received = None return msg_received
[docs] def receive_log(self) -> None: """Receive RTT log messages from the corresponding RTT buffer.""" while self._is_running: # receive at most rtt_log_buffer_size of RTT logs log_msg = self.jlink.rtt_read( self.rtt_log_buffer_idx, self.rtt_log_buffer_size ) if log_msg: self.rtt_log.debug(bytes(log_msg).decode())