"""Source code for xrf_explorer.server.spectra.spectra."""

import logging

from math import ceil, floor, log

import numpy as np
import xraydb

from xrf_explorer.server.file_system import get_config
from xrf_explorer.server.file_system.cubes import get_raw_data

LOG: logging.Logger = logging.getLogger(__name__)


def get_average_global(data: np.ndarray) -> list[float]:
    """
    Computes the average of the raw data for each bin on the whole painting.

    :param data: datacube containing the raw data
    :return: list where the index is the channel number and the value is the average global intensity of that channel
    """
    # Collapse both spatial axes, leaving one mean value per channel.
    per_channel_mean: np.ndarray = data.mean(axis=(0, 1))
    return per_channel_mean.tolist()
def get_average_selection(data_source: str, mask: np.ndarray) -> list[float]:
    """
    Computes the average of the raw data for each bin on the selected pixels.

    :param data_source: name of the data source to get the selection average from
    :param mask: The mask describing the selected pixels
    :return: list where the index is the channel number and the value is the average intensity of that channel
        within the selection, or an empty list if the configuration is unavailable or the selection is empty
    """
    config: dict | None = get_config()
    if config is None:
        LOG.error("Could not get backend configuration")
        return []

    max_points: int = int(config["max-spectrum-points"])
    num_points: int = np.count_nonzero(mask)

    # Pick the mip level so that the downscaled selection contains at most
    # max_points pixels; each level quarters the pixel count (halves both axes).
    level: int = 0
    if num_points > 0:
        level = max(0, ceil(log(num_points / max_points, 4)))

    LOG.info("Getting selection at mip level %i", level)

    data: np.ndarray = get_raw_data(data_source, level=level)

    # Coordinates of selected pixels at full resolution.
    selected: np.ndarray = np.argwhere(mask)
    if selected.size == 0:
        # Empty selection: nothing to average (mirrors the original empty result).
        LOG.info("Calculated the average spectrum for the selection.")
        return []

    # Downscale the coordinates to the mip level and drop duplicates, instead of
    # building an intermediate boolean mask pixel-by-pixel with np.vectorize.
    scaled_indices: np.ndarray = np.unique(selected // 2 ** level, axis=0)

    # Average the spectra of the selected (downscaled) pixels in one vectorized step.
    average: np.ndarray = data[scaled_indices[:, 0], scaled_indices[:, 1], :].mean(axis=0)

    LOG.info("Calculated the average spectrum for the selection.")
    return average.tolist()
def get_theoretical_data(element: str, excitation_energy_kev: float, low: int, high: int, bin_size: int) -> list:
    """
    Get the theoretical spectrum and peaks of an element.

    Precondition: 0 <= low < high < 4096, 0 < bin_size <= 4096, 0 <= excitation_energy_kev <= 40

    :param element: symbol of the element
    :param excitation_energy_kev: excitation energy
    :param low: lower channel boundary
    :param high: higher channel boundary
    :param bin_size: size of each bin
    :return: list with first element being a list of dictionaries representing the spectra points,
        second being a list of dictionaries representing the peaks
    """
    # Remove the last character to get the periodic table symbol.
    element = element[:-1].strip()
    if element == 'yAl':
        element = 'Al'

    try:
        # get spectrum and peaks
        data: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray] | np.ndarray = (
            get_element_spectrum(element, excitation_energy_kev)
        )
    except Exception:
        # Narrow except to Exception (no bare except) and use lazy log formatting.
        LOG.info("Could not get theoretical spectrum for excitation energy %s", excitation_energy_kev)
        return []

    # get_element_spectrum returns normalized data, rescale to [0, 255]
    y_spectrum: np.ndarray = data[1] * 255

    response: list = []
    spectrum: list = []
    bin_nr = ceil((high - low) / bin_size)

    # Average the theoretical spectrum over each requested channel bin.
    for i in range(bin_nr):
        # compute starting channel
        start_channel = low + i * bin_size
        # y_spectrum has 10000 points instead of 4096, so scale index and bin size to slice it
        start_index = floor(start_channel * len(y_spectrum) / 4096)
        new_bin_size = round(bin_size / (4096 / len(y_spectrum)))
        spectrum.append(np.mean(y_spectrum[start_index:start_index + new_bin_size]))
    response.append(spectrum)

    # get_element_spectrum returns data in domain [0, 40], rescale to [0, 4096]
    x_peaks = data[2] * 4096 / abs(data[0].max() - data[0].min())

    # Keep only the peaks within [low, high) and express them in bin units.
    # (The peak intensities data[3] were computed but never used; dropped.)
    peaks = [(x_peak - low) / bin_size for x_peak in x_peaks if low <= x_peak < high]
    response.append(peaks)
    return response
