Source code for jinwu.core.ogip

"""OGIP 标准基类与验证框架 (初版)

涵盖:
- 通用 FITS 基类 OgipFitsBase(统一 path/header/meta/headers_dump)
- 光变/事件 (OGIP-93-003) 基类 OgipTimeSeriesBase
- PHA 能谱 (OGIP-92-007 / 007a) 基类 OgipSpectrumBase
- 响应 (CAL/GEN/92-002: RMF/ARF) 基类 OgipResponseBase

提供统一的 validate() 机制:
- 头关键字必需/可选检查
- 表格列名必需/可选检查
- 结果分级:ERROR / WARN / INFO

后续可扩展:
- 更细粒度的值域校验 (如 EXPOSURE>0, CHANNEL 单调递增等)
- 与具体 OGIP 文档的章节引用

English summary
---------------
OGIP base class & validation scaffold covering time series, spectra, and response files.
Unified validate() returning structured report with severity levels.
"""
from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Any, Optional, List, Sequence

__all__ = [
    "ValidationMessage", "ValidationReport",
    "OgipFitsBase", "OgipTimeSeriesBase", "OgipSpectrumBase", "OgipResponseBase"
]

# ---------------- Validation Data Structures ----------------

@dataclass(slots=True)
class ValidationMessage:
    level: str  # 'ERROR' | 'WARN' | 'INFO'
    code: str   # short symbolic code, e.g. MISSING_KEY
    message: str

@dataclass(slots=True)
class ValidationReport:
    kind: str
    path: Path
    ok: bool
    messages: List[ValidationMessage] = field(default_factory=list)

    def add(self, level: str, code: str, message: str):
        self.messages.append(ValidationMessage(level=level, code=code, message=message))

    def errors(self) -> List[ValidationMessage]:
        return [m for m in self.messages if m.level == 'ERROR']

    def warnings(self) -> List[ValidationMessage]:
        return [m for m in self.messages if m.level == 'WARN']

# ---------------- Base FITS Class ----------------

@dataclass(slots=True)
class OgipFitsBase:
    path: Path
    header: Dict[str, Any]
    meta: Any  # OgipMeta (延迟导入避免循环)
    headers_dump: Any  # FitsHeaderDump
    _validation: Optional[ValidationReport] = field(default=None, init=False, repr=False, compare=False)

    def validate(self) -> ValidationReport:
        """通用层:仅检查路径与 header 存在性。子类会扩展。"""
        rpt = ValidationReport(kind=self.__class__.__name__, path=self.path, ok=True)
        if not self.path.exists():
            rpt.add('ERROR', 'FILE_NOT_FOUND', f"File not found: {self.path}")
        if self.header is None:
            rpt.add('ERROR', 'NO_HEADER', 'Primary header missing')
        rpt.ok = len(rpt.errors()) == 0
        self._validation = rpt
        return rpt

    def _has_key_ci(self, key: str) -> bool:
        return self.get_keyword_ci(key, default=None) is not None

    def _has_any_key_ci(self, keys: Sequence[str]) -> bool:
        return any(self._has_key_ci(k) for k in keys)

    def get_keyword_ci(self, key: str, default: Optional[Any] = None) -> Any:
        """大小写不敏感地从 header 中读取关键字(若 header 为 dict).

        返回关键字值或提供的 default。便于统一处理 FITS 关键字的大小写差异。
        """
        if self.header is None:
            return default
        # header 可能是 astropy Header(支持 __contains__ case-insensitive),
        # 但通常我们将其传为 dict;先尝试直接查找,再按大写匹配。
        try:
            if key in self.header:
                return self.header[key]
        except Exception:
            pass
        # case-insensitive match
        up = key.upper()
        for k, v in dict(self.header).items():
            try:
                if str(k).upper() == up:
                    return v
            except Exception:
                continue
        return default

    @property
    def validation(self) -> Optional[ValidationReport]:
        return self._validation

# ---------------- Specialized Base Classes ----------------

