import copy
import logging
import os
import shutil
import multiprocessing as mp
from .apply_wcs_adjust import ApplyWCSAdjustStep
from .astrometric_align import AstrometricAlignStep
from .astrometric_catalog import AstrometricCatalogStep
from .download import DownloadStep
from .gaia_query import GaiaQueryStep
from .level_match import LevelMatchStep
from .lv1 import Lv1Step
from .lv2 import Lv2Step
from .lv3 import Lv3Step
from .lyot_mask import LyotMaskStep
from .lyot_separate import LyotSeparateStep
from .mosaic_individual_fields import MosaicIndividualFieldsStep
from .move_raw_obs import MoveRawObsStep
from .multi_tile_destripe import MultiTileDestripeStep
from .single_tile_destripe import SingleTileDestripeStep
from .get_wcs_adjust import GetWCSAdjustStep
from .anchoring import AnchoringStep
from .psf_matching import PSFMatchingStep
from .psf_model import PSFModelStep
from .release import ReleaseStep
from .regress_against_previous import RegressAgainstPreviousStep
from .utils import *
# Every step name the pipeline accepts, in the order listed here. A step
# given in the config may carry ".<instrument>" and/or ".sci"/".bgr"
# suffixes; only the base name is checked against this list.
ALLOWED_STEPS = [
    "download",
    "gaia_query",
    "lv1",
    "lv2",
    "single_tile_destripe",
    "get_wcs_adjust",
    "apply_wcs_adjust",
    "lyot_mask",
    "lyot_separate",
    "multi_tile_destripe",
    "level_match",
    "psf_model",
    "lv3",
    "astrometric_catalog",
    "astrometric_align",
    "mosaic_individual_fields",
    "anchoring",
    "psf_matching",
    "release",
    "regress_against_previous",
]

# Steps that are run once per target rather than once per band
COMBINED_BAND_STEPS = [
    "download",
    "gaia_query",
    "get_wcs_adjust",
    "astrometric_align",
    "anchoring",
    "release",
    "regress_against_previous",
]

# Input file extension for each step. Steps not listed here fall back
# to "cal" in the pipeline.
IN_STEP_EXTS = {
    "download": None,
    "lv1": "uncal",
    "lv2": "rate",
    "astrometric_catalog": "i2d",
    "astrometric_align": "i2d",
    "anchoring": "i2d_align",
    "psf_matching": "i2d_anchor",
    "release": None,
}

# Input sub-directory (inside the band directory) for each step. Steps
# not listed here fall back to "cal" in the pipeline.
IN_BAND_DIRS = {
    "lv1": "uncal",
    "lv2": "lv1",
    "astrometric_catalog": "lv3",
    "astrometric_align": "lv3",
    "anchoring": "lv3",
    "psf_matching": "lv3",
    "release": "lv3",
}

# Output sub-directory (inside the band directory) for each step. Steps
# not listed here write to a directory named after the step itself.
OUT_BAND_DIRS = {
    "astrometric_catalog": "lv3",
    "astrometric_align": "lv3",
    "anchoring": "anchoring",
    "release": None,
}

