from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, Literal, Optional, Union, cast, overload
import numpy as np
from astropy.io import fits
from .base import ChannelBand, EnergyBand, FitsHeaderDump, HduHeader, OgipMeta, RegionArea
from .data import ArfData, EventData, LightcurveData, PhaData, RmfData
from .ogip import ValidationReport
[docs]
def band_from_arf_bins(arf_path: str | Path, bin_lo: int = 81, bin_hi: int = 780) -> EnergyBand:
with fits.open(arf_path) as h:
hd = cast(Any, h["SPECRESP"])
d = hd.data
elo = np.asarray(d["ENERG_LO"], float)
ehi = np.asarray(d["ENERG_HI"], float)
i0 = max(0, int(bin_lo) - 1)
i1 = min(ehi.size - 1, int(bin_hi) - 1)
emin = float(elo[i0])
emax = float(ehi[i1])
return EnergyBand(emin=emin, emin_unit="keV", emax=emax, emax_unit="keV")
[docs]
def channel_mask_from_ebounds(
ebounds: tuple[np.ndarray, np.ndarray, np.ndarray],
band: EnergyBand,
ch_band: Optional[ChannelBand] = None,
) -> np.ndarray:
ch, e_lo, e_hi = ebounds
mask = (e_hi > float(band.emin)) & (e_lo < float(band.emax))
if ch_band is not None:
mask &= (ch >= int(ch_band.ch_lo)) & (ch <= int(ch_band.ch_hi))
return mask
def _combine_mjdref(header: Dict[str, Any]) -> Optional[float]:
if header is None:
return None
if ("MJDREFI" in header) or ("MJDREFF" in header):
mjdi = float(header.get("MJDREFI", 0.0))
mjdf = float(header.get("MJDREFF", 0.0))
return mjdi + mjdf
if "MJDREF" in header:
try:
return float(header["MJDREF"])
except Exception:
return None
return None
def _first_non_empty(keys: list[str], *headers: Dict[str, Any]) -> Optional[Any]:
for key in keys:
for hdr in headers:
if hdr is None:
continue
if key in hdr and hdr[key] not in (None, "", " "):
return hdr[key]
return None
def _collect_headers_dump(hdul: fits.HDUList) -> FitsHeaderDump:
hdr0 = cast(Any, getattr(hdul[0], 'header', {})) if len(hdul) > 0 else {}
primary = dict(hdr0) if hdr0 else {}
exts: list[HduHeader] = []
for hdu in hdul[1:]:
hdu_any = cast(Any, hdu)
name = str(getattr(hdu, 'name', '') or '')
ver_val = cast(Any, hdu_any.header).get('EXTVER', None)
try:
ver = int(ver_val) if ver_val is not None else None
except Exception:
ver = None
exts.append(HduHeader(name=name, ver=ver, header=dict(cast(Any, hdu_any.header))))
return FitsHeaderDump(primary=primary, extensions=exts)
def _build_meta(hdul: fits.HDUList, prefer_header: Optional[Dict[str, Any]]) -> OgipMeta:
dump = _collect_headers_dump(hdul)
primary = dump.primary
other_ext_headers = [x.header for x in dump.extensions]
telescop = _first_non_empty(["TELESCOP"], *(hdr for hdr in [prefer_header, primary] + other_ext_headers if hdr))
instrume = _first_non_empty(["INSTRUME"], *(hdr for hdr in [prefer_header, primary] + other_ext_headers if hdr))
detnam = _first_non_empty(["DETNAM", "DETNAME"], *(hdr for hdr in [prefer_header, primary] + other_ext_headers if hdr))
timesys = _first_non_empty(["TIMESYS"], *(hdr for hdr in [prefer_header, primary] + other_ext_headers if hdr))
timeunit = _first_non_empty(["TIMEUNIT"], *(hdr for hdr in [prefer_header, primary] + other_ext_headers if hdr))
mjdref = None
for hdr in [prefer_header, primary] + other_ext_headers:
if hdr is None:
continue
mjdref = _combine_mjdref(hdr)
if mjdref is not None:
break
tstart = None
tstop = None
for hdr in [prefer_header, primary] + other_ext_headers:
if hdr is None:
continue
if (tstart is None) and ("TSTART" in hdr):
try:
tstart = float(hdr["TSTART"])
except Exception:
pass
if (tstop is None) and ("TSTOP" in hdr):
try:
tstop = float(hdr["TSTOP"])
except Exception:
pass
if (tstart is not None) and (tstop is not None):
break
obj = _first_non_empty(["OBJECT"], *(hdr for hdr in [prefer_header, primary] + other_ext_headers if hdr))
obs_id = _first_non_empty(["OBS_ID", "OBS_ID"], *(hdr for hdr in [prefer_header, primary] + other_ext_headers if hdr))
binsize_val = _first_non_empty(
["BIN_SIZE", "BINSIZE", "TIMEDEL", "DELTAT", "TBIN", "TIMEBIN"],
*(hdr for hdr in [prefer_header, primary] + other_ext_headers if hdr),
)
try:
binsize = float(binsize_val) if binsize_val is not None else None
except Exception:
binsize = None
timezero_val = _first_non_empty(["TIMEZERO"], *(hdr for hdr in [prefer_header, primary] + other_ext_headers if hdr))
try:
timezero = float(timezero_val) if timezero_val is not None else None
except Exception:
timezero = None
trefpos_val = _first_non_empty(["TREFPOS", "TREFDIR"], *(hdr for hdr in [prefer_header, primary] + other_ext_headers if hdr))
try:
trefpos = str(trefpos_val) if trefpos_val is not None else None
except Exception:
trefpos = None
dateobs_val = _first_non_empty(["DATE-OBS", "DATE_OBS"], *(hdr for hdr in [prefer_header, primary] + other_ext_headers if hdr))
try:
dateobs = str(dateobs_val) if dateobs_val is not None else None
except Exception:
dateobs = None
return OgipMeta(
telescop=str(telescop) if telescop is not None else None,
instrume=str(instrume) if instrume is not None else None,
detnam=str(detnam) if detnam is not None else None,
timesys=str(timesys) if timesys is not None else None,
timeunit=str(timeunit) if timeunit is not None else None,
mjdref=float(mjdref) if mjdref is not None else None,
tstart=float(tstart) if tstart is not None else None,
tstop=float(tstop) if tstop is not None else None,
object=str(obj) if obj is not None else None,
obs_id=str(obs_id) if obs_id is not None else None,
binsize=binsize,
timezero=timezero,
trefpos=trefpos,
dateobs=dateobs,
)
def _mission_timezero_object(telescop: Optional[str], timezero: float, *, allow_unix_fallback: bool = False):
if telescop is None:
return None
try:
from .time import Time
format_map = {
'FERMI': 'fermi',
'EP': 'ep',
'LEIA': 'leia',
'GECAM': 'gecam',
'HXMT': 'hxmt',
'SWIFT': 'swift',
'GRID': 'grid',
'MAXI': 'maxi',
'SUZAKU': 'suzaku',
'XMM': 'newton',
'NEWTON': 'newton',
'XRISM': 'xrism',
}
met_format = None
for key, fmt in format_map.items():
if key in telescop:
met_format = fmt
break
if met_format is not None:
return Time(timezero, format=met_format)
if allow_unix_fallback:
try:
return Time(timezero, format='unix', scale='utc')
except Exception:
return None
raise RuntimeError("未知望远镜类型,请将header中的相关关键字发送给作者以方便添加" + telescop)
except ImportError:
return None
def _extract_gti(hdul: fits.HDUList) -> Optional[list[tuple[float, float]]]:
if hdul is None:
return None
for hdu in hdul:
hdr = getattr(hdu, 'header', {})
name = (hdr.get('EXTNAME') or '').upper() if hdr else ''
if name == 'GTI' or getattr(hdu, 'name', '').upper() == 'GTI':
data = getattr(hdu, 'data', None)
if data is None:
continue
cols = getattr(data, 'columns', None)
colnames = [n.upper() for n in (cols.names if cols is not None else [])]
start_col = None
stop_col = None
for n in ['START', 'TSTART']:
if n in colnames:
start_col = n
break
for n in ['STOP', 'TSTOP']:
if n in colnames:
stop_col = n
break
if start_col and stop_col:
arr_start = data[start_col]
arr_stop = data[stop_col]
try:
return [(float(s), float(e)) for s, e in zip(arr_start, arr_stop)]
except (TypeError, ValueError):
return None
return None
def _load_regions(hdul: fits.HDUList) -> Optional[RegionArea]:
reg_hdu = None
for ext in hdul:
name = (getattr(ext, 'name', '') or '').upper()
if name == 'REG00101':
reg_hdu = ext
break
if reg_hdu is None or getattr(reg_hdu, 'data', None) is None:
return None
reg_any = cast(Any, reg_hdu)
if getattr(reg_any, 'data', None) is None:
return None
data = reg_any.data
cols = getattr(data, 'columns', None)
colnames = [str(n).upper() for n in (cols.names if cols is not None else [])]
def _get_col(name_variants: list[str]) -> Optional[str]:
for nn in name_variants:
if nn in colnames:
return nn
return None
shape_col = _get_col(['SHAPE'])
component_col = _get_col(['COMPONENT'])
r_col = _get_col(['R', 'RADIUS', 'R0'])
rin_col = _get_col(['R_IN', 'RIN', 'R1'])
rout_col = _get_col(['R_OUT', 'ROUT', 'R2'])
nrows = len(data)
rows_info: list[Dict[str, Any]] = []
def _as_float(v: Any) -> Optional[float]:
if v is None:
return None
try:
arr = np.asarray(v)
if arr.size == 0:
return None
return float(arr.reshape(-1)[0])
except Exception:
return None
for i in range(nrows):
shape_val = ''
if shape_col and shape_col in colnames:
try:
shape_val = str(data[shape_col][i]).upper().strip()
except Exception:
shape_val = ''
comp_val = None
if component_col and component_col in colnames:
tmp = _as_float(data[component_col][i])
comp_val = int(tmp) if tmp is not None else None
area = None
if (shape_val == 'CIRCLE') or (shape_col is None and r_col is not None and (rin_col is None or rout_col is None)):
rv = data[r_col][i] if r_col else None
r = _as_float(rv) if rv is not None else None
if r is not None and r > 0:
area = np.pi * (r ** 2)
elif shape_val == 'ANNULUS' or (rin_col is not None and rout_col is not None):
rin = _as_float(data[rin_col][i]) if rin_col else None
rout = _as_float(data[rout_col][i]) if rout_col else None
if rin is None and rout is None and r_col:
rv = data[r_col][i]
try:
rv_arr = np.asarray(rv).reshape(-1)
rin = _as_float(rv_arr[0]) if rv_arr.size >= 1 else None
rout = _as_float(rv_arr[1]) if rv_arr.size >= 2 else None
except Exception:
rin = None
rout = None
if (rin is not None) and (rout is not None) and rout > rin:
area = np.pi * (rout ** 2 - rin ** 2)
rows_info.append({'shape': shape_val, 'area': area, 'component': comp_val, 'role': 'unknown'})
if not rows_info:
return None
annuli = [r for r in rows_info if r['shape'] == 'ANNULUS' and r['area']]
circles = [r for r in rows_info if r['shape'] == 'CIRCLE' and r['area']]
if component_col and any(r['component'] is not None for r in rows_info):
for r in rows_info:
comp = r['component']
if comp == 1:
r['role'] = 'source'
elif comp is not None:
r['role'] = 'background'
if not component_col:
if annuli:
for r in annuli:
if r['role'] == 'unknown':
r['role'] = 'background'
if circles:
for r in circles:
if r['role'] == 'unknown':
r['role'] = 'source'
elif len(circles) >= 1:
sorted_c = sorted(circles, key=lambda x: x['area'] or 0)
if sorted_c:
if sorted_c[0]['role'] == 'unknown':
sorted_c[0]['role'] = 'source'
for r in sorted_c[1:]:
if r['role'] == 'unknown':
r['role'] = 'background'
if len(rows_info) == 1 and rows_info[0]['role'] == 'unknown':
rows_info[0]['role'] = 'source'
def _normalize_role(role: str) -> Literal['src', 'bkg', 'unk']:
if role == 'source':
return 'src'
if role == 'background':
return 'bkg'
return 'unk'
regions = [
RegionArea(role=_normalize_role(cast(Any, r['role'])), shape=r['shape'] or None, area=r['area'], component=r['component'])
for r in rows_info
]
if not regions:
return None
src_region = next((r for r in regions if r.role == 'src'), None)
if src_region is not None:
return src_region
bkg_region = next((r for r in regions if r.role == 'bkg'), None)
if bkg_region is not None:
return bkg_region
return regions[0]
[docs]
class OgipArfReader:
def __init__(self, path: str | Path):
self.path = Path(path)
if not self.path.exists():
raise FileNotFoundError(str(self.path))
self._data: Optional[ArfData] = None
[docs]
def read(self) -> ArfData:
with fits.open(self.path) as h:
hd = cast(Any, h["SPECRESP"])
d = hd.data
columns = tuple(getattr(d.columns, 'names', ()) or ())
energ_lo = np.asarray(d["ENERG_LO"], float)
energ_hi = np.asarray(d["ENERG_HI"], float)
specresp = np.asarray(d["SPECRESP"], float)
header = dict(cast(Any, hd.header))
headers_dump = _collect_headers_dump(h)
meta = _build_meta(h, header)
self._data = ArfData(
path=self.path,
energ_lo=energ_lo,
energ_hi=energ_hi,
specresp=specresp,
columns=columns,
header=header,
meta=meta,
headers_dump=headers_dump,
)
return self._data
[docs]
def validate(self) -> ValidationReport:
if self._data is None:
self.read()
assert self._data is not None
return self._data.validate()
[docs]
class OgipRmfReader:
def __init__(self, path: str | Path):
self.path = Path(path)
if not self.path.exists():
raise FileNotFoundError(str(self.path))
self._data: Optional[RmfData] = None
[docs]
def read(self) -> RmfData:
with fits.open(self.path) as h:
hm = cast(Any, h["MATRIX"])
dm = hm.data
matrix_columns = tuple(getattr(dm.columns, 'names', ()) or ())
energ_lo = np.asarray(dm["ENERG_LO"], float)
energ_hi = np.asarray(dm["ENERG_HI"], float)
header = dict(cast(Any, hm.header))
headers_dump = _collect_headers_dump(h)
meta = _build_meta(h, header)
matrix = np.asarray(dm["MATRIX"], dtype=object)
n_grp = np.asarray(dm["N_GRP"]) if "N_GRP" in matrix_columns else None
f_chan = np.asarray(dm["F_CHAN"]) if "F_CHAN" in matrix_columns else None
n_chan = np.asarray(dm["N_CHAN"]) if "N_CHAN" in matrix_columns else None
channel = None
e_min = None
e_max = None
ebounds_columns: tuple[str, ...] = ()
if "EBOUNDS" in h:
de = cast(Any, h["EBOUNDS"]).data
ebounds_columns = tuple(getattr(de.columns, 'names', ()) or ())
channel = np.asarray(de["CHANNEL"], int)
e_min = np.asarray(de["E_MIN"], float)
e_max = np.asarray(de["E_MAX"], float)
columns = matrix_columns + tuple(name for name in ebounds_columns if name not in matrix_columns)
self._data = RmfData(
path=self.path,
energ_lo=energ_lo,
energ_hi=energ_hi,
n_grp=n_grp,
f_chan=f_chan,
n_chan=n_chan,
matrix=matrix,
channel=channel,
e_min=e_min,
e_max=e_max,
columns=columns,
header=header,
meta=meta,
headers_dump=headers_dump,
)
return self._data
[docs]
def validate(self) -> ValidationReport:
if self._data is None:
self.read()
assert self._data is not None
return self._data.validate()
[docs]
class OgipPhaReader:
def __init__(self, path: str | Path):
self.path = Path(path)
if not self.path.exists():
raise FileNotFoundError(str(self.path))
self._data: Optional[PhaData] = None
[docs]
def read(self) -> PhaData:
with fits.open(self.path) as h:
hs = cast(Any, h["SPECTRUM"])
ds = hs.data
spectrum_columns = tuple(getattr(ds.columns, 'names', ()) or ())
channels = np.asarray(ds["CHANNEL"], int)
rate = np.asarray(ds["RATE"], float) if "RATE" in spectrum_columns else None
counts = np.asarray(ds["COUNTS"], float) if "COUNTS" in spectrum_columns else None
stat_err = np.asarray(ds["STAT_ERR"], float) if "STAT_ERR" in spectrum_columns else None
header_map = cast(Any, hs.header)
exposure = float(header_map.get("EXPOSURE", header_map.get("EXPTIME", np.nan)))
if counts is None:
if rate is None:
raise ValueError("PHA SPECTRUM lacks both COUNTS and RATE columns")
if np.isfinite(exposure) and exposure > 0:
counts = np.asarray(rate, float) * float(exposure)
else:
counts = np.asarray(rate, float)
backscal = float(header_map.get("BACKSCAL")) if "BACKSCAL" in header_map else None
areascal = float(header_map.get("AREASCAL")) if "AREASCAL" in header_map else None
quality = np.asarray(ds["QUALITY"], int) if "QUALITY" in spectrum_columns else None
grouping = np.asarray(ds["GROUPING"], int) if "GROUPING" in spectrum_columns else None
raw_spectrum_columns: Dict[str, np.ndarray] = {}
for cn in spectrum_columns:
try:
raw_spectrum_columns[cn] = np.asarray(ds[cn])
except Exception:
continue
ebounds = None
ebounds_columns: tuple[str, ...] = ()
if "EBOUNDS" in h:
de = cast(Any, h["EBOUNDS"]).data
ebounds_columns = tuple(getattr(de.columns, 'names', ()) or ())
ebounds = (
np.asarray(de["CHANNEL"], int),
np.asarray(de["E_MIN"], float),
np.asarray(de["E_MAX"], float),
)
header = dict(header_map)
headers_dump = _collect_headers_dump(h)
meta = _build_meta(h, header)
respfile = None
ancrfile = None
try:
if 'RESPFILE' in header and header.get('RESPFILE') not in (None, '', ' '):
respfile = str(header.get('RESPFILE'))
except Exception:
respfile = None
try:
if 'ANCRFILE' in header and header.get('ANCRFILE') not in (None, '', ' '):
ancrfile = str(header.get('ANCRFILE'))
except Exception:
ancrfile = None
columns = spectrum_columns + tuple(name for name in ebounds_columns if name not in spectrum_columns)
self._data = PhaData(
path=self.path,
channels=channels,
counts=counts,
rate=rate,
stat_err=stat_err,
exposure=exposure,
backscal=backscal,
areascal=areascal,
respfile=respfile,
ancrfile=ancrfile,
quality=quality,
grouping=grouping,
ebounds=ebounds,
raw_spectrum_columns=raw_spectrum_columns,
columns=columns,
header=header,
meta=meta,
headers_dump=headers_dump,
)
return self._data
[docs]
def validate(self) -> ValidationReport:
if self._data is None:
self.read()
assert self._data is not None
return self._data.validate()
[docs]
def select_by_band(
self,
band: EnergyBand,
rmf_chan_band: Optional[ChannelBand] = ChannelBand(51, 399),
) -> tuple[np.ndarray, np.ndarray]:
if self._data is None:
_ = self.read()
d = self._data
assert d is not None
if d.ebounds is not None:
mask = channel_mask_from_ebounds(d.ebounds, band, rmf_chan_band)
ch_all = d.ebounds[0]
idx_map = {int(c): i for i, c in enumerate(d.channels)}
sel_channels = ch_all[mask]
sel_idx = np.array([idx_map[c] for c in sel_channels if c in idx_map], dtype=int)
return d.channels[sel_idx], d.counts[sel_idx]
if rmf_chan_band is None:
return d.channels, d.counts
mask = (d.channels >= rmf_chan_band.ch_lo) & (d.channels <= rmf_chan_band.ch_hi)
return d.channels[mask], d.counts[mask]
[docs]
class OgipLightcurveReader:
def __init__(self, path: str | Path):
self.path = Path(path)
if not self.path.exists():
raise FileNotFoundError(str(self.path))
self._data: Optional[LightcurveData] = None
[docs]
def read(self) -> LightcurveData:
with fits.open(self.path) as h:
hdu = None
for ext in h:
ext_any = cast(Any, ext)
if getattr(ext_any, 'data', None) is None:
continue
names = getattr(ext_any.data, 'columns', None)
if names is not None and ('TIME' in [n.upper() for n in names.names]):
hdu = ext_any
break
if hdu is None:
raise ValueError('No suitable lightcurve HDU with TIME column found')
d = cast(Any, hdu.data)
col_names_upper = [n.upper() for n in d.columns.names]
header = dict(cast(Any, hdu.header))
time_raw = np.asarray(d['TIME'], dtype=float)
timezero_raw = 0.0
if 'TIMEZERO' in header:
try:
timezero_raw = float(header['TIMEZERO'])
except (ValueError, TypeError):
timezero_raw = 0.0
time_offset = float(time_raw[0]) if len(time_raw) > 0 else 0.0
time = time_raw - time_offset
time_rel = time
timezero = timezero_raw + time_offset
telescop = None
primary_header = dict(cast(Any, h[0]).header) if len(h) > 0 else {}
for hdr in [header, primary_header]:
if 'TELESCOP' in hdr:
telescop = str(hdr['TELESCOP']).strip().upper()
break
timezero_obj = _mission_timezero_object(telescop, timezero, allow_unix_fallback=False)
dt = None
if 'TIMEDEL' in header:
try:
dt = float(header['TIMEDEL'])
except (ValueError, TypeError):
pass
if dt is None and time.size >= 2:
dt = float(np.median(np.diff(time)))
if dt is None:
raise ValueError('binsize未被正确加载')
bin_lo = time.copy()
bin_hi = time + dt
rate = None
counts = None
is_rate = False
value = None
if 'RATE' in col_names_upper:
rate = np.asarray(d['RATE'], dtype=float)
value = rate
is_rate = True
if 'COUNTS' not in col_names_upper and dt > 0:
counts = rate * dt
if 'COUNTS' in col_names_upper:
counts = np.asarray(d['COUNTS'], dtype=float)
if value is None:
value = counts
is_rate = False
if rate is None and dt > 0:
rate = counts / dt
if value is None:
raise ValueError('Lightcurve HDU lacks RATE/COUNTS column')
error = None
rate_err = None
counts_err = None
if 'ERROR' in col_names_upper:
error = np.asarray(d['ERROR'], dtype=float)
if is_rate:
rate_err = error
if counts is not None and dt > 0:
counts_err = error * dt
else:
counts_err = error
if rate is not None and dt > 0:
rate_err = error / dt
fracexp = np.asarray(d['FRACEXP'], dtype=float) if 'FRACEXP' in col_names_upper else None
quality = np.asarray(d['QUALITY'], dtype=int) if 'QUALITY' in col_names_upper else None
backscal_col = d['BACKSCAL'] if 'BACKSCAL' in col_names_upper else (d['BACK_SCAL'] if 'BACK_SCAL' in col_names_upper else None)
areascal_col = d['AREASCAL'] if 'AREASCAL' in col_names_upper else (d['AREA_SCAL'] if 'AREA_SCAL' in col_names_upper else None)
gti_start = None
gti_stop = None
try:
gti_list = _extract_gti(h)
if gti_list is not None:
gti_start = np.array([s for s, _ in gti_list], dtype=float)
gti_stop = np.array([e for _, e in gti_list], dtype=float)
except Exception:
pass
tstart = timezero
if len(time) > 0:
tstop = timezero + time[-1] + dt
tseg = float(tstop - tstart)
else:
tseg = None
headers_dump = _collect_headers_dump(h)
meta = _build_meta(h, header)
if timezero_obj is None:
raise ValueError('无法构建 timezero_obj,请检查 TELESCOP/MJDREF/时间字段')
exposure = None
if 'EXPOSURE' in header:
try:
exposure = float(header['EXPOSURE'])
except (ValueError, TypeError):
pass
if exposure is None and 'EXPTIME' in header:
try:
exposure = float(header['EXPTIME'])
except (ValueError, TypeError):
pass
bin_exposure = None
bin_width = None
if (bin_lo is not None) and (bin_hi is not None) and (len(bin_lo) == len(bin_hi)):
try:
bin_width = np.asarray(bin_hi, dtype=float) - np.asarray(bin_lo, dtype=float)
except Exception:
bin_width = None
if bin_width is None and len(time) > 0:
dt_arr = np.asarray(dt, dtype=float) if dt is not None else np.asarray([], dtype=float)
if dt_arr.ndim == 0 and dt_arr.size != 0 and np.isfinite(float(dt_arr)) and float(dt_arr) > 0:
bin_width = np.full(len(time), float(dt_arr), dtype=float)
elif dt_arr.ndim == 1 and dt_arr.size == len(time):
bin_width = dt_arr
if fracexp is not None and len(time) > 0:
fracexp_arr = np.asarray(fracexp, dtype=float)
if fracexp_arr.shape == (len(time),):
fracexp_arr = np.where(np.isfinite(fracexp_arr), fracexp_arr, 1.0)
fracexp_arr = np.clip(fracexp_arr, 0.0, 1.0)
if bin_width is not None:
bin_exposure = fracexp_arr * np.asarray(bin_width, dtype=float)
if bin_exposure is None and bin_width is not None:
bin_exposure = np.asarray(bin_width, dtype=float)
if bin_exposure is None and exposure is not None and len(time) > 0:
bin_exposure = np.full(len(time), exposure / len(time), dtype=float)
err_dist = 'poisson' if counts is not None else ('gauss' if rate is not None else None)
try:
region = _load_regions(h)
except Exception:
region = None
timesys = str(header['TIMESYS']) if 'TIMESYS' in header else (meta.timesys if meta and meta.timesys else None)
mjdref = meta.mjdref if meta and meta.mjdref else None
self._data = LightcurveData(
path=self.path,
time=time,
time_raw=time_raw,
time_rel=time_rel,
timezero=timezero,
timezero_obj=timezero_obj,
dt=dt,
bin_lo=bin_lo,
bin_hi=bin_hi,
tstart=tstart,
tseg=tseg,
value=value,
error=error,
is_rate=is_rate,
counts=counts,
rate=rate,
counts_err=counts_err,
rate_err=rate_err,
err_dist=err_dist,
gti_start=gti_start,
gti_stop=gti_stop,
quality=quality,
fracexp=fracexp,
exposure=exposure,
bin_exposure=bin_exposure,
backscal=backscal_col,
areascal=areascal_col,
telescop=telescop,
timesys=timesys,
mjdref=mjdref,
region=region,
header=header,
meta=meta,
headers_dump=headers_dump,
columns=tuple(d.columns.names),
)
return self._data
[docs]
def validate(self) -> ValidationReport:
if self._data is None:
self.read()
assert self._data is not None
return self._data.validate()
[docs]
class OgipEventReader:
def __init__(self, path: str | Path):
self.path = Path(path)
if not self.path.exists():
raise FileNotFoundError(str(self.path))
self._data: Optional[EventData] = None
[docs]
def read(self) -> EventData:
with fits.open(self.path) as h:
hevt = None
for ext in h:
ext_any = cast(Any, ext)
if getattr(ext_any, 'data', None) is None:
continue
names = getattr(ext_any.data, 'columns', None)
if names is not None and ('TIME' in names.names):
hevt = ext_any
break
if hevt is None:
raise ValueError('No EVENTS-like HDU with TIME column found')
de = cast(Any, hevt.data)
colnames = list(getattr(de, 'columns').names) if getattr(de, 'columns', None) is not None else []
raw_columns: Dict[str, np.ndarray] = {}
for cn in colnames:
try:
raw_columns[cn] = np.asarray(de[cn])
except Exception:
try:
raw_columns[cn] = np.asarray([r[cn] for r in de])
except Exception:
raw_columns[cn] = np.asarray([])
time_raw = np.asarray(raw_columns.get('TIME') if 'TIME' in raw_columns else de['TIME'], float)
header = dict(cast(Any, hevt.header))
timezero_raw = 0.0
if 'TIMEZERO' in header:
try:
timezero_raw = float(header['TIMEZERO'])
except (ValueError, TypeError):
timezero_raw = 0.0
time_offset = float(time_raw[0]) if len(time_raw) > 0 else 0.0
time = time_raw - time_offset
time_rel = time
timezero = timezero_raw + time_offset
telescop = None
primary_header = dict(cast(Any, h[0]).header) if len(h) > 0 else {}
for hdr in [header, primary_header]:
if 'TELESCOP' in hdr:
telescop = str(hdr['TELESCOP']).strip().upper()
break
timezero_obj = _mission_timezero_object(telescop, timezero, allow_unix_fallback=True) if telescop is not None and timezero != 0.0 else None
pi = np.asarray(raw_columns['PI'], int) if 'PI' in raw_columns else None
channel = np.asarray(raw_columns['CHANNEL'], int) if 'CHANNEL' in raw_columns else None
headers_dump = _collect_headers_dump(h)
meta = _build_meta(h, header)
ebounds = None
try:
if 'EBOUNDS' in h:
de_eb = cast(Any, h['EBOUNDS']).data
ebounds = (
np.asarray(de_eb['CHANNEL'], int),
np.asarray(de_eb['E_MIN'], float),
np.asarray(de_eb['E_MAX'], float),
)
except Exception:
ebounds = None
gti_start = None
gti_stop = None
gti_start_obj = None
gti_stop_obj = None
gti_list = None
try:
for hdu in h:
if getattr(hdu, 'name', '').upper() == 'GTI':
gti_data = getattr(hdu, 'data', None)
if gti_data is not None and 'START' in gti_data.columns.names and 'STOP' in gti_data.columns.names:
gti_start_raw = np.asarray(gti_data['START'], float)
gti_stop_raw = np.asarray(gti_data['STOP'], float)
gti_start = gti_start_raw - time_offset
gti_stop = gti_stop_raw - time_offset
gti_list = [(float(s), float(e)) for s, e in zip(gti_start, gti_stop)]
if timezero_obj is not None:
try:
from astropy.time import TimeDelta
gti_start_obj = timezero_obj + TimeDelta(gti_start, format='sec')
gti_stop_obj = timezero_obj + TimeDelta(gti_stop, format='sec')
except Exception:
gti_start_obj = gti_stop_obj = None
break
except Exception:
gti_start = gti_stop = None
gti_start_obj = gti_stop_obj = None
gti_list = None
u2orig = {cn.upper(): cn for cn in colnames}
def _find(*cands: str) -> Optional[str]:
for c in cands:
if c is None:
continue
uc = c.upper()
if uc in u2orig:
return u2orig[uc]
return None
colmap: Dict[str, Optional[str]] = {}
colmap['x'] = _find('X', 'XRAW', 'RAWX', 'DETX', 'DET_X', 'SKX', 'XDET')
colmap['y'] = _find('Y', 'YRAW', 'RAWY', 'DETY', 'DET_Y', 'SKY', 'YDET')
colmap['ra'] = _find('RA', 'RA_OBJ', 'RAX', 'RA_DEG')
colmap['dec'] = _find('DEC', 'DEC_OBJ', 'DECX', 'DEC_DEG')
colmap['energy'] = _find('ENERGY', 'E', 'ENERG', 'PHOTON_ENERGY')
colmap['pha'] = _find('PHA', 'PI')
key_x = colmap.get('x')
key_y = colmap.get('y')
key_energy = colmap.get('energy')
xarr = np.asarray(raw_columns[key_x]) if (key_x is not None and key_x in raw_columns) else None
yarr = np.asarray(raw_columns[key_y]) if (key_y is not None and key_y in raw_columns) else None
energy = np.asarray(raw_columns[key_energy]) if (key_energy is not None and key_energy in raw_columns) else None
self._data = EventData(
path=self.path,
time=time,
time_raw=time_raw,
time_rel=time_rel,
timezero=timezero,
timezero_obj=timezero_obj,
telescop=telescop,
pi=pi,
channel=channel,
x=xarr,
y=yarr,
gti_start=gti_start,
gti_stop=gti_stop,
gti_start_obj=gti_start_obj,
gti_stop_obj=gti_stop_obj,
gti=gti_list,
raw_columns=raw_columns,
colmap=colmap,
energy=energy,
ebounds=ebounds,
header=header,
meta=meta,
headers_dump=headers_dump,
columns=tuple(colnames),
)
return self._data
[docs]
def validate(self) -> ValidationReport:
if self._data is None:
self.read()
assert self._data is not None
return self._data.validate()
[docs]
class ArfReader(OgipArfReader):
pass
[docs]
class RmfReader(OgipRmfReader):
pass
[docs]
class RspReader(OgipRmfReader):
pass
[docs]
class LightcurveReader(OgipLightcurveReader):
pass
OgipData = Union[ArfData, RmfData, PhaData, LightcurveData, EventData]
OgipWritableData = Union[ArfData, RmfData, PhaData, LightcurveData, EventData]
def _normalize_grouping_to_flags(grouping: np.ndarray) -> np.ndarray:
g = np.asarray(grouping, dtype=int)
if g.size == 0:
return g
nz = g[g != 0]
if nz.size == 0:
return g
if np.all(np.isin(nz, [-1, 1])):
return g
out = np.zeros_like(g)
prev_gid = None
for i, gid in enumerate(g):
if gid <= 0:
continue
if prev_gid != int(gid):
out[i] = 1
prev_gid = int(gid)
else:
out[i] = -1
return out
[docs]
class PhaWriter:
def __init__(self, data: PhaData, outpath: str | Path):
self.data = data
self.outpath = Path(outpath)
[docs]
def write(self, *, overwrite: bool = False) -> Path:
outp = self.outpath
if outp.exists() and not overwrite:
raise FileExistsError(str(outp))
pha = self.data
cols: list[fits.Column] = []
channels = np.asarray(pha.channels, dtype=int)
counts = np.asarray(pha.counts, dtype=float)
cols.append(fits.Column(name='CHANNEL', format='J', array=channels))
cols.append(fits.Column(name='COUNTS', format='E', array=counts))
if getattr(pha, 'rate', None) is not None:
cols.append(fits.Column(name='RATE', format='E', array=np.asarray(pha.rate, dtype=float)))
if pha.stat_err is not None:
cols.append(fits.Column(name='STAT_ERR', format='E', array=np.asarray(pha.stat_err, dtype=float)))
if pha.quality is not None:
cols.append(fits.Column(name='QUALITY', format='J', array=np.asarray(pha.quality, dtype=int)))
if pha.grouping is not None:
g = _normalize_grouping_to_flags(np.asarray(pha.grouping, dtype=int))
cols.append(fits.Column(name='GROUPING', format='J', array=g))
hdu_spec = fits.BinTableHDU.from_columns(cols, name='SPECTRUM')
hdr = hdu_spec.header
hdr['EXTNAME'] = 'SPECTRUM'
hdr['HDUCLASS'] = 'OGIP'
hdr['HDUCLAS1'] = 'SPECTRUM'
hdr['CHANTYPE'] = str(getattr(pha.header, 'get', lambda *_: None)('CHANTYPE') or 'PI') if pha.header is not None else 'PI'
if pha.meta is not None and getattr(pha.meta, 'telescop', None) is not None:
hdr['TELESCOP'] = str(pha.meta.telescop)
elif pha.header is not None and 'TELESCOP' in pha.header:
hdr['TELESCOP'] = str(pha.header['TELESCOP'])
if pha.meta is not None and getattr(pha.meta, 'instrume', None) is not None:
hdr['INSTRUME'] = str(pha.meta.instrume)
elif pha.header is not None and 'INSTRUME' in pha.header:
hdr['INSTRUME'] = str(pha.header['INSTRUME'])
if pha.exposure is not None:
hdr['EXPOSURE'] = float(pha.exposure)
if pha.backscal is not None:
hdr['BACKSCAL'] = float(pha.backscal)
if pha.areascal is not None:
hdr['AREASCAL'] = float(pha.areascal)
if getattr(pha, 'respfile', None) is not None:
hdr['RESPFILE'] = str(pha.respfile)
if getattr(pha, 'ancrfile', None) is not None:
hdr['ANCRFILE'] = str(pha.ancrfile)
if pha.header is not None:
for k, v in dict(pha.header).items():
key = str(k).upper()
if key in hdr:
continue
if key in {'SIMPLE', 'BITPIX', 'NAXIS', 'EXTEND'}:
continue
try:
hdr[key] = v
except Exception:
continue
prih = fits.PrimaryHDU()
if pha.header is not None:
for key in ('TELESCOP', 'INSTRUME', 'OBS_ID', 'OBJECT'):
if key in pha.header:
try:
prih.header[key] = pha.header[key]
except Exception:
pass
if pha.meta is not None:
if getattr(pha.meta, 'tstart', None) is not None:
prih.header['TSTART'] = float(pha.meta.tstart)
if getattr(pha.meta, 'tstop', None) is not None:
prih.header['TSTOP'] = float(pha.meta.tstop)
hdul = fits.HDUList([prih, hdu_spec])
if getattr(pha, 'ebounds', None) is not None:
ebounds = pha.ebounds
if ebounds is not None and len(ebounds) == 3:
ch_eb, emin, emax = ebounds
cols_eb = [
fits.Column(name='CHANNEL', format='J', array=np.asarray(ch_eb, dtype=int)),
fits.Column(name='E_MIN', format='E', array=np.asarray(emin, dtype=float)),
fits.Column(name='E_MAX', format='E', array=np.asarray(emax, dtype=float)),
]
hdul.append(fits.BinTableHDU.from_columns(cols_eb, name='EBOUNDS'))
hdul.writeto(outp, overwrite=overwrite)
return outp
[docs]
class ArfWriter:
def __init__(self, data: ArfData, outpath: str | Path):
self.data = data
self.outpath = Path(outpath)
[docs]
def write(self, *, overwrite: bool = False) -> Path:
outp = self.outpath
if outp.exists() and not overwrite:
raise FileExistsError(str(outp))
arf = self.data
cols = [
fits.Column(name='ENERG_LO', format='E', array=np.asarray(arf.energ_lo, dtype=float)),
fits.Column(name='ENERG_HI', format='E', array=np.asarray(arf.energ_hi, dtype=float)),
fits.Column(name='SPECRESP', format='E', array=np.asarray(arf.specresp, dtype=float)),
]
hdu = fits.BinTableHDU.from_columns(cols, name='SPECRESP')
hdu.header['EXTNAME'] = 'SPECRESP'
hdu.header['HDUCLASS'] = 'OGIP'
hdu.header['HDUCLAS1'] = 'RESPONSE'
hdu.header['HDUCLAS2'] = 'SPECRESP'
if arf.header is not None:
for key in ('TELESCOP', 'INSTRUME', 'DETNAM', 'FILTER'):
if key in arf.header:
try:
hdu.header[key] = arf.header[key]
except Exception:
pass
prih = fits.PrimaryHDU()
hdul = fits.HDUList([prih, hdu])
hdul.writeto(outp, overwrite=overwrite)
return outp
[docs]
class RmfWriter:
def __init__(self, data: RmfData, outpath: str | Path):
self.data = data
self.outpath = Path(outpath)
[docs]
def write(self, *, overwrite: bool = False) -> Path:
outp = self.outpath
if outp.exists() and not overwrite:
raise FileExistsError(str(outp))
rmf = self.data
def _vla_array(values: np.ndarray | list[Any] | tuple[Any, ...] | None, dtype: Any) -> np.ndarray:
if values is None:
return np.asarray([], dtype=object)
seq = np.asarray(values, dtype=object)
out: list[np.ndarray] = []
for item in seq:
if item is None:
out.append(np.asarray([], dtype=dtype))
else:
out.append(np.atleast_1d(np.asarray(item, dtype=dtype)))
return np.asarray(out, dtype=object)
cols: list[fits.Column] = [
fits.Column(name='ENERG_LO', format='E', array=np.asarray(rmf.energ_lo, dtype=float)),
fits.Column(name='ENERG_HI', format='E', array=np.asarray(rmf.energ_hi, dtype=float)),
]
if rmf.n_grp is not None:
cols.append(fits.Column(name='N_GRP', format='J', array=np.asarray(rmf.n_grp, dtype=int)))
if rmf.f_chan is not None:
cols.append(fits.Column(name='F_CHAN', format='PJ()', array=_vla_array(rmf.f_chan, int)))
if rmf.n_chan is not None:
cols.append(fits.Column(name='N_CHAN', format='PJ()', array=_vla_array(rmf.n_chan, int)))
cols.append(fits.Column(name='MATRIX', format='PE()', array=_vla_array(rmf.matrix, float)))
hdu_mat = fits.BinTableHDU.from_columns(cols, name='MATRIX')
hdu_mat.header['EXTNAME'] = 'MATRIX'
hdu_mat.header['HDUCLASS'] = 'OGIP'
hdu_mat.header['HDUCLAS1'] = 'RESPONSE'
hdu_mat.header['HDUCLAS2'] = 'RSP_MATRIX'
prih = fits.PrimaryHDU()
hdul = fits.HDUList([prih, hdu_mat])
if rmf.channel is not None and rmf.e_min is not None and rmf.e_max is not None:
cols_eb = [
fits.Column(name='CHANNEL', format='J', array=np.asarray(rmf.channel, dtype=int)),
fits.Column(name='E_MIN', format='E', array=np.asarray(rmf.e_min, dtype=float)),
fits.Column(name='E_MAX', format='E', array=np.asarray(rmf.e_max, dtype=float)),
]
hdul.append(fits.BinTableHDU.from_columns(cols_eb, name='EBOUNDS'))
hdul.writeto(outp, overwrite=overwrite)
return outp
[docs]
class LightcurveWriter:
def __init__(self, data: LightcurveData, outpath: str | Path):
self.data = data
self.outpath = Path(outpath)
[docs]
def write(self, *, overwrite: bool = False) -> Path:
outp = self.outpath
if outp.exists() and not overwrite:
raise FileExistsError(str(outp))
lc = self.data
t = np.asarray(lc.time if lc.time is not None else np.asarray([], dtype=float), dtype=float)
cols = [fits.Column(name='TIME', format='D', array=t)]
if lc.rate is not None:
cols.append(fits.Column(name='RATE', format='E', array=np.asarray(lc.rate, dtype=float)))
if lc.rate_err is not None:
cols.append(fits.Column(name='ERROR', format='E', array=np.asarray(lc.rate_err, dtype=float)))
elif lc.counts is not None:
cols.append(fits.Column(name='COUNTS', format='E', array=np.asarray(lc.counts, dtype=float)))
if lc.counts_err is not None:
cols.append(fits.Column(name='ERROR', format='E', array=np.asarray(lc.counts_err, dtype=float)))
if lc.fracexp is not None:
cols.append(fits.Column(name='FRACEXP', format='E', array=np.asarray(lc.fracexp, dtype=float)))
if lc.quality is not None:
cols.append(fits.Column(name='QUALITY', format='J', array=np.asarray(lc.quality, dtype=int)))
hdu = fits.BinTableHDU.from_columns(cols, name='LIGHTCURVE')
hdu.header['EXTNAME'] = 'LIGHTCURVE'
if lc.exposure is not None:
hdu.header['EXPOSURE'] = float(lc.exposure)
if lc.meta is not None and getattr(lc.meta, 'telescop', None) is not None:
hdu.header['TELESCOP'] = str(lc.meta.telescop)
if lc.meta is not None and getattr(lc.meta, 'instrume', None) is not None:
hdu.header['INSTRUME'] = str(lc.meta.instrume)
prih = fits.PrimaryHDU()
hdul = fits.HDUList([prih, hdu])
if lc.gti_start is not None and lc.gti_stop is not None:
cols_g = [
fits.Column(name='START', format='D', array=np.asarray(lc.gti_start, dtype=float)),
fits.Column(name='STOP', format='D', array=np.asarray(lc.gti_stop, dtype=float)),
]
hdul.append(fits.BinTableHDU.from_columns(cols_g, name='GTI'))
hdul.writeto(outp, overwrite=overwrite)
return outp
[docs]
class EventWriter:
def __init__(self, data: EventData, outpath: str | Path):
self.data = data
self.outpath = Path(outpath)
[docs]
def write(self, *, overwrite: bool = False) -> Path:
outp = self.outpath
if outp.exists() and not overwrite:
raise FileExistsError(str(outp))
ev = self.data
cols: list[fits.Column] = [fits.Column(name='TIME', format='D', array=np.asarray(ev.time, dtype=float))]
if ev.pi is not None:
cols.append(fits.Column(name='PI', format='J', array=np.asarray(ev.pi, dtype=int)))
if ev.channel is not None:
cols.append(fits.Column(name='CHANNEL', format='J', array=np.asarray(ev.channel, dtype=int)))
if ev.x is not None:
cols.append(fits.Column(name='X', format='E', array=np.asarray(ev.x, dtype=float)))
if ev.y is not None:
cols.append(fits.Column(name='Y', format='E', array=np.asarray(ev.y, dtype=float)))
if ev.energy is not None:
cols.append(fits.Column(name='ENERGY', format='E', array=np.asarray(ev.energy, dtype=float)))
hdu_evt = fits.BinTableHDU.from_columns(cols, name='EVENTS')
hdu_evt.header['EXTNAME'] = 'EVENTS'
if ev.header is not None:
for k, v in dict(ev.header).items():
key = str(k).upper()
if key in hdu_evt.header:
continue
if key in {'XTENSION', 'BITPIX', 'NAXIS'}:
continue
try:
hdu_evt.header[key] = v
except Exception:
continue
prih = fits.PrimaryHDU()
hdul = fits.HDUList([prih, hdu_evt])
if ev.gti_start is not None and ev.gti_stop is not None:
cols_g = [
fits.Column(name='START', format='D', array=np.asarray(ev.gti_start, dtype=float)),
fits.Column(name='STOP', format='D', array=np.asarray(ev.gti_stop, dtype=float)),
]
hdul.append(fits.BinTableHDU.from_columns(cols_g, name='GTI'))
hdul.writeto(outp, overwrite=overwrite)
return outp
[docs]
def guess_ogip_kind(path) -> Literal['arf', 'rmf', 'pha', 'lc', 'evt']:
p = Path(path)
name = p.name.lower()
if name.endswith('.arf'):
return 'arf'
if name.endswith('.rmf') or name.endswith('.rsp'):
return 'rmf'
if name.endswith('.pha') or name.endswith('.pi'):
return 'pha'
with fits.open(p) as h:
extnames = {getattr(x, 'name', '').upper() for x in h}
if 'SPECRESP' in extnames and 'MATRIX' not in extnames:
return 'arf'
if 'MATRIX' in extnames:
return 'rmf'
if 'SPECTRUM' in extnames:
return 'pha'
has_time = False
for x in h:
d = getattr(x, 'data', None)
cols = getattr(d, 'columns', None)
names = getattr(cols, 'names', ()) if cols is not None else ()
if 'TIME' in names:
has_time = True
break
if 'EVENTS' in extnames or has_time:
for x in h:
d = getattr(x, 'data', None)
cols = getattr(d, 'columns', None)
names = getattr(cols, 'names', ()) if cols is not None else ()
if 'RATE' in names or 'COUNTS' in names:
return 'lc'
return 'evt'
return 'pha'
@overload
def readfits(path, kind: Literal['arf']) -> ArfData: ...
@overload
def readfits(path, kind: Literal['rmf']) -> RmfData: ...
@overload
def readfits(path, kind: Literal['pha']) -> PhaData: ...
@overload
def readfits(path, kind: Literal['lc']) -> LightcurveData: ...
@overload
def readfits(path, kind: Literal['evt']) -> EventData: ...
@overload
def readfits(path, kind: None = ...) -> OgipData: ...
[docs]
def readfits(path, kind: Optional[Literal['arf', 'rmf', 'pha', 'lc', 'evt']] = None) -> OgipData:
k = kind or guess_ogip_kind(path)
if k == 'arf':
return read_arf(path)
if k == 'rmf':
return read_rmf(path)
if k == 'pha':
return read_pha(path)
if k == 'lc':
return read_lc(path)
if k == 'evt':
return read_evt(path)
raise ValueError(f"Unknown OGIP kind: {k}")
[docs]
def read_arf(path) -> ArfData:
return OgipArfReader(path).read()
[docs]
def read_rmf(path) -> RmfData:
return OgipRmfReader(path).read()
[docs]
def read_pha(path) -> PhaData:
return OgipPhaReader(path).read()
[docs]
def read_lc(path) -> LightcurveData:
return OgipLightcurveReader(path).read()
[docs]
def read_evt(path) -> EventData:
return OgipEventReader(path).read()
[docs]
def write_pha(data: PhaData, outpath: str | Path, *, overwrite: bool = False) -> Path:
return PhaWriter(data, outpath).write(overwrite=overwrite)
[docs]
def write_arf(data: ArfData, outpath: str | Path, *, overwrite: bool = False) -> Path:
return ArfWriter(data, outpath).write(overwrite=overwrite)
[docs]
def write_rmf(data: RmfData, outpath: str | Path, *, overwrite: bool = False) -> Path:
return RmfWriter(data, outpath).write(overwrite=overwrite)
[docs]
def write_lc(data: LightcurveData, outpath: str | Path, *, overwrite: bool = False) -> Path:
return LightcurveWriter(data, outpath).write(overwrite=overwrite)
[docs]
def write_evt(data: EventData, outpath: str | Path, *, overwrite: bool = False) -> Path:
return EventWriter(data, outpath).write(overwrite=overwrite)
@overload
def writefits(data: PhaData, outpath: str | Path, kind: Literal['pha'], *, overwrite: bool = ...) -> Path: ...
@overload
def writefits(data: PhaData, outpath: str | Path, kind: None = ..., *, overwrite: bool = ...) -> Path: ...
@overload
def writefits(data: OgipWritableData, outpath: str | Path, kind: Literal['arf', 'rmf', 'lc', 'evt'], *, overwrite: bool = ...) -> Path: ...
[docs]
def writefits(
data: OgipWritableData,
outpath: str | Path,
kind: Optional[Literal['arf', 'rmf', 'pha', 'lc', 'evt']] = None,
*,
overwrite: bool = False,
) -> Path:
k = kind
if k is None:
if isinstance(data, PhaData):
k = 'pha'
else:
k = cast(Optional[Literal['arf', 'rmf', 'pha', 'lc', 'evt']], getattr(data, 'kind', None))
if k == 'pha':
return write_pha(cast(PhaData, data), outpath, overwrite=overwrite)
if k == 'arf':
return write_arf(cast(ArfData, data), outpath, overwrite=overwrite)
if k == 'rmf':
return write_rmf(cast(RmfData, data), outpath, overwrite=overwrite)
if k == 'lc':
return write_lc(cast(LightcurveData, data), outpath, overwrite=overwrite)
if k == 'evt':
return write_evt(cast(EventData, data), outpath, overwrite=overwrite)
raise ValueError(f"Unknown writefits kind: {k!r}")
__all__ = [
"OgipArfReader",
"OgipRmfReader",
"OgipPhaReader",
"OgipLightcurveReader",
"OgipEventReader",
"ArfReader",
"RmfReader",
"RspReader",
"LightcurveReader",
"OgipData",
"band_from_arf_bins",
"channel_mask_from_ebounds",
"guess_ogip_kind",
"readfits",
"read_arf",
"read_rmf",
"read_pha",
"read_lc",
"read_evt",
"PhaWriter",
"ArfWriter",
"RmfWriter",
"LightcurveWriter",
"EventWriter",
"write_arf",
"write_rmf",
"write_lc",
"write_evt",
"write_pha",
"writefits",
]