Source code for jinwu.lf.specfake

"""
XSPEC 封装:计算带内 K = rate/flux(依据 fakeit),带缓存。
XSPEC wrapper: compute in-band K = rate/flux (via fakeit) with caching.

设计目标 | Goals
- 将 pyxspec 的一次性计算打包,避免在 1e4 次模拟中重复开销;
	Bundle one-shot pyxspec computations to avoid repeated overhead in 10k sims.
- 接口简洁:get_K(arf, rmf, background, model, params, band, ...)->float。
	Simple interface: get_K(arf, rmf, background, model, params, band, ...)->float.
- 缓存键严格基于配置唯一性(路径字符串、模型与参数、能段、环境变量)。
	Cache key strictly reflects uniqueness of configuration (paths/model/params/band/env).
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Optional, Tuple, Dict, Union

import hashlib
import numpy as np

from astropy.io import fits
from jinwu.core.config import XSPEC_COSMO_PLANCK18
from pathlib import Path

# 统一的数值类型别名(兼容 numpy 数值类型)
# Unified numeric alias (compatible with numpy scalar types)
Numeric = Union[float, int, np.floating, np.integer]

# 尝试导入 pyxspec;若失败,延迟到会话初始化时报错
# Try importing pyxspec; if unavailable, raise when session is created
_HAVE_XSPEC = False
try:  # pragma: no cover
	import xspec  # type: ignore
	from xspec import AllData, AllModels, FakeitSettings  # type: ignore
	_HAVE_XSPEC = True
except Exception:  # pragma: no cover
	_HAVE_XSPEC = False


[docs] @dataclass(frozen=True) class KConfig: """K 计算所需的不可变配置对象。 Immutable config for K computation. 字段 | Fields - arf/rmf/background: 路径字符串 | file paths - model/params: XSPEC 模型与参数 | XSPEC model and parameter values - band: (emin, emax) [keV] 能段 | energy band in keV - exposure/back_exposure: 源/背景曝光秒数 | source/background exposure (s) - xspec_abund/xspec_xsect/xspec_cosmo: XSPEC 环境常量 | XSPEC globals - allow_prompting: 是否允许 XSPEC 交互提示 | whether to allow XSPEC prompting """ arf: str rmf: str background: Optional[str] model: str params: Tuple[float, ...] band: Tuple[float, float] exposure: Numeric back_exposure: Optional[Numeric] xspec_abund: str xspec_xsect: str xspec_cosmo: str = XSPEC_COSMO_PLANCK18 allow_prompting: bool = False
[docs] def key(self) -> str: """将配置编码为缓存键(SHA256)。 Encode configuration into a cache key (SHA256). 说明 | Notes - 数值保留有限精度,避免浮点噪声导致缓存抖动; Use limited precision for floats to avoid cache churn from tiny diffs. """ h = hashlib.sha256() def upd(s: str) -> None: h.update(s.encode('utf-8')) upd(self.arf) upd(self.rmf) upd(self.background or '') upd(self.model) upd(','.join(f"{p:.12g}" for p in self.params)) upd(f"{self.band[0]:.6f}-{self.band[1]:.6f}") upd(f"{float(self.exposure):.6f}") be = None if self.back_exposure is None else float(self.back_exposure) upd("BE:" + ("None" if be is None else f"{be:.6f}")) upd(self.xspec_abund) upd(self.xspec_xsect) upd(self.xspec_cosmo) upd(str(self.allow_prompting)) return h.hexdigest()
[docs] class XspecSession: """一个轻量的 XSPEC 会话,提供 K 计算能力。 A lightweight XSPEC session that can compute K. 使用原则 | Usage - 每次计算前清空 AllData/AllModels,避免状态污染; Clear AllData/AllModels to avoid state leakage between runs. - 关闭 XSPEC 交互提示,确保无人值守运行; Disable prompting for non-interactive execution. """ def __init__(self) -> None: if not _HAVE_XSPEC: raise RuntimeError("pyxspec is not available; ensure HEASoft/XSPEC is installed and on PYTHONPATH")
[docs] def compute_K(self, cfg: KConfig) -> Tuple[float, float, float]: # 环境设置 | Set XSPEC globals xspec.Xset.abund = cfg.xspec_abund # type: ignore xspec.Xset.xsect = cfg.xspec_xsect # type: ignore xspec.Xset.cosmo = cfg.xspec_cosmo # type: ignore xspec.Xset.allowPrompting = cfg.allow_prompting # type: ignore # 清空数据/模型 | Reset data/models AllData.clear() AllModels.clear() # 构建模型并设置参数 | Build model and set parameters model = xspec.Model(cfg.model) # type: ignore params = list(cfg.params) p_objs = [] for comp_name in model.componentNames: comp = getattr(model, comp_name) for pname in comp.parameterNames: p_objs.append(getattr(comp, pname)) if len(params) != len(p_objs): raise ValueError(f"model param count mismatch: got {len(params)} but model has {len(p_objs)}") for pobj, val in zip(p_objs, params): pobj.values = float(val) # 调用 fakeit 生成折叠谱 | Run fakeit to create folded spectrum bexpo = cfg.back_exposure if (cfg.back_exposure is not None) else cfg.exposure if cfg.background: fake = FakeitSettings( response=str(cfg.rmf), arf=str(cfg.arf), exposure=str(float(cfg.exposure)), backExposure=str(float(bexpo)), background=str(cfg.background), ) else: fake = FakeitSettings( response=str(cfg.rmf), arf=str(cfg.arf), exposure=str(float(cfg.exposure)), backExposure=str(float(bexpo)), ) AllData.fakeit(1, fake, noWrite=True) # 选择能段并计算带内速率与通量 | Select band and compute rate/flux emin, emax = cfg.band AllData.notice("all") AllData.ignore(f"**-{float(emin)} {float(emax)}-**") AllData.ignore("bad") spec = AllData(1) spec_any = spec # type: ignore[assignment] # spec.rate[3] 通常是模型预测总率(不含噪声) rate = float(spec_any.rate[3]) # type: ignore[attr-defined] xspec.AllModels.calcFlux(f"{float(emin)} {float(emax)}") # spec.flux[0] 通常是 calcFlux 的能通量结果 flux = float(spec_any.flux[0]) # type: ignore[attr-defined] if flux <= 0: raise RuntimeError("XSPEC returned non-positive flux; check model and band") K = float(rate / flux) return K, float(rate), float(flux)
[docs] class XspecKFactory: """ K 计算工厂 + 缓存。 - get_K(...): 返回 K,如缓存未命中则调用 XSPEC 计算并存入缓存。 Return K; compute via XSPEC if cache miss, then store. - get_K_with_values(...): 返回 (K, rate, flux) 元组。 Return (K, rate, flux) tuple. - clear(): 清空缓存。 Clear the cache. """ def __init__(self) -> None: self._cache: Dict[str, Tuple[float, float, float]] = {} # key -> (K, rate, flux) self._session: Optional[XspecSession] = None def _ensure_session(self) -> XspecSession: if self._session is None: self._session = XspecSession() return self._session
[docs] def get_K( self, *, arf: str | Path, rmf: str | Path, background: Optional[str | Path], model: str, params: Tuple[float, ...], band: Tuple[float, float], exposure: Numeric, back_exposure: Optional[Numeric], xspec_abund: str, xspec_xsect: str, xspec_cosmo: str, allow_prompting: bool = False, ) -> float: arf = str(arf) rmf = str(rmf) background_str: Optional[str] = None if background is None else str(background) cfg = KConfig( arf=arf, rmf=rmf, background=background_str, model=model, params=params, band=band, exposure=exposure, back_exposure=back_exposure, xspec_abund=xspec_abund, xspec_xsect=xspec_xsect, xspec_cosmo=xspec_cosmo, allow_prompting=allow_prompting, ) key = cfg.key() if key in self._cache: return self._cache[key][0] # 只返回 K session = self._ensure_session() K, rate, flux = session.compute_K(cfg) self._cache[key] = (float(K), float(rate), float(flux)) return float(K)
[docs] def get_K_with_values( self, *, arf: str | Path, rmf: str | Path, background: Optional[str | Path], model: str, params: Tuple[float, ...], band: Tuple[float, float], exposure: Numeric, back_exposure: Optional[Numeric], xspec_abund: str, xspec_xsect: str, xspec_cosmo: str, allow_prompting: bool = False, ) -> Tuple[float, float, float]: """返回 (K, rate, flux) 元组 参数与 get_K 相同 返回: - K: rate/flux 转化因子 [cts/(erg/cm²)] - rate: 带内计数率 [cts/s] - flux: 带内能通量 [erg/cm²/s] """ arf = str(arf) rmf = str(rmf) background_str: Optional[str] = None if background is None else str(background) cfg = KConfig( arf=arf, rmf=rmf, background=background_str, model=model, params=params, band=band, exposure=exposure, back_exposure=back_exposure, xspec_abund=xspec_abund, xspec_xsect=xspec_xsect, xspec_cosmo=xspec_cosmo, allow_prompting=allow_prompting, ) key = cfg.key() if key in self._cache: return self._cache[key] session = self._ensure_session() K, rate, flux = session.compute_K(cfg) self._cache[key] = (float(K), float(rate), float(flux)) return (float(K), float(rate), float(flux))
[docs] def clear(self) -> None: self._cache.clear()
[docs] def prepare_background_for_fakeit( src_pha: Union[str, Path], bkg_pha: Union[str, Path], out_path: Optional[Union[str, Path]] = None, *, ratio: str = "bkg_over_src", sci_notation: bool = True, ) -> Path: """基于源/背景 PHA 的 BACKSCAL 比值,生成供 fakeit 使用的背景文件。 Create a background PHA for fakeit. The *ratio* parameter controls how the new ``BACKSCAL`` is computed: * ``"bkg_over_src"`` (default): ``BACKSCAL := BACKSCAL_bkg / BACKSCAL_src`` * ``"src_over_bkg"``: ``BACKSCAL := BACKSCAL_src / BACKSCAL_bkg`` 行为 | Behavior - 读取源文件与背景文件的 BACKSCAL;若为列(Type II),取第一行或广播更新。 - 在背景文件的拷贝上,仅更新 BACKSCAL,其余关键字/数据保持不变(附加 HISTORY)。 - 输出文件名默认在原背景文件名后追加 "_for_fakeit",保留原始扩展名(.pha/.pi/.fits/.fit 等),并与原文件位于同一目录。 参数 | Params - src_pha: 源 PHA 路径(Path 或 str)。 - bkg_pha: 背景 PHA 路径(Path 或 str)。 - out_path: 可选输出路径(Path 或 str);不提供时自动拼接后缀。 - ratio: ``"bkg_over_src"`` 或 ``"src_over_bkg"``。 返回 | Returns - 新背景文件的路径(Path)。 """ # 数值到字符串的格式函数(控制注释/HISTORY 中的显示,不影响数值存储) fmt = (lambda x: f"{x:.8E}") if sci_notation else (lambda x: f"{x:.6g}") def _find_spectrum_hdu(hdul: fits.HDUList) -> int: # 优先找 EXTNAME == 'SPECTRUM' 的表;否则返回第一个 BinTableHDU 索引 for i, hdu in enumerate(hdul): extname = str(hdu.header.get('EXTNAME', '')).upper() if isinstance(hdu, fits.BinTableHDU) and extname == 'SPECTRUM': return i for i, hdu in enumerate(hdul): if isinstance(hdu, fits.BinTableHDU): return i raise ValueError('No SPECTRUM (BinTableHDU) found in PHA file') def _read_backscal(hdul: fits.HDUList) -> Tuple[int, float, bool]: idx = _find_spectrum_hdu(hdul) hdu_for_hdr = hdul[idx] hdr = hdu_for_hdr.header # type: ignore[attr-defined] # 1) 标量关键字 if 'BACKSCAL' in hdr: try: val = hdr['BACKSCAL'] return idx, val, False # is_column=False except Exception: pass # 2) 列(Type II 或逐道),取第一行值 hdu = hdul[idx] col_names_raw = list(hdu.columns.names) if isinstance(hdu, fits.BinTableHDU) and hdu.columns is not None and hdu.columns.names is not None else [] col_names = [str(n).strip().upper() for n in col_names_raw] if isinstance(hdu, fits.BinTableHDU) and 'BACKSCAL' in col_names: arr = hdu.data['BACKSCAL'] val = float(np.asarray(arr).flat[0]) return idx, val, True # is_column=True # 3) 缺省 1.0 return idx, 1.0, False src_path = Path(src_pha) bkg_path = Path(bkg_pha) with fits.open(src_path) as src_hdul: _, src_bs, _ = _read_backscal(src_hdul) with fits.open(bkg_path) as bkg_hdul: idx, bkg_bs, is_col = _read_backscal(bkg_hdul) if bkg_bs == 0.0 or src_bs == 0.0: raise ValueError('BACKGROUND or Source region BACKSCAL is zero; cannot compute ratio') if ratio == "bkg_over_src": new_bs = float(bkg_bs / src_bs) elif ratio == "src_over_bkg": new_bs = float(src_bs / bkg_bs) else: raise ValueError( f"Unknown ratio '{ratio}'; expected 'bkg_over_src' or 'src_over_bkg'" ) # 生成输出路径(同目录,保留扩展名);若 out_path 指向目录则写入其下 if out_path is None: out_path_path = bkg_path.with_name(bkg_path.stem + '_for_fakeit' + bkg_path.suffix) else: p = Path(out_path) if p.exists() and p.is_dir(): out_path_path = p / (bkg_path.stem + '_for_fakeit' + bkg_path.suffix) else: out_path_path = p # 先将原背景文件原样写到目标路径 bkg_hdul.writeto(out_path_path, overwrite=True) # 以更新模式打开目标文件并原地修改 BACKSCAL,避免复制可变长列 with fits.open(out_path_path, mode='update') as out_hdul: # 在写入后的文件上重新定位 SPECTRUM HDU out_idx = _find_spectrum_hdu(out_hdul) hdu_copy = out_hdul[out_idx] col_names2 = list(hdu_copy.columns.names) if isinstance(hdu_copy, fits.BinTableHDU) and hdu_copy.columns is not None and hdu_copy.columns.names is not None else [] if isinstance(hdu_copy, fits.BinTableHDU) and is_col and 'BACKSCAL' in col_names2 and hdu_copy.data is not None: arr = np.asarray(hdu_copy.data['BACKSCAL']) # 根据列的物理格式选择精度(E->float32, D->float64),保持与原列一致 try: icol0 = [i for i, n in enumerate(col_names2) if str(n).strip().upper() == 'BACKSCAL'][0] fmt_str = str(hdu_copy.columns[icol0].format).strip().upper() if hdu_copy.columns is not None else '' if fmt_str.startswith('E'): dtype_target = np.float32 elif fmt_str.startswith('D'): dtype_target = np.float64 else: dtype_target = arr.dtype except Exception: dtype_target = arr.dtype # 原地覆盖列数据(数值存储为二进制浮点,显示由 TDISP 控制);保持形状不变 hdu_copy.data['BACKSCAL'][:] = np.full(arr.shape, new_bs, dtype=dtype_target) # 额外:同步写 header BACKSCAL,便于工具显示 hdu_copy.header['BACKSCAL'] = (float(new_bs), f"adjusted for fakeit: bkg/src BACKSCAL = {fmt(new_bs)}") # type: ignore[attr-defined] # 设定列显示格式,提升 FV 可读性 try: icol = [i for i, n in enumerate(col_names2) if str(n).strip().upper() == 'BACKSCAL'][0] + 1 disp = 'E12.5' if sci_notation else 'F12.6' hdu_copy.header[f'TDISP{icol}'] = (disp, 'display format for BACKSCAL') # type: ignore[attr-defined] except Exception: pass else: # 标量关键字 | scalar keyword hdu_copy.header['BACKSCAL'] = (new_bs, f"adjusted for fakeit: bkg/src BACKSCAL = {fmt(new_bs)}") # type: ignore[attr-defined] # 附加 HISTORY | add history hdu_copy.header.add_history( f"BACKSCAL adjusted for fakeit: new={fmt(new_bs)} (bkg/src = {fmt(bkg_bs)} / {fmt(src_bs)} = {fmt(new_bs)})" ) # type: ignore[attr-defined] # flush 在上下文退出时自动完成 return out_path_path
__all__ = [ 'KConfig', 'XspecSession', 'XspecKFactory', 'prepare_background_for_fakeit', ]