# Module-level logger
log = logging.getLogger(__name__)
[docs]
class PJPipeline:
    """Driver that runs the configured pipeline steps for each target."""

    def __init__(
        self,
        config_file=None,
        local_file=None,
    ):
        """Overall wrapper for pjpipe.

        Args:
            config_file: Either string to location of config.toml file,
                or dict of preloaded toml
            local_file: Either string to location of local.toml file,
                or dict of preloaded toml

        Raises:
            ValueError: If either argument is None or is neither a str
                nor a dict.
            KeyError: If a required key is missing from the config/local
                settings, or if CRDS_SERVER_URL / CRDS_PATH are not set
                in the environment.
        """

        config = self._load_settings(config_file, "config_file")
        local = self._load_settings(local_file, "local_file")

        if "webb_psf_data" in local:
            os.environ["WEBBPSF_PATH"] = local["webb_psf_data"]

        if "WEBBPSF_PATH" not in os.environ:
            log.warning(
                "WEBBPSF_PATH not set. If trying to PSF model, this will cause an error"
            )

        log.info("Starting PHANGS-JWST pipeline")

        if "crds_context" in local:
            crds_context = local["crds_context"]
            os.environ["CRDS_CONTEXT"] = crds_context
        else:
            # Only used for the log line below; the environment is untouched
            crds_context = "Default"

        # Pull in needed values from the configs
        self.targets = config["targets"]
        self.bands = config["bands"]
        self.steps = config["steps"]
        self.version = config["version"]
        self.parameters = config["parameters"]

        self.raw_dir = local["raw_dir"]
        self.reprocess_dir = os.path.join(
            local["reprocess_dir"],
            self.version,
        )
        self.release_dir = os.path.join(
            local["reprocess_dir"],
            "release",
            self.version,
        )
        self.alignment_dir = local["alignment_dir"]

        # Optional settings: None when absent
        self.kernel_dir = local.get("kernel_dir")
        self.anchor_ref_dir = local.get("anchor_ref_dir")

        # Default to every available CPU when no process count is given
        self.procs = local.get("processors", mp.cpu_count())

        # Log the environment variables that should be set. These are
        # indexed directly, so a missing variable raises KeyError here.
        log.info(f"Using CRDS_SERVER_URL: {os.environ['CRDS_SERVER_URL']}")
        log.info(f"Using CRDS_PATH: {os.environ['CRDS_PATH']}")
        log.info(f"Using CRDS_CONTEXT: {crds_context}")
        log.info(f"Using {self.procs} processes")

        # Log targets/band/steps out
        log.info("Found targets:")
        for target in self.targets:
            log.info(f"-> {target}")

        log.info("Found bands:")
        for band in self.bands:
            log.info(f"-> {band}")

        log.info("Found steps:")
        for step in self.steps:
            log.info(f"-> {step}")

    @staticmethod
    def _load_settings(settings, name):
        """Return settings as a dict, loading from a toml path if needed.

        Args:
            settings: Path to a toml file (str) or an already-loaded dict.
                A dict is deep-copied so the caller's object is not shared.
            name: Argument name, used only in error messages.

        Raises:
            ValueError: If settings is None or is neither str nor dict.
        """

        if settings is None:
            raise ValueError(f"{name} should be defined")
        if isinstance(settings, str):
            return load_toml(settings)
        if isinstance(settings, dict):
            return copy.deepcopy(settings)
        raise ValueError(f"{name} should be one of str, dict")

    @staticmethod
    def _parse_step(step_full):
        """Split a configured step name into its components.

        A step may be written as "step", "step.instrument", and may carry
        a "sci" or "bgr" token, e.g. "lv2.sci" or "lv2.miri.bgr".

        Args:
            step_full: Step name as given in the config.

        Returns:
            Tuple of (step, step_instrument, step_science_type), where the
            last two are None when not specified.

        Raises:
            ValueError: If, once any sci/bgr token has been removed, the
                name does not consist of one or two dot-separated parts.
        """

        parts = step_full.split(".")
        science_type = None

        # Science-type tokens are only recognised in dotted names. If both
        # are present, "sci" takes precedence as it is checked last.
        if len(parts) > 1:
            for candidate in ("bgr", "sci"):
                if candidate in parts:
                    science_type = candidate
                    parts.remove(candidate)

        if len(parts) == 1:
            return parts[0], None, science_type
        if len(parts) == 2:
            return parts[0], parts[1], science_type

        raise ValueError(
            f"step should be of the form step[.instrument][.sci|.bgr], "
            f"not {step_full}"
        )

    def do_pipeline(self):
        """Run every configured step, in order, for every target.

        Steps listed in COMBINED_BAND_STEPS are run once per target; all
        other steps are run once per band. Per-band state (success flag,
        whether raw data has been moved, last output directory) is tracked
        so that a band which has failed is skipped in later steps.

        Raises:
            ValueError: If a step name is malformed or not in ALLOWED_STEPS.
        """

        progress_dict = {}

        for target in self.targets:
            progress_dict[target] = {}

            log.info(f"Beginning reprocessing: {target}")

            for step_full in self.steps:
                # Parse out any potential instrument/observing type specific steps
                step, step_instrument, step_science_type = self._parse_step(
                    step_full
                )

                if step not in ALLOWED_STEPS:
                    raise ValueError(
                        f"step should be one of {ALLOWED_STEPS}, not {step}"
                    )

                step_parameters = self.parameters.get(step, {})

                # Get a target directory
                target_dir = os.path.join(
                    self.reprocess_dir,
                    target,
                )
                os.makedirs(target_dir, exist_ok=True)

                # Get the default input .fits extension for this step
                in_step_ext = IN_STEP_EXTS.get(step, "cal")

                # Some steps operate on all bands, distinguish that here
                if step in COMBINED_BAND_STEPS:
                    log.info(f"Beginning {step}")

                    step_result = self._run_combined_step(
                        step=step,
                        target=target,
                        target_dir=target_dir,
                        in_step_ext=in_step_ext,
                        step_parameters=step_parameters,
                        progress=progress_dict[target],
                    )

                    # If we're not successful here, log a warning and delete
                    # the whole target folder.
                    # NOTE(review): the remaining steps still run for this
                    # target afterwards (the directory is recreated on the
                    # next iteration) - confirm whether this should instead
                    # stop processing the target.
                    if not step_result:
                        log.warning(
                            f"Failures detected for {target}. "
                            f"Will remove target directory and continue."
                        )
                        shutil.rmtree(target_dir)

                    log.info(f"Completed {step}")

                else:
                    for band_full in self.bands:
                        self._run_band_step(
                            step=step,
                            step_instrument=step_instrument,
                            step_science_type=step_science_type,
                            step_parameters=step_parameters,
                            in_step_ext=in_step_ext,
                            target=target,
                            target_dir=target_dir,
                            band_full=band_full,
                            progress=progress_dict[target],
                        )

    def _run_combined_step(
        self,
        step,
        target,
        target_dir,
        in_step_ext,
        step_parameters,
        progress,
    ):
        """Run a step that operates on all bands of a target at once.

        Args:
            step: Base step name; expected to be in COMBINED_BAND_STEPS.
            target: Target name.
            target_dir: Reprocessing directory for this target.
            in_step_ext: Default input extension for this step.
            step_parameters: Dict of user parameters for this step.
            progress: Per-band progress dict for this target.

        Returns:
            Whatever the step's do_step() returns; the caller treats a
            falsy value as failure.

        Raises:
            ValueError: If the step is not a known combined-band step.
        """

        if step == "download":
            download = DownloadStep(
                target=target,
                download_dir=os.path.join(self.raw_dir, target),
                procs=self.procs,
                **step_parameters,
            )
            return download.do_step()

        if step == "gaia_query":
            gaia_query = GaiaQueryStep(
                target=target,
                out_dir=self.alignment_dir,
                **step_parameters,
            )
            return gaia_query.do_step()

        if step == "get_wcs_adjust":
            get_wcs_adjust = GetWCSAdjustStep(
                directory=target_dir,
                progress_dict=progress,
                target=target,
                alignment_dir=self.alignment_dir,
                procs=self.procs,
                **step_parameters,
            )
            return get_wcs_adjust.do_step()

        if step == "astrometric_align":
            astrometric_align = AstrometricAlignStep(
                target=target,
                bands=self.bands,
                progress_dict=progress,
                target_dir=target_dir,
                catalog_dir=self.alignment_dir,
                step_ext=in_step_ext,
                procs=self.procs,
                step_parameters=step_parameters,
            )
            return astrometric_align.do_step()

        # anchoring is in the part operating for all bands because
        # we need more control on the sequence (reference nircam and
        # miri bands first)
        if step == "anchoring":
            # By default, we assume we've done alignment.
            # If that's not the case, fall back to i2d
            if "astrometric_align" not in self.steps:
                in_step_ext = "i2d"

            anchoring = AnchoringStep(
                target=target,
                bands=self.bands,
                in_dir=target_dir,
                in_subdir=IN_BAND_DIRS[step],
                out_subdir=OUT_BAND_DIRS[step],
                ref_dir=self.anchor_ref_dir,
                kernel_dir=self.kernel_dir,
                in_step_ext=in_step_ext,
                out_step_ext="i2d_anchor",
                procs=self.procs,
                **step_parameters,
            )
            return anchoring.do_step()

        if step == "release":
            release = ReleaseStep(
                in_dir=target_dir,
                out_dir=self.release_dir,
                target=target,
                bands=self.bands,
                **step_parameters,
            )
            return release.do_step()

        if step == "regress_against_previous":
            regress = RegressAgainstPreviousStep(
                target=target,
                in_dir=self.release_dir,
                curr_version=self.version,
                **step_parameters,
            )
            return regress.do_step()

        raise ValueError(f"step should be one of {ALLOWED_STEPS}, not {step}")

    def _move_raw_obs(self, target, band_full, step_ext, out_dir, is_bgr):
        """Move raw observations for a band into its first input directory.

        Args:
            target: Target name.
            band_full: Full band name (including any "_bgr" suffix).
            step_ext: File extension passed to MoveRawObsStep.
            out_dir: Directory the raw observations are moved into.
            is_bgr: Whether this band holds background observations.

        Returns:
            Whatever MoveRawObsStep.do_step() returns; the caller treats
            a falsy value as failure.
        """

        move_raw_params = self.parameters.get("move_raw_obs", {})

        # NOTE(review): get_kws comes from .utils; presumably it resolves
        # target/band specific overrides from the parameters - confirm.
        kws = get_kws(
            parameters=move_raw_params,
            func=MoveRawObsStep,
            target=target,
            band=band_full,
            max_level=1,
        )
        move_raw_obs = MoveRawObsStep(
            target=target,
            band=band_full,
            step_ext=step_ext,
            in_dir=self.raw_dir,
            out_dir=out_dir,
            dr_version=self.version,
            is_bgr=is_bgr,
            **kws,
        )
        return move_raw_obs.do_step()

    def _run_band_step(
        self,
        step,
        step_instrument,
        step_science_type,
        step_parameters,
        in_step_ext,
        target,
        target_dir,
        band_full,
        progress,
    ):
        """Run a per-band step for a single band, updating its progress.

        The band is skipped when the step is restricted to another
        instrument or science type, or when the band has already failed.
        On failure of the step itself the band directory is removed.

        Args:
            step: Base step name.
            step_instrument: Instrument the step is restricted to, or None.
            step_science_type: "sci", "bgr", or None for no restriction.
            step_parameters: Dict of user parameters for this step.
            in_step_ext: Default input extension for this step.
            target: Target name.
            target_dir: Reprocessing directory for this target.
            band_full: Full band name (including any "_bgr" suffix).
            progress: Per-band progress dict for this target; updated
                in place.

        Raises:
            ValueError: If the step is not a known per-band step.
        """

        is_bgr = "_bgr" in band_full
        band = get_short_band_name(band_full)
        band_dir = os.path.join(
            target_dir,
            band_full,
        )

        if band_full not in progress:
            progress[band_full] = {
                "success": True,
                "data_moved": False,
                "dir": None,
                "run_astro_cat": False,
            }
        band_progress = progress[band_full]

        # Pull out the band type
        band_type = get_band_type(band_full)

        # Work out whether we will do this step for this particular band
        # and science type
        if step_instrument is not None and step_instrument != band_type:
            return
        if step_science_type is not None and is_bgr != (
            step_science_type == "bgr"
        ):
            return

        # If we've failed elsewhere, skip here
        if not band_progress["success"]:
            return

        log.info(f"Beginning {step} for {band_full}")

        # Input is the previous step's output directory if there is one,
        # else the default input directory for this step
        in_dir = band_progress["dir"]
        if in_dir is None:
            in_dir = os.path.join(
                band_dir,
                IN_BAND_DIRS.get(step, "cal"),
            )

        # If we need a specific out directory, pull it here.
        # Else default to the name of the step
        out_dir = os.path.join(
            band_dir,
            OUT_BAND_DIRS.get(step, step),
        )
        log.info(f"out_dir: {out_dir}")

        for label, path in (("in_dir", in_dir), ("out_dir", out_dir)):
            if not os.path.exists(path):
                log.info(f"Making {label}: {path}")
                os.makedirs(path)
            else:
                log.info(f"{label} exists: {path}")

        # Move raw observations
        if not band_progress["data_moved"]:
            log.info(f"Moving raw observations for {band_full}")

            moved = self._move_raw_obs(
                target=target,
                band_full=band_full,
                step_ext=in_step_ext,
                out_dir=in_dir,
                is_bgr=is_bgr,
            )
            band_progress["success"] = moved

            # If we're not successful here, log a warning, remove the
            # output directory, and move on
            if not moved:
                log.warning(
                    f"Failures detected moving raw data for {target}, {band}. "
                    f"Removing directories and continuing"
                )
                shutil.rmtree(out_dir)
                return

            # Save out file moved state
            band_progress["data_moved"] = True
            log.info(f"Moved raw observations for {band_full}")

        # Steps that share a constructor signature are grouped so each
        # signature is only written out once
        level_steps = {
            "lv1": Lv1Step,
            "lv2": Lv2Step,
            "lv3": Lv3Step,
        }
        simple_steps = {
            "single_tile_destripe": SingleTileDestripeStep,
            "lyot_mask": LyotMaskStep,
            "lyot_separate": LyotSeparateStep,
            "multi_tile_destripe": MultiTileDestripeStep,
            "psf_model": PSFModelStep,
        }

        if step in level_steps:
            # Level 1/2/3 processing
            step_class = level_steps[step]
            kws = get_kws(
                parameters=step_parameters,
                func=step_class,
                target=target,
                band=band_full,
                max_level=0,
            )
            level_step = step_class(
                target=target,
                band=band_full,
                in_dir=in_dir,
                out_dir=out_dir,
                dr_version=self.version,
                step_ext=in_step_ext,
                procs=self.procs,
                is_bgr=is_bgr,
                **kws,
            )
            step_result = level_step.do_step()

        elif step in simple_steps:
            step_class = simple_steps[step]
            kws = get_kws(
                parameters=step_parameters,
                func=step_class,
                target=target,
                band=band_full,
            )

            # For single tile destriping, if we're going from lv1 then
            # this will be a rate file, otherwise a cal. Kept local to
            # this band so it does not carry over to the next one.
            step_ext = in_step_ext
            if (
                step == "single_tile_destripe"
                and os.path.basename(in_dir) == "lv1"
            ):
                step_ext = "rate"

            simple_step = step_class(
                in_dir=in_dir,
                out_dir=out_dir,
                step_ext=step_ext,
                procs=self.procs,
                **kws,
            )
            step_result = simple_step.do_step()

        elif step == "apply_wcs_adjust":
            kws = get_kws(
                parameters=step_parameters,
                func=ApplyWCSAdjustStep,
                target=target,
                band=band_full,
            )

            wcs_adjust_file = os.path.join(
                target_dir,
                f"{target}_wcs_adjust.toml",
            )
            wcs_adjust = load_toml(wcs_adjust_file)

            apply_wcs = ApplyWCSAdjustStep(
                wcs_adjust=wcs_adjust,
                in_dir=in_dir,
                out_dir=out_dir,
                step_ext=in_step_ext,
                procs=self.procs,
                **kws,
            )
            step_result = apply_wcs.do_step()

        elif step == "level_match":
            kws = get_kws(
                parameters=step_parameters,
                func=LevelMatchStep,
                target=target,
                band=band_full,
            )
            level_match = LevelMatchStep(
                in_dir=in_dir,
                out_dir=out_dir,
                step_ext=in_step_ext,
                procs=self.procs,
                band=band_full,
                **kws,
            )
            step_result = level_match.do_step()

        elif step == "astrometric_catalog":
            kws = get_kws(
                parameters=step_parameters,
                func=AstrometricCatalogStep,
                target=target,
                band=band_full,
                max_level=0,
            )
            astrometric_catalog = AstrometricCatalogStep(
                target=target,
                band=band_full,
                in_dir=in_dir,
                **kws,
            )
            step_result = astrometric_catalog.do_step()
            band_progress["run_astro_cat"] = True

        elif step == "mosaic_individual_fields":
            # Here, the input directory should be level 3
            mosaic_in_dir = os.path.join(
                band_dir,
                "lv3",
            )
            kws = get_kws(
                parameters=step_parameters,
                func=MosaicIndividualFieldsStep,
                target=target,
                band=band_full,
                max_level=0,
            )
            mosaic_individual_fields = MosaicIndividualFieldsStep(
                target=target,
                band=band_full,
                in_dir=mosaic_in_dir,
                out_dir=out_dir,
                procs=self.procs,
                **kws,
            )
            step_result = mosaic_individual_fields.do_step()

        elif step == "psf_matching":
            # By default, we assume we've done anchoring. If not, use the
            # aligned files if we've done astrometric alignment, else i2d
            psf_in_ext = in_step_ext
            if "anchoring" not in self.steps:
                if "astrometric_align" in self.steps:
                    psf_in_ext = "i2d_align"
                else:
                    psf_in_ext = "i2d"

            psf_matching = PSFMatchingStep(
                target=target,
                band=band,
                in_dir=in_dir,
                out_dir=out_dir,
                kernel_dir=self.kernel_dir,
                in_step_ext=psf_in_ext,
                procs=self.procs,
                **step_parameters,
            )
            step_result = psf_matching.do_step()

        else:
            raise ValueError(
                f"step should be one of {ALLOWED_STEPS}, not {step}"
            )

        # Record the outcome; the next step for this band reads from out_dir
        band_progress["success"] = step_result
        band_progress["dir"] = out_dir

        # If we're not successful here, log a warning and delete the band folder
        if not band_progress["success"]:
            log.warning(
                f"Failures detected in step {step} for {target}, {band_full}. "
                f"Removing folder and continuing"
            )
            shutil.rmtree(band_dir)

        log.info(f"Completed {step} for {band_full}")