@dataclass(slots=True)
class OgipTimeSeriesBase(OgipFitsBase):
    """OGIP-93-003 风格的时间序列 (光变或事件) 基类。

    子类需提供:columns (Sequence[str]) 用于验证列名。
    """

    # 根据 OGIP-94-003 (Events) 的建议/要求,时间序列至少应包含时间系统与时间单位
    REQUIRED_KEYS = ["TELESCOP", "INSTRUME", "TIMESYS", "TIMEUNIT"]
    CRITICAL_KEYS = ["TIMESYS", "TIMEUNIT"]
    OPTIONAL_KEYS = ["OBJECT", "OBS_ID", "MJDREF", "MJDREFI", "MJDREFF", "TIMEZERO", "TREFPOS", "DATE-OBS"]
    REQUIRED_COLUMNS_ANY = [["TIME"]]  # 至少包含 TIME

    def validate(self) -> ValidationReport:
        rpt = super().validate()
        # Header keyword checks
        for k in self.REQUIRED_KEYS:
            if not self._has_key_ci(k):
                lvl = 'ERROR' if k in self.CRITICAL_KEYS else 'WARN'
                rpt.add(lvl, 'MISSING_KEY', f"Required key '{k}' not found (OGIP-93-003).")
        # Column checks
        cols = getattr(self, 'columns', ()) or ()
        colset = {c.upper() for c in cols}
        for group in self.REQUIRED_COLUMNS_ANY:
            if not any(c.upper() in colset for c in group):
                rpt.add('ERROR', 'MISSING_COLUMN', f"Missing required column group: {group}")
        # MJDREF 合并检查(MJDREFI+MJDREFF 或 MJDREF 之一应存在)
        if not (self._has_key_ci('MJDREF') or self._has_any_key_ci(['MJDREFI', 'MJDREFF'])):
            rpt.add('WARN', 'MISSING_MJDREF', 'MJDREF / (MJDREFI+MJDREFF) not found in header; absolute times may be ambiguous.')
        # TIMEUNIT/TIMESYS presence already warned above; optionally validate common values
        timeunit = None
        try:
            timeunit = self.get_keyword_ci('TIMEUNIT')
        except Exception:
            timeunit = (self.header or {}).get('TIMEUNIT')
        if timeunit is not None and str(timeunit).upper() not in ('S', 'SEC', 'SECOND', 'SECONDS'):
            # not an error, but note uncommon units
            rpt.add('INFO', 'UNUSUAL_TIMEUNIT', f"TIMEUNIT='{timeunit}'")
        # GTI: many EVENTS files include a GTI extension; subclasses or readers should populate a gti field
        # Provide an extension point: subclasses/readers can implement `extract_gti(hdul)` to fill gti.
        rpt.ok = len(rpt.errors()) == 0
        self._validation = rpt
        return rpt

    def extract_gti(self, hdul: Any) -> Optional[list]:
        """Extension point: parse GTI from an opened `HDUList` and return list of (start, stop) tuples.

        Default implementation searches for an extension named 'GTI' (case-insensitive) and, if
        present, returns a list of (START, STOP) pairs. Readers should call this to populate event
        objects' GTI field.
        """
        if hdul is None:
            return None
        for hdu in hdul:
            hdr = getattr(hdu, 'header', {})
            name = (hdr.get('EXTNAME') or '').upper() if hdr else ''
            if name == 'GTI' or getattr(hdu, 'name', '').upper() == 'GTI':
                data = getattr(hdu, 'data', None)
                if data is None:
                    return None
                cols = getattr(data, 'columns', None)
                colnames = [n.upper() for n in (cols.names if cols is not None else [])]
                start_col = None
                stop_col = None
                for n in ['START', 'TSTART']:
                    if n in colnames:
                        start_col = n
                        break
                for n in ['STOP', 'TSTOP']:
                    if n in colnames:
                        stop_col = n
                        break
                if start_col and stop_col:
                    arr_start = data[start_col]
                    arr_stop = data[stop_col]
                    try:
                        return [(float(s), float(e)) for s, e in zip(arr_start, arr_stop)]
                    except Exception:
                        return None
        return None

@dataclass(slots=True)
class OgipSpectrumBase(OgipFitsBase):
    """OGIP-92-007 / 007a PHA 能谱基类。"""

    REQUIRED_KEYS = ["TELESCOP", "INSTRUME", "CHANTYPE"]
    OPTIONAL_KEYS = ["FILTER", "EXPOSURE", "BACKSCAL", "AREASCAL", "RESPFILE", "ANCRFILE", "CORRFILE", "CORRSCAL"]
    REQUIRED_COLUMNS = ["CHANNEL"]
    OPTIONAL_RATE_COLUMNS = ["COUNTS", "RATE"]

    def validate(self) -> ValidationReport:
        rpt = super().validate()
        for k in self.REQUIRED_KEYS:
            if not self._has_key_ci(k):
                rpt.add('WARN', 'MISSING_KEY', f"Required key '{k}' not found (OGIP-92-007).")
        cols = getattr(self, 'columns', ()) or ()
        colset = {c.upper() for c in cols}
        for c in self.REQUIRED_COLUMNS:
            if c.upper() not in colset:
                rpt.add('ERROR', 'MISSING_COLUMN', f"Missing required column '{c}' for PHA.")
        if not any(c in colset for c in self.OPTIONAL_RATE_COLUMNS):
            rpt.add('ERROR', 'MISSING_COLUMN', "Missing required PHA value column: one of ['COUNTS', 'RATE']")
        # Basic sanity: exposure>0 if present
        exp_val = self.get_keyword_ci('EXPOSURE', self.get_keyword_ci('EXPTIME', None))
        if exp_val is not None:
            try:
                if float(exp_val) <= 0:
                    rpt.add('WARN', 'BAD_EXPOSURE', f"Non-positive exposure value: {exp_val}")
            except Exception:
                rpt.add('WARN', 'BAD_EXPOSURE', f"Exposure not numeric: {exp_val}")
        rpt.ok = len(rpt.errors()) == 0
        self._validation = rpt
        return rpt

@dataclass(slots=True)
class OgipResponseBase(OgipFitsBase):
    """CAL/GEN/92-002 响应 (ARF/RMF) 基类。"""

    REQUIRED_KEYS_ANY = [["TELESCOP"], ["INSTRUME"], ["DETNAM", "DETNAME"]]
    REQUIRED_COLUMNS_ARF = ["ENERG_LO", "ENERG_HI", "SPECRESP"]
    REQUIRED_COLUMNS_RMF_MIN = ["ENERG_LO", "ENERG_HI", "MATRIX"]  # 简化

    def validate(self) -> ValidationReport:
        rpt = super().validate()
        # Header presence: at least one from each group
        for group in self.REQUIRED_KEYS_ANY:
            if not self._has_any_key_ci(group):
                rpt.add('WARN', 'MISSING_KEY', f"Missing one of required keys {group} (CAL/GEN/92-002).")
        # Column checks will be done in concrete subclasses where we know type
        rpt.ok = len(rpt.errors()) == 0
        self._validation = rpt
        return rpt

# 具体数据类将继承上述基类并在自身 validate() 中补充列检查。