# Functions to compute the theoretical elemental spectrum.
# From maxrf4u: https://github.com/fligt/maxrf4u/blob/main/maxrf4u/xphysics.py
# Author: Frank Ligterink
class ElementLines:
    """Computes fluorescence emission line energies and intensities for `element`."""

    def __init__(self, element, excitation_energy_kev):
        excitation_energy = 1000 * excitation_energy_kev
        lines = xraydb.xray_lines(element, excitation_energy=excitation_energy)

        names = []
        labels = []
        energies = []
        intensities = []

        for line_name, line in lines.items():
            # intensities (a.k.a. transition probabilities) sum up to unity within each level
            energy, intensity, initial_level, final_level = line

            names.append(line_name)
            energies.append(energy)
            labels.append(f'{element}_{initial_level}{final_level}')

            # Edge properties come from the initial level (IUPAC notation, e.g. 'L1', not 'La').
            _, fluo_yield, jump_ratio = xraydb.xray_edge(element, initial_level)
            jump_coeff = (jump_ratio - 1) / jump_ratio  # see Volker

            # Weight the transition probability by the edge jump coefficient
            # and the fluorescence yield to obtain the peak intensity.
            intensities.append(jump_coeff * intensity * fluo_yield)

        # Order every attribute by descending peak intensity.
        self.peak_intensities = np.array(intensities)
        order = np.argsort(self.peak_intensities)[::-1]
        self.peak_intensities = self.peak_intensities[order]
        self.peak_energies = np.array(energies)[order] / 1000
        self.peak_names = np.array(names)[order]
        self.peak_labels = np.array(labels)[order]
def get_element_spectrum(
        element: str,
        excitation_energy_kev: float,
        normalize: bool = True,
        x_kevs: np.ndarray | None = None,
        std: float = 0.01
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray] | np.ndarray:
    """Compute simple excitation spectrum (no matrix effects) and peaks.

    :param element: symbol of the element
    :param excitation_energy_kev: excitation energy
    :param normalize: boolean representing whether to normalize y values
    :param x_kevs: pre-determined x values
    :param std: standard deviation of gaussian filter
    :return: x values of the spectrum, y values of the spectrum, peak energies, peak intensities
    """
    lines = ElementLines(element, excitation_energy_kev)
    energies = lines.peak_energies
    intensities = lines.peak_intensities

    x, y_spectrum = gaussian_convolve(energies, intensities, x_kevs=x_kevs, std=std)

    if normalize:
        y_spectrum = y_spectrum / y_spectrum.max()

    # When the caller supplied their own grid, only the spectrum values are returned.
    if x_kevs is not None:
        return y_spectrum
    return x, y_spectrum, energies, intensities
[docs] def gaussian_convolve( peak_energies: np.ndarray, peak_intensities: np.ndarray, x_kevs: np.ndarray | None = None, std: float = 0.01 ) -> tuple[np.ndarray, np.ndarray]: """Convolve line spectrum defined by `peak_energies` and `peak_intensities` with a Gaussian peak shape. :param peak_energies: peak energies of the element :param peak_intensities: peak intensities of the element :param x_kevs: pre-determined x values :param std: standard deviation of gaussian filter """ if x_kevs is None: x_kevs = np.linspace(0, 40, 10000) y_spectrum = np.zeros_like(x_kevs) for peak_energy, peak_intensity in zip(peak_energies, peak_intensities): y_spectrum += peak_intensity * \ np.exp(-(1 / std) * (x_kevs - peak_energy) ** 2) return x_kevs, y_spectrum