Source code for polarimetry.lhcb

# cspell:ignore modelparameters modelstudies
# pyright: reportConstantRedefinition=false
"""Import functions that are specifically for this LHCb analysis.

.. seealso:: :doc:`/cross-check`
"""
from __future__ import annotations

import itertools
import re
import sys
from copy import deepcopy
from math import sqrt
from pathlib import Path
from typing import Generic, Iterable, Pattern, TypeVar

import attrs
import numpy as np
import sympy as sp
import yaml
from attrs import frozen
from sympy.core.symbol import Str

from polarimetry.amplitude import (
    AmplitudeModel,
    DalitzPlotDecompositionBuilder,
    DynamicsBuilder,
    get_indexed_base,
)
from polarimetry.decay import IsobarNode, Particle, ThreeBodyDecay, ThreeBodyDecayChain
from polarimetry.spin import filter_parity_violating_ls, generate_ls_couplings

from .dynamics import (
    formulate_breit_wigner,
    formulate_bugg_breit_wigner,
    formulate_exponential_bugg_breit_wigner,
    formulate_flatte_1405,
)
from .particle import PARTICLE_TO_ID, K, Λc, Σ, p, π

if sys.version_info < (3, 8):
    from typing_extensions import Literal
else:
    from typing import Literal


[docs]def load_model( model_file: Path | str, particle_definitions: dict[str, Particle], model_id: int | str = 0, ) -> AmplitudeModel: builder = load_model_builder(model_file, particle_definitions, model_id) model = builder.formulate() imported_parameter_values = load_model_parameters( model_file, builder.decay, model_id, particle_definitions ) model.parameter_defaults.update(imported_parameter_values) return model
[docs]def load_model_builder( model_file: Path | str, particle_definitions: dict[str, Particle], model_id: int | str = 0, ) -> DalitzPlotDecompositionBuilder: with open(model_file) as f: model_definitions = yaml.load(f, Loader=yaml.SafeLoader) model_title = _find_model_title(model_definitions, model_id) model_def = model_definitions[model_title] lineshapes: dict[str, str] = model_def["lineshapes"] min_ls = "LS couplings" not in model_title decay = load_three_body_decay(lineshapes, particle_definitions, min_ls) amplitude_builder = DalitzPlotDecompositionBuilder(decay, min_ls) for chain in decay.chains: lineshape_choice = lineshapes[chain.resonance.name] dynamics_builder = _get_resonance_builder(lineshape_choice) amplitude_builder.dynamics_choices.register_builder(chain, dynamics_builder) return amplitude_builder
def _find_model_title(model_definitions: dict, model_id: int | str) -> str: if isinstance(model_id, int): if model_id >= len(model_definitions): raise KeyError( f"Model definition file contains {len(model_definitions)} models, but" f" trying to get number {model_id}." ) for i, title in enumerate(model_definitions): if i == model_id: return title if model_id not in model_definitions: raise KeyError(f'Could not find model with title "{model_id}"') return model_id def _get_resonance_builder(lineshape: str) -> DynamicsBuilder: if lineshape in {"BreitWignerMinL", "BreitWignerMinL_LS"}: return formulate_breit_wigner if lineshape == "BuggBreitWignerExpFF": return formulate_exponential_bugg_breit_wigner if lineshape in {"BuggBreitWignerMinL", "BuggBreitWignerMinL_LS"}: return formulate_bugg_breit_wigner if lineshape in {"Flatte1405", "Flatte1405_LS"}: return formulate_flatte_1405 raise NotImplementedError(f'No dynamics implemented for lineshape "{lineshape}"')
[docs]def load_three_body_decay( resonance_names: Iterable[str], particle_definitions: dict[str, Particle], min_ls: bool = True, ) -> ThreeBodyDecay: def create_isobar(resonance: Particle) -> list[ThreeBodyDecayChain]: if resonance.name.startswith("K"): child1, child2, spectator = π, K, p elif resonance.name.startswith("L"): child1, child2, spectator = K, p, π elif resonance.name.startswith("D"): child1, child2, spectator = p, π, K else: raise NotImplementedError prod_ls_couplings = generate_ls(Λc, resonance, spectator, conserve_parity=False) dec_ls_couplings = generate_ls(resonance, child1, child2, conserve_parity=True) if min_ls: decay = IsobarNode( parent=Λc, child1=IsobarNode( parent=resonance, child1=child1, child2=child2, interaction=min(dec_ls_couplings), ), child2=spectator, interaction=min(prod_ls_couplings), ) return [ThreeBodyDecayChain(decay)] chains = [] for dec_ls, prod_ls in itertools.product(dec_ls_couplings, prod_ls_couplings): decay = IsobarNode( parent=Λc, child1=IsobarNode( parent=resonance, child1=child1, child2=child2, interaction=dec_ls, ), child2=spectator, interaction=prod_ls, ) chains.append(ThreeBodyDecayChain(decay)) return chains def generate_ls( parent: Particle, child1: Particle, child2: Particle, conserve_parity: bool ) -> list[tuple[int, sp.Rational]]: ls = generate_ls_couplings(parent.spin, child1.spin, child2.spin) if conserve_parity: return filter_parity_violating_ls( ls, parent.parity, child1.parity, child2.parity ) return ls resonances = [particle_definitions[name] for name in resonance_names] chains: list[ThreeBodyDecayChain] = [] for res in resonances: chains.extend(create_isobar(res)) return ThreeBodyDecay( states={state_id: particle for particle, state_id in PARTICLE_TO_ID.items()}, chains=tuple(chains), )
[docs]class ParameterBootstrap: """A wrapper for loading parameters from :download:`model-definitions.yaml </../data/model-definitions.yaml>`. """ def __init__( self, filename: Path | str, decay: ThreeBodyDecay, model_id: int | str = 0, ) -> None: particle_definitions = extract_particle_definitions(decay) symbolic_parameters = load_model_parameters_with_uncertainties( filename, decay, model_id, particle_definitions ) self._parameters = {str(k): v for k, v in symbolic_parameters.items()} @property def values(self) -> dict[str, complex | float | int]: return {k: v.value for k, v in self._parameters.items()} @property def uncertainties(self) -> dict[str, complex | float | int]: return {k: v.uncertainty for k, v in self._parameters.items()}
[docs] def create_distribution( self, sample_size: int, seed: int | None = None ) -> dict[str, complex | float | int]: return _smear_gaussian( parameter_values=self.values, parameter_uncertainties=self.uncertainties, size=sample_size, seed=seed, )
[docs]def load_model_parameters( filename: Path | str, decay: ThreeBodyDecay, model_id: int | str = 0, particle_definitions: dict[str, Particle] | None = None, ) -> dict[sp.Indexed | sp.Symbol, complex | float]: parameters = load_model_parameters_with_uncertainties( filename, decay, model_id, particle_definitions ) return {k: v.value for k, v in parameters.items()}
[docs]def load_model_parameters_with_uncertainties( filename: Path | str, decay: ThreeBodyDecay, model_id: int | str = 0, particle_definitions: dict[str, Particle] | None = None, ) -> dict[sp.Indexed | sp.Symbol, MeasuredParameter]: with open(filename) as f: model_definitions = yaml.load(f, Loader=yaml.SafeLoader) model_title = _find_model_title(model_definitions, model_id) min_ls = "LS couplings" not in model_title parameter_definitions = model_definitions[model_title]["parameters"] parameters = _to_symbol_value_mapping( parameter_definitions, min_ls, particle_definitions ) decay_couplings = compute_decay_couplings(decay) parameters.update(decay_couplings) return parameters
def _smear_gaussian( parameter_values: dict[str, complex | float], parameter_uncertainties: dict[str, complex | float], size: int, seed: int | None = None, ) -> dict[str, np.ndarray]: value_distributions = {} for k, mean in parameter_values.items(): std = parameter_uncertainties[k] distribution = _create_gaussian_distribution(mean, std, size, seed) value_distributions[k] = distribution return value_distributions def _create_gaussian_distribution( mean: complex | float, std: complex | float, size: int, seed: int | None = None, ): rng = np.random.default_rng(seed) if isinstance(mean, complex) and isinstance(std, complex): return ( rng.normal(mean.real, std.real, size) + rng.normal(mean.imag, std.imag, size) * 1j ) if isinstance(mean, (float, int)) and isinstance(std, (float, int)): return rng.normal(mean, std, size) raise NotImplementedError _T = TypeVar("_T") _K = TypeVar("_K") _V = TypeVar("_V")
[docs]def flip_production_coupling_signs(obj: _T, subsystem_names: Iterable[Pattern]) -> _T: if isinstance(obj, AmplitudeModel): return attrs.evolve( obj, parameter_defaults=_flip_signs(obj.parameter_defaults, subsystem_names), ) if isinstance(obj, ParameterBootstrap): bootstrap = deepcopy(obj) bootstrap._parameters = _flip_signs(bootstrap._parameters, subsystem_names) # type: ignore[reportPrivateUsage] return bootstrap if isinstance(obj, dict): return _flip_signs(obj) raise NotImplementedError
def _flip_signs( parameters: dict[_K, _V], subsystem_names: Iterable[Pattern] ) -> dict[_K, _V]: pattern = rf".*\\mathrm{{production}}\[[{''.join(subsystem_names)}].*" return { key: _flip(value) if re.match(pattern, str(key)) else value for key, value in parameters.items() } def _flip(obj: _V) -> _V: if isinstance(obj, MeasuredParameter): return attrs.evolve(obj, value=_flip(obj.value)) return -obj
[docs]def compute_decay_couplings( decay: ThreeBodyDecay, ) -> dict[sp.Indexed, MeasuredParameter[int]]: H_dec = get_indexed_base("decay") half = sp.Rational(1, 2) decay_couplings = {} for chain in decay.chains: R = Str(chain.resonance.name) if chain.resonance.name.startswith("K"): decay_couplings[H_dec[R, 0, 0]] = 1 if chain.resonance.name[0] in {"D", "L"}: child1, child2 = chain.decay_products if chain.resonance.name.startswith("D"): coupling_pos = H_dec[R, +half, 0] coupling_neg = H_dec[R, -half, 0] else: coupling_pos = H_dec[R, 0, +half] coupling_neg = H_dec[R, 0, -half] decay_couplings[coupling_pos] = 1 decay_couplings[coupling_neg] = int( chain.resonance.parity * child1.parity * child2.parity * (-1) ** (chain.resonance.spin - child1.spin - child2.spin) ) return { symbol: MeasuredParameter(value, hesse=0) for symbol, value in decay_couplings.items() }
def _to_symbol_value_mapping( parameter_dict: dict[str, str], min_ls: bool, particle_definitions: dict[str, Particle], ) -> dict[sp.Basic, complex | float]: key_to_value: dict[str, MeasuredParameter] = {} for key, str_value in parameter_dict.items(): if key.startswith("Ar"): identifier = key[2:] str_imag = parameter_dict[f"Ai{identifier}"] key = f"A{identifier}" indexed_symbol: sp.Indexed = parameter_key_to_symbol( key, min_ls, particle_definitions ) resonance_name = str(indexed_symbol.indices[0]) resonance = particle_definitions[resonance_name] if min_ls: conversion_factor = get_conversion_factor(resonance) else: _, L, S = indexed_symbol.indices conversion_factor = get_conversion_factor_ls(resonance, L, S) real = _to_value_with_uncertainty(str_value) imag = _to_value_with_uncertainty(str_imag) parameter = _form_complex_parameter(real, imag) key_to_value[key] = attrs.evolve( parameter, value=conversion_factor * parameter.value, ) elif key.startswith("Ai"): continue else: key_to_value[key] = _to_value_with_uncertainty(str_value) return { parameter_key_to_symbol(key, min_ls, particle_definitions): value for key, value in key_to_value.items() } def _to_value_with_uncertainty(str_value: str) -> MeasuredParameter[float]: """ >>> _to_value_with_uncertainty('1.5 ± 0.2') MeasuredParameter(value=1.5, hesse=0.2, model=None, systematic=None) >>> par = _to_value_with_uncertainty('0.94 ± 0.042 ± 0.35 ± 0.04') >>> par MeasuredParameter(value=0.94, hesse=0.042, model=0.35, systematic=0.04) >>> par.uncertainty 0.058 """ float_values = tuple(float(s) for s in str_value.split(" ± ")) if len(float_values) == 2: return MeasuredParameter( value=float_values[0], hesse=float_values[1], ) if len(float_values) == 4: return MeasuredParameter( value=float_values[0], hesse=float_values[1], model=float_values[2], systematic=float_values[3], ) raise ValueError(f"Cannot convert '{str_value}' to {MeasuredParameter.__name__}") def _form_complex_parameter( real: MeasuredParameter[float], imag: MeasuredParameter[float], ) -> MeasuredParameter[complex]: def convert_optional(real: float | None, imag: float | None) -> complex | None: if real is None or imag is None: return None return complex(real, imag) return MeasuredParameter( value=complex(real.value, imag.value), hesse=complex(real.hesse, imag.hesse), model=convert_optional(real.model, imag.model), systematic=convert_optional(real.systematic, imag.systematic), ) ParameterType = TypeVar("ParameterType", complex, float) """Template for the parameter type of a for `MeasuredParameter`."""
[docs]@frozen class MeasuredParameter(Generic[ParameterType]): """Data structure for imported parameter values. `MeasuredParameter.value` and `~.MeasuredParameter.hesse` are taken from the `supplemental material <https://cds.cern.ch/record/2824328/files>`_, whereas `~.MeasuredParameter.model` and `~.MeasuredParameter.systematic` are taken from `Tables 8 and 9 <https://arxiv.org/pdf/2208.03262.pdf#page=21>`_ from the original LHCb paper :cite:`LHCb-PAPER-2022-002`. """ value: ParameterType """Central value of the parameter as determined by a fit with Minuit.""" hesse: ParameterType """Parameter uncertainty as determined by a fit with Minuit.""" model: ParameterType | None = None """Systematic uncertainties from fit bootstrapping.""" systematic: ParameterType | None = None """Systematic uncertainties from detector effects etc..""" @property def uncertainty(self) -> ParameterType: if self.systematic is None: return self.hesse if isinstance(self.value, float): return sqrt(self.hesse**2 + self.systematic**2) return complex( sqrt(self.hesse.real**2 + self.systematic.real**2), sqrt(self.hesse.imag**2 + self.systematic.imag**2), )
[docs]def get_conversion_factor(resonance: Particle) -> Literal[-1, 1]: # https://github.com/ComPWA/polarimetry/issues/5#issue-1220525993 half = sp.Rational(1, 2) if resonance.name.startswith("D"): return int(-resonance.parity * (-1) ** (resonance.spin - half)) if resonance.name.startswith("K"): return 1 if resonance.name.startswith("L"): return int(-resonance.parity) raise NotImplementedError(f"No conversion factor implemented for {resonance.name}")
[docs]def get_conversion_factor_ls( resonance: Particle, L: sp.Rational, S: sp.Rational ) -> Literal[-1, 1]: # https://github.com/ComPWA/polarimetry/issues/192#issuecomment-1321892494 if resonance.name.startswith("K") and resonance.spin == 0: return 1 half = sp.Rational(1, 2) CG_flip_factor = int((-1) ** (L + S - half)) return get_conversion_factor(resonance) * CG_flip_factor
[docs]def parameter_key_to_symbol( key: str, min_ls: bool = True, particle_definitions: dict[str, Particle] | None = None, ) -> sp.Indexed | sp.Symbol: H_prod = get_indexed_base("production", min_ls) half = sp.Rational(1, 2) if key.startswith("A"): # https://github.com/ComPWA/polarimetry/issues/5#issue-1220525993 R = _stringify(key[1:-1]) subsystem_identifier = str(R)[0] coupling_number = int(key[-1]) if min_ls: # Helicity couplings if subsystem_identifier in {"D", "L"}: if coupling_number == 1: return H_prod[R, -half, 0] if coupling_number == 2: return H_prod[R, +half, 0] if subsystem_identifier == "K": if str(R) in {"K(700)", "K(1430)"}: if coupling_number == 1: return H_prod[R, 0, +half] if coupling_number == 2: return H_prod[R, 0, -half] else: if coupling_number == 1: return H_prod[R, 0, -half] if coupling_number == 2: return H_prod[R, -1, -half] if coupling_number == 3: return H_prod[R, +1, +half] if coupling_number == 4: return H_prod[R, 0, +half] else: # LS-couplings: supplemental material p.1 (https://cds.cern.ch/record/2824328/files) if particle_definitions is None: raise ValueError( "You need to provide particle definitions in order to map the" " coupling IDs to coupling symbols" ) resonance = particle_definitions[str(R)] if subsystem_identifier in {"D", "L"}: if coupling_number == 1: return H_prod[R, resonance.spin - half, resonance.spin] if coupling_number == 2: return H_prod[R, resonance.spin + half, resonance.spin] if subsystem_identifier == "K": if resonance.spin == 0: # "K(700)", "K(1430)" if coupling_number == 1: return H_prod[R, 0, half] if coupling_number == 2: return H_prod[R, 1, half] else: if coupling_number == 1: return H_prod[R, 0, half] if coupling_number == 2: return H_prod[R, 1, half] if coupling_number == 3: return H_prod[R, 1, 3 * half] if coupling_number == 4: return H_prod[R, 2, 3 * half] if key.startswith("alpha"): R = _stringify(key[5:]) return sp.Symbol(Rf"\alpha_{{{R}}}") if key.startswith("gamma"): R = _stringify(key[5:]) return sp.Symbol(Rf"\gamma_{{{R}}}") if key.startswith("M"): R = _stringify(key[1:]) return sp.Symbol(Rf"m_{{{R}}}") if key.startswith("G1"): R = _stringify(key[2:]) return sp.Symbol(Rf"\Gamma_{{{R} \to {p.latex} {K.latex}}}") if key.startswith("G2"): R = _stringify(key[2:]) return sp.Symbol(Rf"\Gamma_{{{R} \to {Σ.latex} {π.latex}}}") if key.startswith("G"): R = _stringify(key[1:]) return sp.Symbol(Rf"\Gamma_{{{R}}}") if key == "dLc": return sp.Symbol(R"R_{\Lambda_c}") raise NotImplementedError( f'Cannot convert key "{key}" in model parameter JSON file to SymPy symbol' )
def _stringify(obj) -> Str: if isinstance(obj, Particle): return Str(obj.name) return Str(f"{obj}")
[docs]def extract_particle_definitions(decay: ThreeBodyDecay) -> dict[str, Particle]: particles = {} def update_definitions(particle: Particle) -> None: particles[particle.name] = particle for chain in decay.chains: update_definitions(chain.parent) update_definitions(chain.resonance) update_definitions(chain.spectator) update_definitions(chain.decay_products[0]) update_definitions(chain.decay_products[1]) return particles