Source code for jinwu.background.backprior


from __future__ import annotations

from dataclasses import dataclass
from typing import Tuple, Optional, Literal
import numpy as np
from astropy.io import fits

[docs] @dataclass class BackgroundPrior: """ Background prior based on OFF-region total counts over a known exposure. Attributes ---------- n_off_prior : float Total counts observed in the OFF region during t_off seconds. t_off : float Exposure time in the OFF region corresponding to n_off_prior (seconds). area_ratio : float A_on / A_off. The Li&Ma alpha uses alpha = area_ratio * (t_on / t_off). """ n_off_prior: float t_off: float area_ratio: float
[docs] def alpha(self, t_on: float) -> float: return self.area_ratio * (float(t_on) / float(self.t_off))
[docs] @classmethod def from_epwxt_background( cls, fits_path: str, *, chan_lo: int = 53, chan_hi: int = 506, arf_path: Optional[str] = None, arf_bin_lo: int = 81, arf_bin_hi: int = 780, pixel_size_mm: float = 0.015, pixel_fov_deg: float = 0.00229, src_radius_arcmin: float = 9.0, bkg_rin_arcmin: float = 18.0, bkg_rout_arcmin: float = 36.0, t_off: float = 1_000_000.0, verbose: bool = True, ) -> "BackgroundPrior": """ Construct BackgroundPrior from a single EP/WXT background FITS file that contains counts per cm^2 accumulated over a reference exposure (typically 1e6 s). This enforces the energy band via detector channels [chan_lo, chan_hi] (default 53–506, about 0.5–4.0 keV), computes the OFF-region prior counts in a desired t_off, and derives area_ratio from instrument geometry. Parameters ---------- fits_path : str Path to the background FITS (counts per cm^2). In practice only one combined background file is available. chan_lo, chan_hi : int Inclusive channel bounds to approximate 0.5–4.0 keV. pixel_size_mm : float Pixel size on detector (mm). Default 0.015 mm. pixel_fov_deg : float Sky angular size per pixel (degree). Default 0.00229 deg. src_radius_arcmin : float Source region radius (arcmin). Default 9'. bkg_rin_arcmin, bkg_rout_arcmin : float Background annulus inner/outer radii (arcmin). Default 18'–36'. t_off : float OFF prior exposure (s) for Li&Ma. Default 1e6 s (matches FITS by default). verbose : bool Print brief diagnostics. Returns ------- BackgroundPrior """ # If ARF is provided, derive the energy band from ARF bins [arf_bin_lo, arf_bin_hi] band_keV: Optional[Tuple[float, float]] = None if arf_path is not None: with fits.open(arf_path) as arf: if 'SPECRESP' not in arf: raise ValueError(f"ARF file {arf_path} lacks SPECRESP extension.") hdu_sp = arf['SPECRESP'] arfd = getattr(hdu_sp, 'data', None) if arfd is None: raise ValueError(f"ARF file {arf_path} SPECRESP has no data") elo = np.asarray(arfd['ENERG_LO'], dtype=float) ehi = np.asarray(arfd['ENERG_HI'], dtype=float) # Assume user-provided ARF bin indices are 1-based (第81个…第780个) i0 = max(0, int(arf_bin_lo) - 1) i1 = min(elo.size - 1, int(arf_bin_hi) - 1) if i1 < i0: raise ValueError("arf_bin_hi must be >= arf_bin_lo") band_keV = (float(np.min(elo[i0:i1 + 1])), float(np.max(ehi[i0:i1 + 1]))) if verbose: print(f"[BackgroundPrior] ARF bins {arf_bin_lo}-{arf_bin_hi} -> band ~{band_keV[0]:.3g}-{band_keV[1]:.3g} keV") def _sum_counts_per_cm2(path: str, lo: int, hi: int, band: Optional[Tuple[float, float]]) -> Tuple[float, float]: with fits.open(path) as hdul: # Try SPECTRUM extension (OGIP PHA) if 'SPECTRUM' not in hdul: raise ValueError(f"FITS file {path} lacks SPECTRUM extension.") hdu_spec = hdul['SPECTRUM'] spec = getattr(hdu_spec, 'data', None) hdr = getattr(hdu_spec, 'header', None) if spec is None or hdr is None: raise ValueError(f"FITS SPECTRUM in {path} missing data/header") channels = np.asarray(spec['CHANNEL'], dtype=int) mask = (channels >= lo) & (channels <= hi) # If energy bounds are available and ARF-derived band is given, prefer energy overlap if ('EBOUNDS' in hdul) and (band is not None): hdu_eb = hdul['EBOUNDS'] eb = getattr(hdu_eb, 'data', None) if eb is not None: elo = np.asarray(eb['E_MIN'], dtype=float) ehi = np.asarray(eb['E_MAX'], dtype=float) mask = (ehi > band[0]) & (elo < band[1]) counts_cm2 = float(np.sum(spec['COUNTS'][mask])) texp = float(hdr.get('EXPOSURE', 1_000_000.0)) # Optional EBOUNDS cross-check if 'EBOUNDS' in hdul: hdu_eb = hdul['EBOUNDS'] eb = getattr(hdu_eb, 'data', None) if eb is not None and verbose: elo = np.asarray(eb['E_MIN'], dtype=float) ehi = np.asarray(eb['E_MAX'], dtype=float) if band is None: ch = np.asarray(eb['CHANNEL'], dtype=int) sel = (ch >= lo) & (ch <= hi) emin = float(np.min(elo[sel])) if np.any(sel) else np.nan emax = float(np.max(ehi[sel])) if np.any(sel) else np.nan print(f"[BackgroundPrior] Channel {lo}-{hi} spans ~{emin:.3g}-{emax:.3g} keV (EBOUNDS)") else: print(f"[BackgroundPrior] Using ARF-derived band ~{band[0]:.3g}-{band[1]:.3g} keV for background selection") return counts_cm2, texp c_tot_cm2, t_ref = _sum_counts_per_cm2(fits_path, chan_lo, chan_hi, band_keV) # Geometry: map sky regions to detector cm^2 via pixelization pix_size_cm = float(pixel_size_mm) * 0.1 # mm -> cm apix_cm2 = pix_size_cm ** 2 omega_pix_deg2 = float(pixel_fov_deg) ** 2 # Sky areas r_on_deg = float(src_radius_arcmin) / 60.0 r_in_deg = float(bkg_rin_arcmin) / 60.0 r_out_deg = float(bkg_rout_arcmin) / 60.0 A_on_sky_deg2 = float(np.pi * (r_on_deg ** 2)) A_off_sky_deg2 = float(np.pi * (r_out_deg ** 2 - r_in_deg ** 2)) # Pixel counts and detector cm^2 areas Npix_on = A_on_sky_deg2 / omega_pix_deg2 Npix_off = A_off_sky_deg2 / omega_pix_deg2 A_on_cm2 = Npix_on * apix_cm2 A_off_cm2 = Npix_off * apix_cm2 area_ratio = float(A_on_cm2 / A_off_cm2) if verbose: print(f"[BackgroundPrior] A_on_cm2={A_on_cm2:.4g}, A_off_cm2={A_off_cm2:.4g}, area_ratio~{area_ratio:.6g}") # Convert counts-per-cm2 over t_ref to rate-per-cm2 r_bg_cm2 = c_tot_cm2 / t_ref # counts/(s·cm^2) r_off = r_bg_cm2 * A_off_cm2 # counts/s in OFF annulus n_off_prior = r_off * float(t_off) if verbose: print(f"[BackgroundPrior] t_ref={t_ref:g}s, t_off={t_off:g}s, n_off_prior={n_off_prior:.6g}") return cls(n_off_prior=float(n_off_prior), t_off=float(t_off), area_ratio=float(area_ratio))
[docs] @classmethod def from_epwxt_background_default( cls, *, chan_lo: int = 53, chan_hi: int = 506, t_off: float = 1_000_000.0, verbose: bool = True, ) -> "BackgroundPrior": """ Return the ready-made BackgroundPrior values as requested by user. Ignores file IO and outputs fixed values for reproducibility of quick tests. """ if verbose: print("[BackgroundPrior] Using fixed default prior: " "n_off_prior=104016.95348743448, t_off=1000000.0, area_ratio=0.08333333333333333") return cls( n_off_prior=104016.95348743448, t_off=1_000_000.0, area_ratio=0.08333333333333333, )
[docs] @dataclass class BackgroundCountsPosterior: """ 背景区域计数后验(聚合到选择能段之后)。 基于 Gamma-Poisson 共轭:以 OFF 区域“计数率” λ_off ~ Gamma(a_total, b) (shape=a_total, rate=b),则: - OFF 区域在曝光 t 的后验预测 n_off ~ Poisson-Gamma 混合,等价于 NB(r=a_total, p=b/(b+t)); - ON 区域在曝光 t 的后验预测 n_on,bkg ~ NB(r=a_total, p=b/(b+area_ratio*t)), 其中 area_ratio=A_on/A_off。 我们通过 Gamma-Poisson 的层次采样实现采样(避免不同库对 NB 参数化差异)。 属性 ------ a_total : float 选择能段内所有能道的先验 shape 之和(更新后为后验 shape)。 b : float 先验/后验的 rate 参数(单位:秒),通常为 t_ref + sum(t_obs_off)。 area_ratio : float A_on / A_off,用于把 OFF 计数率映射到 ON 背景计数率。 """ a_total: float b: float area_ratio: float # ---- 期望值 ----
[docs] def expected_off(self, t: float) -> float: return float(self.a_total) * float(t) / float(self.b)
[docs] def expected_on(self, t: float) -> float: return float(self.a_total) * float(self.area_ratio) * float(t) / float(self.b)
# ---- 采样器(后验预测)---- def _sample_lambda_off(self, size: int = 1, rng: Optional[np.random.Generator] = None) -> np.ndarray: if rng is None: rng = np.random.default_rng() # Gamma(shape=a, rate=b) -> numpy 用 scale=1/rate;当 a 非正时退化为 0 a = float(self.a_total) if a <= 0: return np.zeros(size, dtype=float) return rng.gamma(shape=a, scale=1.0 / float(self.b), size=size)
[docs] def sample_off(self, t: float, size: int = 1, rng: Optional[np.random.Generator] = None) -> np.ndarray: """采样 OFF 区域在曝光 t 的后验预测计数。""" if rng is None: rng = np.random.default_rng() lam = self._sample_lambda_off(size=size, rng=rng) return rng.poisson(lam * float(t))
[docs] def sample_on(self, t: float, size: int = 1, rng: Optional[np.random.Generator] = None) -> np.ndarray: """采样 ON 区域(仅背景)在曝光 t 的后验预测计数。""" if rng is None: rng = np.random.default_rng() lam = self._sample_lambda_off(size=size, rng=rng) return rng.poisson(lam * float(self.area_ratio) * float(t))
# ---- 增量式共轭更新 ----
[docs] def update_with_off_counts(self, n_off: float, t_off: float, *, inplace: bool = False) -> "BackgroundCountsPosterior": """用额外的 OFF 观测(总计数与曝光)进行一次共轭更新。""" a_new = float(self.a_total) + float(n_off) b_new = float(self.b) + float(t_off) if inplace: self.a_total = a_new self.b = b_new return self return BackgroundCountsPosterior(a_total=a_new, b=b_new, area_ratio=float(self.area_ratio))
[docs] def update_with_on_bg_counts(self, n_on_bg: float, t_on_bg: float, *, inplace: bool = False) -> "BackgroundCountsPosterior": """用额外的 ON 背景-only 观测进行更新(等效 OFF 曝光为 area_ratio*t_on_bg)。""" a_new = float(self.a_total) + float(n_on_bg) b_new = float(self.b) + float(self.area_ratio) * float(t_on_bg) if inplace: self.a_total = a_new self.b = b_new return self return BackgroundCountsPosterior(a_total=a_new, b=b_new, area_ratio=float(self.area_ratio))
[docs] @dataclass class BackgroundSpectralPrior: """ 考虑能谱的背景先验:对所选能道逐道建立 Gamma 先验 λ_k ~ Gamma(a0_k, b0), 其中 b0=t_ref(FITS 中的总参考曝光),a0_k 是把“计/厘米^2”乘以 OFF 区域有效面积后、 在 t_ref 内的 OFF 区域先验计数(可理解为先验的等效观测计数)。 该类可使用 OFF 区域的观测谱做共轭更新,并聚合为 BackgroundCountsPosterior, 用于对 ON/OFF 的总计数进行后验预测采样或期望计算。 字段 ----- a0 : np.ndarray (n_chan,) 每个选择能道的先验 shape。 b0 : float 先验 rate(秒),通常为背景文件中的 EXPOSURE(如 1e6 s)。 area_ratio : float A_on / A_off。 channels : np.ndarray (n_chan,) 参与先验的 CHANNEL 索引集合(用于与观测谱对齐/筛选)。 """ a0: np.ndarray b0: float area_ratio: float channels: np.ndarray # ---------- 构造:来自 EP/WXT 背景 FITS(计/厘米^2 over t_ref) ----------
[docs] @classmethod def from_epwxt_background( cls, fits_path: str, *, chan_lo: int = 53, chan_hi: int = 506, arf_path: Optional[str] = None, arf_bin_lo: int = 81, arf_bin_hi: int = 780, pixel_size_mm: float = 0.015, pixel_fov_deg: float = 0.00229, src_radius_arcmin: float = 9.0, bkg_rin_arcmin: float = 18.0, bkg_rout_arcmin: float = 36.0, verbose: bool = True, ) -> "BackgroundSpectralPrior": """ 读取单个 EP/WXT 背景 FITS(SPECTRUM: COUNTS 为“计/厘米^2”)并按能团形成逐道先验。 若提供 ARF,则用 SPECRESP 的第 [arf_bin_lo, arf_bin_hi](1-based)能道推断能段, 并用 EBOUNDS 做能段重合筛选;否则回退到 CHANNEL 区间 [chan_lo, chan_hi]。 OFF/ON 区域面积由像元尺寸及 FoV 计算得到,area_ratio=A_on/A_off。 """ # 1) 可选:由 ARF 推断能段 band_keV: Optional[Tuple[float, float]] = None if arf_path is not None: with fits.open(arf_path) as arf: if 'SPECRESP' not in arf: raise ValueError(f"ARF file {arf_path} lacks SPECRESP extension.") hdu_sp = arf['SPECRESP'] arfd = getattr(hdu_sp, 'data', None) if arfd is None: raise ValueError(f"ARF file {arf_path} SPECRESP has no data") elo = np.asarray(arfd['ENERG_LO'], dtype=float) ehi = np.asarray(arfd['ENERG_HI'], dtype=float) i0 = max(0, int(arf_bin_lo) - 1) i1 = min(elo.size - 1, int(arf_bin_hi) - 1) if i1 < i0: raise ValueError("arf_bin_hi must be >= arf_bin_lo") band_keV = (float(np.min(elo[i0:i1+1])), float(np.max(ehi[i0:i1+1]))) if verbose: print(f"[BackSpecPrior] ARF bins {arf_bin_lo}-{arf_bin_hi} -> band ~{band_keV[0]:.3g}-{band_keV[1]:.3g} keV") # 2) 读取背景谱,形成能道筛选和逐道计数/厘米^2 with fits.open(fits_path) as hdul: if 'SPECTRUM' not in hdul: raise ValueError(f"FITS file {fits_path} lacks SPECTRUM extension.") hdu_spec = hdul['SPECTRUM'] spec = getattr(hdu_spec, 'data', None) hdr = getattr(hdu_spec, 'header', None) if spec is None or hdr is None: raise ValueError(f"FITS SPECTRUM in {fits_path} missing data/header") ch = np.asarray(spec['CHANNEL'], dtype=int) mask = (ch >= int(chan_lo)) & (ch <= int(chan_hi)) if ('EBOUNDS' in hdul) and (band_keV is not None): eb = getattr(hdul['EBOUNDS'], 'data', None) if eb is not None: elo = np.asarray(eb['E_MIN'], dtype=float) ehi = np.asarray(eb['E_MAX'], dtype=float) mask = (ehi > band_keV[0]) & (elo < band_keV[1]) channels = ch[mask] counts_cm2 = np.asarray(spec['COUNTS'], dtype=float)[mask] t_ref = float(hdr.get('EXPOSURE', 1_000_000.0)) if ('EBOUNDS' in hdul) and verbose: eb = getattr(hdul['EBOUNDS'], 'data', None) if eb is not None: elo = np.asarray(eb['E_MIN'], dtype=float) ehi = np.asarray(eb['E_MAX'], dtype=float) if band_keV is None: sel = (eb['CHANNEL'] >= int(chan_lo)) & (eb['CHANNEL'] <= int(chan_hi)) emin = float(np.min(elo[sel])) if np.any(sel) else np.nan emax = float(np.max(ehi[sel])) if np.any(sel) else np.nan print(f"[BackSpecPrior] Channel {chan_lo}-{chan_hi} ~{emin:.3g}-{emax:.3g} keV") else: print(f"[BackSpecPrior] Using ARF-derived band ~{band_keV[0]:.3g}-{band_keV[1]:.3g} keV") # 3) 几何:计算 A_on, A_off 与 area_ratio pix_size_cm = float(pixel_size_mm) * 0.1 apix_cm2 = pix_size_cm ** 2 omega_pix_deg2 = float(pixel_fov_deg) ** 2 r_on_deg = float(src_radius_arcmin) / 60.0 r_in_deg = float(bkg_rin_arcmin) / 60.0 r_out_deg = float(bkg_rout_arcmin) / 60.0 A_on_sky_deg2 = float(np.pi * (r_on_deg ** 2)) A_off_sky_deg2 = float(np.pi * (r_out_deg ** 2 - r_in_deg ** 2)) Npix_on = A_on_sky_deg2 / omega_pix_deg2 Npix_off = A_off_sky_deg2 / omega_pix_deg2 A_on_cm2 = Npix_on * apix_cm2 A_off_cm2 = Npix_off * apix_cm2 area_ratio = float(A_on_cm2 / A_off_cm2) if verbose: print(f"[BackSpecPrior] A_on_cm2={A_on_cm2:.4g}, A_off_cm2={A_off_cm2:.4g}, area_ratio~{area_ratio:.6g}") # 4) 逐道先验 a0_k:把“计/厘米^2”换算为 OFF 区域在 t_ref 内的等效计数 # Gamma 的形参可取为 a0_k = n_off_k_ref,rate=b0=t_ref n_off_k_ref = counts_cm2 * A_off_cm2 a0 = n_off_k_ref.astype(float) b0 = float(t_ref) if verbose: print(f"[BackSpecPrior] t_ref={t_ref:g}s, sum(a0)={np.sum(a0):.6g}") return cls(a0=a0, b0=b0, area_ratio=area_ratio, channels=channels)
# ---------- 更新:使用 OFF 区域观测谱(PHA) ----------
[docs] def update_with_off_spectrum( self, pha_path: str, *, chan_lo: Optional[int] = None, chan_hi: Optional[int] = None, use_ebounds: bool = True, verbose: bool = True, ) -> BackgroundCountsPosterior: """ 用 OFF 区域的观测谱(PHA,COUNTS 为整数计数)进行共轭更新,聚合为总计数后验。 - 若 use_ebounds=True,优先用 EBOUNDS 与 self.channels 对齐;否则按 CHANNEL 匹配。 - 曝光时间从 PHA 的 SPECTRUM header['EXPOSURE'] 读取。 返回 BackgroundCountsPosterior,参数 a_total=sum(a0)+sum(n_off_obs),b=b0+t_off_obs。 """ with fits.open(pha_path) as hdul: if 'SPECTRUM' not in hdul: raise ValueError(f"FITS file {pha_path} lacks SPECTRUM extension.") hdu = hdul['SPECTRUM'] data = getattr(hdu, 'data', None) hdr = getattr(hdu, 'header', None) if data is None or hdr is None: raise ValueError(f"FITS SPECTRUM in {pha_path} missing data/header") ch = np.asarray(data['CHANNEL'], dtype=int) if use_ebounds and ('EBOUNDS' in hdul): # 与 prior 使用的能道集合对齐(基于 CHANNEL 即可)。 sel = np.isin(ch, self.channels) else: lo = int(chan_lo) if chan_lo is not None else int(np.min(self.channels)) hi = int(chan_hi) if chan_hi is not None else int(np.max(self.channels)) sel = (ch >= lo) & (ch <= hi) & np.isin(ch, self.channels) n_off_obs = np.asarray(data['COUNTS'], dtype=float)[sel] t_off_obs = float(hdr.get('EXPOSURE', 0.0)) if t_off_obs <= 0: raise ValueError("Observation PHA has non-positive EXPOSURE") a_total = float(np.sum(self.a0)) + float(np.sum(n_off_obs)) b = float(self.b0 + t_off_obs) if verbose: print(f"[BackSpecPrior] OFF update: t_obs={t_off_obs:g}s, add_counts={np.sum(n_off_obs):.6g}, a_total={a_total:.6g}, b={b:.6g}") return BackgroundCountsPosterior(a_total=a_total, b=b, area_ratio=float(self.area_ratio))
[docs] def update_with_on_bg_spectrum( self, pha_path: str, *, chan_lo: Optional[int] = None, chan_hi: Optional[int] = None, use_ebounds: bool = True, verbose: bool = True, ) -> BackgroundCountsPosterior: """ 使用 ON 区域“背景-only”时段的观测谱进行更新: - shape 增加 sum(n_on_obs) - rate 增加 area_ratio * t_on_obs 返回聚合后的 BackgroundCountsPosterior。 """ with fits.open(pha_path) as hdul: if 'SPECTRUM' not in hdul: raise ValueError(f"FITS file {pha_path} lacks SPECTRUM extension.") hdu = hdul['SPECTRUM'] data = getattr(hdu, 'data', None) hdr = getattr(hdu, 'header', None) if data is None or hdr is None: raise ValueError(f"FITS SPECTRUM in {pha_path} missing data/header") ch = np.asarray(data['CHANNEL'], dtype=int) if use_ebounds and ('EBOUNDS' in hdul): sel = np.isin(ch, self.channels) else: lo = int(chan_lo) if chan_lo is not None else int(np.min(self.channels)) hi = int(chan_hi) if chan_hi is not None else int(np.max(self.channels)) sel = (ch >= lo) & (ch <= hi) & np.isin(ch, self.channels) n_on_obs = np.asarray(data['COUNTS'], dtype=float)[sel] t_on_obs = float(hdr.get('EXPOSURE', 0.0)) if t_on_obs <= 0: raise ValueError("Observation PHA has non-positive EXPOSURE") a_total = float(np.sum(self.a0)) + float(np.sum(n_on_obs)) b = float(self.b0 + self.area_ratio * t_on_obs) if verbose: print(f"[BackSpecPrior] ON-bg update: t_on={t_on_obs:g}s, add_counts={np.sum(n_on_obs):.6g}, a_total={a_total:.6g}, b={b:.6g}") return BackgroundCountsPosterior(a_total=a_total, b=b, area_ratio=float(self.area_ratio))
# ---------- 快速聚合:不更新,直接返回基于先验的后验(等价于 a_total=sum(a0), b=b0) ----------
[docs] def as_counts_posterior(self) -> BackgroundCountsPosterior: return BackgroundCountsPosterior(a_total=float(np.sum(self.a0)), b=float(self.b0), area_ratio=float(self.area_ratio))