"""Functions and classes for fitting contours using PISA."""
import configparser
from contextlib import contextmanager
import io
import multiprocessing
import os
import random
from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING
from configupdater import ConfigUpdater
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from graphnet.utilities.imports import has_pisa_package
if has_pisa_package() or TYPE_CHECKING:
import pisa # pyright: reportMissingImports=false
from pisa.core.distribution_maker import DistributionMaker
from pisa.core.pipeline import Pipeline
from pisa.analysis.analysis import Analysis
from pisa import ureg
from graphnet.data.sqlite import create_table_and_save_to_sql
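
# Use a non-interactive matplotlib backend, suitable for batch jobs.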
mpl.use("pdf")
plt.rc("font", family="serif")
@contextmanager
def config_updater(
config_path: str,
new_config_path: Optional[str] = None,
dummy_section: str = "temp",
) -> ConfigUpdater:
"""Update config files and saves them to file.
Args:
config_path: Path to original config file.
new_config_path: Path to save updated config file.
dummy_section: Dummy section name to use for config files without
section headers.
Yields:
        ConfigUpdater instance for programmatically updating config file.
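
    Example:
        A minimal usage sketch; the paths and the option edited below are
        hypothetical::

            # Hypothetical paths and option:
            with config_updater("pipeline.cfg", "pipeline_new.cfg") as cfg:
                cfg["data.sqlite_loader"]["database"].value = "/data/events.db"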
"""
    # Modify the original config file if no new config path is provided.
if new_config_path is None:
new_config_path = config_path
# Load config file
updater = ConfigUpdater()
has_dummy_section = False
try:
updater.read(config_path)
# If it is missing section headers (e.g., binning.cfg), add a dummy section
# header before reading file contents.
except configparser.MissingSectionHeaderError:
with open(config_path, "r") as configfile:
updater.read_string(f"[{dummy_section}]\n" + configfile.read())
has_dummy_section = True
    # Expose the updater instance in the context (i.e.,
    # `with config_updater(...) as updater:`).
try:
yield updater
# Write new config to file
finally:
with open(new_config_path, "w") as configfile:
if has_dummy_section:
                # Remove the dummy section header if one was added.
with io.StringIO() as buffer:
updater.write(buffer)
buffer.seek(0)
lines = buffer.readlines()[1:]
configfile.writelines(lines)
else:
updater.write(configfile)
class WeightFitter:
"""Class for fitting weights using PISA."""
def __init__(
self,
database_path: str,
truth_table: str = "truth",
index_column: str = "event_no",
statistical_fit: bool = False,
) -> None:
"""Construct `WeightFitter`."""
self._database_path = database_path
self._truth_table = truth_table
self._index_column = index_column
self._statistical_fit = statistical_fit
def fit_weights(
self,
config_outdir: str,
weight_name: str = "",
pisa_config_dict: Optional[Dict] = None,
add_to_database: bool = False,
) -> pd.DataFrame:
"""Fit flux weights to each neutrino event in `self._database_path`.
        If `statistical_fit=True`, only statistical effects are accounted
        for. If `False`, certain systematic effects are also included, but
        not hypersurfaces.
Args:
config_outdir: The output directory in which to store the
configuration.
weight_name: The name of the weight. If `add_to_database=True`,
this will be the name of the table.
pisa_config_dict: The dictionary of PISA configurations. Can be
used to change assumptions regarding the fit.
add_to_database: If `True`, a table will be added to the database
called `weight_name` with two columns:
`[index_column, weight_name]`
Returns:
A dataframe with columns `[index_column, weight_name]`.
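
        Example:
            A minimal usage sketch; the paths are hypothetical::

                # Hypothetical database path and output directory:
                fitter = WeightFitter(database_path="/data/oscnext.db")
                weights = fitter.fit_weights(config_outdir="/tmp/configs")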
"""
        # If it is a standard weight
        if pisa_config_dict is None:
            if not weight_name:
                weight_name = "pisa_weight_graphnet_standard"
        # If it is a custom weight without a name
        else:
            if not weight_name:
                weight_name = "pisa_custom_weight"
pisa_config_path = self._make_config(
config_outdir, weight_name, pisa_config_dict
)
model = Pipeline(pisa_config_path)
        if self._statistical_fit:
            # The only free parameter will be `aeff_scale`, corresponding to
            # a statistical fit.
free_params = model.params.free.names
for free_param in free_params:
if free_param not in ["aeff_scale"]:
model.params[free_param].is_fixed = True
model.stages[-1].apply_mode = "events"
model.stages[-1].calc_mode = "events"
model.run()
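        # Collect the per-event weights from each PISA data container.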
all_data = []
for container in model.data:
data = pd.DataFrame(container["event_no"], columns=["event_no"])
data[weight_name] = container["weights"]
all_data.append(data)
results = pd.concat(all_data)
if add_to_database:
create_table_and_save_to_sql(
                results, weight_name, self._database_path
)
return results.sort_values("event_no").reset_index(drop=True)
def _make_config(
self,
config_outdir: str,
weight_name: str,
pisa_config_dict: Optional[Dict] = None,
) -> str:
os.makedirs(config_outdir + "/" + weight_name, exist_ok=True)
if pisa_config_dict is None:
# Run on standard settings
pisa_config_dict = {
"reco_energy": {"num_bins": 8},
"reco_coszen": {"num_bins": 8},
"pid": {"bin_edges": [0, 0.5, 1]},
"true_energy": {"num_bins": 200},
"true_coszen": {"num_bins": 200},
"livetime": 10
                * 0.01,  # Set to 1% of 10 years, corresponding to the size of the oscNext burn sample.
}
pisa_config_dict["pipeline"] = self._database_path
pisa_config_dict["post_fix"] = None
pipeline_cfg_path = self._create_configs(
pisa_config_dict, config_outdir + "/" + weight_name
)
return pipeline_cfg_path
def _create_configs(self, config_dict: Dict, path: str) -> str:
# Update binning config
root = os.path.realpath(
os.path.join(os.getcwd(), os.path.dirname(__file__))
)
if config_dict["post_fix"] is not None:
config_name = "config%s" % config_dict["post_fix"]
else:
# config_dict["post_fix"] = '_pred'
config_name = "config"
with config_updater(
root
+ "/resources/configuration_templates/binning_config_template.cfg",
"%s/binning_%s.cfg" % (path, config_name),
dummy_section="binning",
) as updater:
updater["binning"][
"graphnet_dynamic_binning.reco_energy"
].value = (
"{'num_bins':%s, 'is_log':True, 'domain':[0.5,55] * units.GeV, 'tex': r'E_{\\rm reco}'}"
% config_dict["reco_energy"]["num_bins"]
) # noqa: W605
updater["binning"][
"graphnet_dynamic_binning.reco_coszen"
].value = (
"{'num_bins':%s, 'is_lin':True, 'domain':[-1,1], 'tex':r'\\cos{\\theta}_{\\rm reco}'}"
% config_dict["reco_coszen"]["num_bins"]
) # noqa: W605
updater["binning"]["graphnet_dynamic_binning.pid"].value = (
"{'bin_edges': %s, 'tex':r'{\\rm PID}'}"
% config_dict["pid"]["bin_edges"]
) # noqa: W605
updater["binning"]["true_allsky_fine.true_energy"].value = (
"{'num_bins':%s, 'is_log':True, 'domain':[1,1000] * units.GeV, 'tex': r'E_{\\rm true}'}"
% config_dict["true_energy"]["num_bins"]
) # noqa: W605
updater["binning"]["true_allsky_fine.true_coszen"].value = (
"{'num_bins':%s, 'is_lin':True, 'domain':[-1,1], 'tex':r'\\cos\,\\theta_{Z,{\\rm true}}'}" # noqa: W605
% config_dict["true_coszen"]["num_bins"]
) # noqa: W605
# Update pipeline config
with config_updater(
root
+ "/resources/configuration_templates/pipeline_config_weight_template.cfg",
"%s/pipeline_%s.cfg" % (path, config_name),
) as updater:
updater["pipeline"].add_before.comment(
"#include %s/binning_%s.cfg as binning" % (path, config_name)
)
updater["data.sqlite_loader"]["post_fix"].value = config_dict[
"post_fix"
]
updater["data.sqlite_loader"]["database"].value = config_dict[
"pipeline"
]
if "livetime" in config_dict.keys():
updater["aeff.aeff"]["param.livetime"].value = (
"%s * units.common_year" % config_dict["livetime"]
)
return "%s/pipeline_%s.cfg" % (path, config_name)
class ContourFitter:
"""Class for fitting contours using PISA."""
def __init__(
self,
outdir: str,
pipeline_path: str,
post_fix: str = "_pred",
model_name: str = "gnn",
include_retro: bool = True,
statistical_fit: bool = False,
):
"""Construct `ContourFitter`."""
self._outdir = outdir
self._pipeline_path = pipeline_path
self._post_fix = post_fix
self._model_name = model_name
self._include_retro = include_retro
self._statistical_fit = str(statistical_fit)
self._allowed_contour_types = ["1d", "2d"]
def fit_1d_contour(
self,
run_name: str,
config_dict: Dict,
grid_size: int = 30,
n_workers: int = 1,
theta23_minmax: Tuple[float, float] = (36.0, 54.0),
dm31_minmax: Tuple[float, float] = (2.3, 2.7),
) -> None:
"""Fit 1D contours."""
self._fit_contours(
config_dict=config_dict,
run_name=run_name,
grid_size=grid_size,
n_workers=n_workers,
theta23_minmax=theta23_minmax,
dm31_minmax=dm31_minmax,
contour_type="1d",
)
def fit_2d_contour(
self,
run_name: str,
config_dict: Dict,
grid_size: int = 30,
n_workers: int = 1,
theta23_minmax: Tuple[float, float] = (36.0, 54.0),
dm31_minmax: Tuple[float, float] = (2.3, 2.7),
) -> None:
"""Fit 2D contours."""
self._fit_contours(
config_dict=config_dict,
run_name=run_name,
grid_size=grid_size,
n_workers=n_workers,
theta23_minmax=theta23_minmax,
dm31_minmax=dm31_minmax,
contour_type="2d",
)
def _check_inputs(
self,
contour_type: str,
dm31_minmax: Tuple[float, float],
theta23_minmax: Tuple[float, float],
n_workers: int,
) -> bool:
"""Check whether inputs are as expected."""
if contour_type.lower() not in self._allowed_contour_types:
print(
"%s not recognized as valid contour type. Only %s is recognized"
% (contour_type, self._allowed_contour_types)
)
return False
if (
(len(theta23_minmax) != 2)
or (len(dm31_minmax) != 2)
or (dm31_minmax[0] > dm31_minmax[1])
or (theta23_minmax[0] > theta23_minmax[1])
):
print(
"theta23 or dm31 min max values are not understood. Please provide a list on the form [min, max] for both variables"
)
return False
if n_workers < 1:
print("found n_workers < 1. n_workers must be positive integers.")
return False
return True
def _fit_contours(
self,
run_name: str,
config_dict: Dict,
grid_size: int,
n_workers: int,
contour_type: str,
theta23_minmax: Tuple[float, float],
dm31_minmax: Tuple[float, float],
) -> None:
"""Fit contours."""
inputs_ok = self._check_inputs(
contour_type=contour_type,
dm31_minmax=dm31_minmax,
theta23_minmax=theta23_minmax,
n_workers=n_workers,
)
if not inputs_ok:
return
minimizer_cfg = self._get_minimizer_path(config_dict)
cfgs = self._setup_config_files(run_name, config_dict)
theta23_range = np.linspace(
theta23_minmax[0], theta23_minmax[1], grid_size
)
dm31_range = (
np.linspace(dm31_minmax[0], dm31_minmax[1], grid_size) * 1e-3
)
if contour_type.lower() == "1d":
settings = self._make_1d_settings(
cfgs=cfgs,
grid_size=grid_size,
run_name=run_name,
minimizer_cfg=minimizer_cfg,
theta23_range=theta23_range,
dm31_range=dm31_range,
n_workers=n_workers,
)
p = multiprocessing.Pool(processes=len(settings))
_ = p.map_async(self._parallel_fit_1d_contour, settings)
p.close()
p.join()
elif contour_type.lower() == "2d":
settings = self._make_2d_settings(
cfgs=cfgs,
grid_size=grid_size,
run_name=run_name,
minimizer_cfg=minimizer_cfg,
theta23_range=theta23_range,
dm31_range=dm31_range,
n_workers=n_workers,
)
p = multiprocessing.Pool(processes=len(settings))
_ = p.map_async(self._parallel_fit_2d_contour, settings)
p.close()
p.join()
df = self._merge_temporary_files(run_name)
df.to_csv(self._outdir + "/" + run_name + "/merged_results.csv")
def _merge_temporary_files(self, run_name: str) -> pd.DataFrame:
files = os.listdir(self._outdir + "/" + run_name + "/tmp")
df = pd.concat(
[
pd.read_csv(f"{self._outdir}/{run_name}/tmp/{file}")
for file in files
],
ignore_index=True,
)
return df
def _parallel_fit_2d_contour(self, settings: List[List[Any]]) -> None:
"""Fit 2D contours in parallel.
        The length of `settings` determines the number of jobs this worker gets.
Results are saved to temporary .csv-files that are later merged.
Args:
settings: A list of fitting settings.
"""
results = []
for i in range(len(settings)):
(
cfg_path,
model_name,
outdir,
theta23_value,
deltam31_value,
id,
run_name,
fix_all,
minimizer_cfg,
) = settings[i]
minimizer_cfg = pisa.utils.fileio.from_file(minimizer_cfg)
model = DistributionMaker([cfg_path])
data = model.get_outputs(return_sum=True)
ana = Analysis()
if fix_all == "True":
                # Only `aeff_scale` will be left free, corresponding to a
                # statistical fit.
free_params = model.params.free.names
for free_param in free_params:
if free_param != "aeff_scale":
if free_param == "theta23":
model.params.theta23.is_fixed = True
model.params.theta23.nominal_value = (
float(theta23_value) * ureg.degree
)
elif free_param == "deltam31":
model.params.deltam31.is_fixed = True
model.params.deltam31.nominal_value = (
float(deltam31_value) * ureg.electron_volt**2
)
else:
model.params[free_param].is_fixed = True
else:
                # Fix only the scanned parameters, `theta23` and `deltam31`.
model.params.theta23.is_fixed = True
model.params.deltam31.is_fixed = True
model.params.theta23.nominal_value = (
float(theta23_value) * ureg.degree
)
model.params.deltam31.nominal_value = (
float(deltam31_value) * ureg.electron_volt**2
)
model.reset_all()
result = ana.fit_hypo(
data,
model,
metric="mod_chi2",
minimizer_settings=minimizer_cfg,
fit_octants_separately=True,
)
results.append(
[
theta23_value,
deltam31_value,
result[0]["params"].theta23.value,
result[0]["params"].deltam31.value,
result[0]["metric_val"],
model_name,
id,
result[0]["minimizer_metadata"]["success"],
]
)
self._save_temporary_results(
outdir=outdir, run_name=run_name, results=results, id=id
)
def _parallel_fit_1d_contour(self, settings: List[List[Any]]) -> None:
"""Fit 1D contours in parallel.
        The length of `settings` determines the number of jobs this worker gets.
Results are saved to temporary .csv-files that are later merged.
Args:
settings: A list of fitting settings.
"""
results = []
for i in range(len(settings)):
(
cfg_path,
model_name,
outdir,
theta23_value,
deltam31_value,
id,
run_name,
parameter,
fix_all,
minimizer_cfg,
) = settings[i]
minimizer_cfg = pisa.utils.fileio.from_file(minimizer_cfg)
ana = Analysis()
model = DistributionMaker([cfg_path])
data = model.get_outputs(return_sum=True)
if fix_all == "True":
                # Free only `aeff_scale` and the non-scanned oscillation
                # parameter, corresponding to a statistical fit.
free_params = model.params.free.names
for free_param in free_params:
if free_param not in ["aeff_scale", "theta23", "deltam31"]:
model.params[free_param].is_fixed = True
if parameter == "theta23":
model.params.theta23.is_fixed = True
model.params.theta23.nominal_value = (
float(theta23_value) * ureg.degree
)
elif parameter == "deltam31":
model.params.deltam31.is_fixed = True
model.params.deltam31.nominal_value = (
float(deltam31_value) * ureg.electron_volt**2
)
else:
                # Fix only the scanned parameter.
if parameter == "theta23":
model.params.theta23.is_fixed = True
model.params.theta23.nominal_value = (
float(theta23_value) * ureg.degree
)
elif parameter == "deltam31":
model.params.deltam31.is_fixed = True
model.params.deltam31.nominal_value = (
float(deltam31_value) * ureg.electron_volt**2
)
else:
print("parameter not supported: %s" % parameter)
model.reset_all()
result = ana.fit_hypo(
data,
model,
metric="mod_chi2",
minimizer_settings=minimizer_cfg,
fit_octants_separately=True,
)
results.append(
[
theta23_value,
deltam31_value,
result[0]["params"].theta23.value,
result[0]["params"].deltam31.value,
result[0]["metric_val"],
model_name,
id,
result[0]["minimizer_metadata"]["success"],
]
)
self._save_temporary_results(
outdir=outdir, run_name=run_name, results=results, id=id
)
def _save_temporary_results(
self, outdir: str, run_name: str, results: List[List[Any]], id: int
) -> None:
os.makedirs(outdir + "/" + run_name + "/tmp", exist_ok=True)
results_df = pd.DataFrame(
data=results,
columns=[
"theta23_fixed",
"dm31_fixed",
"theta23_best_fit",
"dm31_best_fit",
"mod_chi2",
"model",
"id",
"converged",
],
)
results_df.to_csv(
outdir + "/" + run_name + "/tmp" + "/tmp_%s.csv" % id
)
def _make_2d_settings(
self,
cfgs: Dict,
grid_size: int,
run_name: str,
minimizer_cfg: str,
        theta23_range: np.ndarray,
        dm31_range: np.ndarray,
n_workers: int,
) -> List[np.ndarray]:
settings = []
count = 0
for model_name in cfgs.keys():
for i in range(0, grid_size):
for k in range(0, grid_size):
settings.append(
[
cfgs[model_name],
model_name,
self._outdir,
theta23_range[i],
dm31_range[k],
count,
run_name,
self._statistical_fit,
minimizer_cfg,
]
)
count += 1
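        # Shuffle the job list so that each worker receives a mixed sample of
        # grid points, then split it into one batch per worker.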
random.shuffle(settings)
return np.array_split(settings, n_workers)
def _make_1d_settings(
self,
cfgs: Dict,
grid_size: int,
run_name: str,
minimizer_cfg: str,
        theta23_range: np.ndarray,
        dm31_range: np.ndarray,
n_workers: int,
) -> List[np.ndarray]:
settings = []
count = 0
for model_name in cfgs.keys():
for i in range(0, grid_size):
settings.append(
[
cfgs[model_name],
model_name,
self._outdir,
theta23_range[i],
-1,
count,
run_name,
"theta23",
self._statistical_fit,
minimizer_cfg,
]
)
count += 1
for i in range(0, grid_size):
settings.append(
[
cfgs[model_name],
model_name,
self._outdir,
-1,
dm31_range[i],
count,
run_name,
"deltam31",
self._statistical_fit,
minimizer_cfg,
]
)
count += 1
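        # Shuffle the job list and split it into one batch per worker.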
random.shuffle(settings)
return np.array_split(settings, n_workers)
def _setup_config_files(self, run_name: str, config_dict: Dict) -> Dict:
cfgs = {}
cfgs[self._model_name] = self._make_configs(
outdir=self._outdir,
post_fix=self._post_fix,
run_name=run_name,
is_retro=False,
pipeline_path=self._pipeline_path,
config_dict=config_dict,
)
if self._include_retro:
cfgs["retro"] = self._make_configs(
outdir=self._outdir,
post_fix=self._post_fix,
run_name=run_name,
is_retro=True,
pipeline_path=self._pipeline_path,
config_dict=config_dict,
)
return cfgs
def _get_minimizer_path(self, config_dict: Optional[Dict]) -> str:
if config_dict is not None and "minimizer_cfg" in config_dict.keys():
minimizer_cfg = config_dict["minimizer_cfg"]
else:
root = os.path.realpath(
os.path.join(os.getcwd(), os.path.dirname(__file__))
)
minimizer_cfg = (
root + "/resources/minimizer/graphnet_standard.json"
)
return minimizer_cfg
def _create_configs(self, config_dict: Dict, path: str) -> str:
# Update binning config
root = os.path.realpath(
os.path.join(os.getcwd(), os.path.dirname(__file__))
)
if config_dict["post_fix"] is not None:
config_name = "config%s" % config_dict["post_fix"]
else:
config_name = "config"
with config_updater(
root
+ "/resources/configuration_templates/binning_config_template.cfg",
"%s/binning_%s.cfg" % (path, config_name),
dummy_section="binning",
) as updater:
updater["binning"][
"graphnet_dynamic_binning.reco_energy"
].value = (
"{'num_bins':%s, 'is_log':True, 'domain':[0.5,55] * units.GeV, 'tex': r'E_{\\rm reco}'}"
% config_dict["reco_energy"]["num_bins"]
) # noqa: W605
updater["binning"][
"graphnet_dynamic_binning.reco_coszen"
].value = (
"{'num_bins':%s, 'is_lin':True, 'domain':[-1,1], 'tex':r'\\cos{\\theta}_{\\rm reco}'}"
% config_dict["reco_coszen"]["num_bins"]
) # noqa: W605
updater["binning"]["graphnet_dynamic_binning.pid"].value = (
"{'bin_edges': %s, 'tex':r'{\\rm PID}'}"
% config_dict["pid"]["bin_edges"]
) # noqa: W605
updater["binning"]["true_allsky_fine.true_energy"].value = (
"{'num_bins':%s, 'is_log':True, 'domain':[1,1000] * units.GeV, 'tex': r'E_{\\rm true}'}"
% config_dict["true_energy"]["num_bins"]
) # noqa: W605
updater["binning"]["true_allsky_fine.true_coszen"].value = (
"{'num_bins':%s, 'is_lin':True, 'domain':[-1,1], 'tex':r'\\cos\,\\theta_{Z,{\\rm true}}'}" # noqa: W605
% config_dict["true_coszen"]["num_bins"]
) # noqa: W605
# Update pipeline config
with config_updater(
root
+ "/resources/configuration_templates/pipeline_config_template.cfg",
"%s/pipeline_%s.cfg" % (path, config_name),
) as updater:
updater["pipeline"].add_before.comment(
"#include %s/binning_%s.cfg as binning" % (path, config_name)
)
updater["data.sqlite_loader"]["post_fix"].value = config_dict[
"post_fix"
]
updater["data.sqlite_loader"]["database"].value = config_dict[
"pipeline"
]
if "livetime" in config_dict.keys():
updater["aeff.aeff"]["param.livetime"].value = (
"%s * units.common_year" % config_dict["livetime"]
)
return "%s/pipeline_%s.cfg" % (path, config_name)
def _make_configs(
self,
outdir: str,
run_name: str,
is_retro: bool,
pipeline_path: str,
post_fix: str = "_pred",
config_dict: Optional[Dict] = None,
) -> str:
os.makedirs(outdir + "/" + run_name, exist_ok=True)
if config_dict is None:
# Run on standard settings
config_dict = {
"reco_energy": {"num_bins": 8},
"reco_coszen": {"num_bins": 8},
"pid": {"bin_edges": [0, 0.5, 1]},
"true_energy": {"num_bins": 200},
"true_coszen": {"num_bins": 200},
"livetime": 10,
}
config_dict["pipeline"] = pipeline_path
if is_retro:
config_dict["post_fix"] = "_retro"
else:
config_dict["post_fix"] = post_fix
pipeline_cfg_path = self._create_configs(
config_dict, outdir + "/" + run_name
)
return pipeline_cfg_path