
gcages.cmip7_scenariomip#

CMIP7 ScenarioMIP components

Modules:

Name Description
gridding_emissions

Handling of gridding emissions

harmonisation

Harmonisation helpers for the CMIP7 ScenarioMIP workflow

infilling

Infilling configuration and related things for the CMIP7 ScenarioMIP workflow

post_processing

Post-processing in line with the CMIP7 ScenarioMIP workflow

pre_processing

Pre-processing part of the workflow

scm_running

SCM-running configuration and related things for the updated workflow

Classes:

Name Description
CMIP7ScenarioMIPInfiller

Infiller that follows the same logic as was used in CMIP7 ScenarioMIP

CMIP7ScenarioMIPPostProcessor

CMIP7 ScenarioMIP fast-track post-processor

CMIP7ScenarioMIPPreProcessingResult

Result of pre-processing with CMIP7ScenarioMIPPreProcessor

CMIP7ScenarioMIPPreProcessor

Pre-processor for CMIP7's ScenarioMIP

CMIP7ScenarioMIPSCMRunner

Simple climate model runner

ReaggregatorBasic

Reaggregator that follows this module's logic

ReaggregatorLike

Interface that can be used for re-aggregation

Functions:

Name Description
create_cmip7_scenariomip_global_harmoniser

Create an Aneris harmoniser configured for CMIP7 ScenarioMIP global emissions.

CMIP7ScenarioMIPInfiller #

Infiller that follows the same logic as was used in CMIP7 ScenarioMIP

If you want exactly the same behaviour as in CMIP7 ScenarioMIP, initialise using from_cmip7_scenariomip_config

Methods:

Name Description
__call__

Create an infilled DataFrame for CMIP7 ScenarioMIP's simple climate model run.

from_cmip7_scenariomip_config

Initialise from the config used in CMIP7 ScenarioMIP

Attributes:

Name Type Description
cmip7_ghg_inversions DataFrame

Greenhouse gas inversions DataFrame.

harmonisation_year int

Year in which the data was harmonised

historical_emissions DataFrame

Historical emissions used for harmonisation

infilling_db DataFrame

Infilling leaders database for each variable.

pre_industrial_year int

Pre-industrial year

run_checks bool

If True, run checks on both input and output data

ur UnitRegistry | None

Unit registry to use. If None, openscm_units.unit_registry is loaded when the infiller is called.

Source code in src/gcages/cmip7_scenariomip/infilling.py
@define
class CMIP7ScenarioMIPInfiller:
    """
    Infiller that follows the same logic as was used in CMIP7 ScenarioMIP

    If you want exactly the same behaviour as in CMIP7 ScenarioMIP,
    initialise using [`from_cmip7_scenariomip_config`][(c)]
    """

    infilling_db: pd.DataFrame
    """
    Infilling leaders database for each variable.
    """

    cmip7_ghg_inversions: pd.DataFrame
    """
    Greenhouse gas inversions DataFrame.
    """

    historical_emissions: pd.DataFrame
    """
    Historical emissions used for harmonisation
    """
    harmonisation_year: int = 2023
    """
    Year in which the data was harmonised
    """
    pre_industrial_year: int = 1750
    """
    Pre-industrial year
    """
    run_checks: bool = True
    """
    If `True`, run checks on both input and output data

    If you are sure about your workflow,
    you can disable the checks to speed things up
    (but we don't recommend this unless you really
    are confident about what you're doing).
    """

    ur: UnitRegistry | None = None
    """
    Unit registry to use

    If `None`, `openscm_units.unit_registry` is loaded when the infiller is called.
    """

    def __call__(self, in_emissions: pd.DataFrame) -> pd.DataFrame:
        """
        Create an infilled DataFrame for CMIP7 ScenarioMIP's simple climate model run.

        Parameters
        ----------
        in_emissions
            Emissions to infill

        Returns
        -------
        :
            Infilled emissions DataFrame
        """
        if self.ur is None:
            try:
                import openscm_units

                self.ur = openscm_units.unit_registry
            except ImportError as exc:
                raise MissingOptionalDependencyError(
                    "openscm_units",
                    requirement="openscm_units",
                ) from exc

        try:
            import silicone.database_crunchers  # type: ignore # silicone has no type hints
        except ImportError as exc:
            raise MissingOptionalDependencyError(
                "get_silicone_based_infiller", requirement="silicone"
            ) from exc

        if self.run_checks:
            assert_index_is_multiindex(in_emissions)
            assert_data_is_all_numeric(in_emissions)
            assert_has_index_levels(
                in_emissions, ["variable", "unit", "model", "scenario"]
            )
            # Check that the scenario data is harmonised
            # to the same history as the infilling database
            history = self.historical_emissions.reset_index(
                level=[
                    lvl
                    for lvl in ["model", "scenario"]
                    if lvl in self.historical_emissions.index.names
                ],
                drop=True,
            )
            assert_harmonised(
                in_emissions,
                history=history,
                harmonisation_time=self.harmonisation_year,
            )

        infilling_wmo = self.infilling_db[
            self.infilling_db.index.get_level_values("model").str.contains("WMO")
        ]

        infilling_silicone = self.infilling_db[
            ~self.infilling_db.index.get_level_values("model").str.contains("WMO")
            & ~self.infilling_db.index.get_level_values("model").str.contains("Velders")
        ]

        # Infill

        # TODO: split this out somehow
        ### Very low marker should use F-gas emissions in line with Kigali
        # We get these from [Velders et al., 2022](https://zenodo.org/records/6520707)

        vl_model, vl_scenario = ("REMIND-MAgPIE 3.5-4.11", "SSP1 - Very Low Emissions")

        mask = in_emissions.index.get_level_values("model").str.contains(
            vl_model
        ) & in_emissions.index.get_level_values("scenario").str.contains(vl_scenario)

        vl_marker = in_emissions[mask]
        unique_var = infilling_silicone.index.get_level_values("variable").unique()
        if not vl_marker.empty:
            lead_vl_marker = "Emissions|CO2|Fossil"
            infillers_silicone_vl_marker = {}
            for variable in [v for v in unique_var if v != lead_vl_marker]:
                infillers_silicone_vl_marker[variable] = get_silicone_based_infiller(
                    infilling_db=infilling_silicone,
                    follower_variable=variable,
                    lead_variables=[lead_vl_marker],
                    silicone_db_cruncher=silicone.database_crunchers.RMSClosest,
                )

            infilled_vl_exception = infill(
                vl_marker,
                infillers_silicone_vl_marker,
            )

        else:
            infilled_vl_exception = None

        # TODO: fix this. The infiller should only return infilled emissions,
        # not complete emissions.
        complete_vl_exception = get_complete(in_emissions, infilled_vl_exception)

        # Silicone
        lead = "Emissions|CO2|Fossil"
        infillers_silicone = {}
        for variable in [v for v in unique_var if v != lead]:
            infillers_silicone[variable] = get_silicone_based_infiller(
                infilling_db=infilling_silicone,
                follower_variable=variable,
                lead_variables=[lead],
                silicone_db_cruncher=silicone.database_crunchers.RMSClosest,
            )

        infilled_silicone = infill(
            complete_vl_exception,
            infillers_silicone,
        )
        complete_silicone = get_complete(complete_vl_exception, infilled_silicone)

        # WMO (direct copy)

        infillers_wmo = {}
        unique_var = infilling_wmo.index.get_level_values("variable").unique()
        for wmo_var in unique_var:
            infillers_wmo[wmo_var] = get_direct_copy_infiller(
                variable=wmo_var,
                copy_from=infilling_wmo,
            )

        infilled_wmo = infill(complete_silicone, infillers_wmo)
        complete_wmo = get_complete(complete_silicone, infilled_wmo)

        # Scale timeseries
        #
        # Surprisingly, this is the most mucking around of all.
        # The hard part here is that the scaling needs to be aware
        # of the fact that the pre-industrial value is different for each timeseries.
        # The naming mucking around also adds to the fun of course.

        scaling_leaders = {
            "Emissions|C3F8": "Emissions|C2F6",
            "Emissions|C4F10": "Emissions|C2F6",
            "Emissions|C5F12": "Emissions|C2F6",
            "Emissions|C7F16": "Emissions|C2F6",
            "Emissions|C8F18": "Emissions|C2F6",
            "Emissions|cC4F8": "Emissions|CF4",
            "Emissions|SO2F2": "Emissions|CF4",
            "Emissions|HFC236fa": "Emissions|HFC245fa",
            "Emissions|HFC152a": "Emissions|HFC4310mee",
            "Emissions|HFC365mfc": "Emissions|HFC134a",
            "Emissions|CH2Cl2": "Emissions|HFC134a",
            "Emissions|CHCl3": "Emissions|C2F6",
            "Emissions|NF3": "Emissions|SF6",
        }

        infillers_scaling = get_pre_industrial_aware_direct_scaling_infiller(
            historical_emissions=self.historical_emissions,
            cmip7_ghg_inversions_reporting_names=self.cmip7_ghg_inversions,
            scaling_leaders=scaling_leaders,
            harmonisation_year=self.harmonisation_year,
            pre_industrial_year=self.pre_industrial_year,
        )

        infilled_scaling = infill(complete_wmo, infillers_scaling)
        infilled = get_complete(complete_wmo, infilled_scaling)
        infilled.columns.name = "year"

        if self.run_checks:
            pd.testing.assert_index_equal(infilled.columns, in_emissions.columns)

            assert_harmonised(
                infilled,
                history=history,
                harmonisation_time=self.harmonisation_year,
                rounding=5,  # typical precision of stored historical data
            )
            ## Check completeness
            assert_all_groups_are_complete(infilled, complete_index_gcages_names)

        return infilled

    @classmethod
    def from_cmip7_scenariomip_config(
        cls,
        cmip7_scenariomip_infilling_leader_emissions_file: Path,
        cmip7_ghg_inversions_file: Path,
        cmip7_scenariomip_global_historical_emissions_file: Path,
        ur: UnitRegistry | None = None,
        run_checks: bool = True,
    ) -> CMIP7ScenarioMIPInfiller:
        """
        Initialise from the config used in CMIP7 ScenarioMIP

        Parameters
        ----------
        cmip7_scenariomip_infilling_leader_emissions_file
            File containing the infilling leaders database

            This is for all emissions except GHGs.

        cmip7_ghg_inversions_file
            File containing the infilling database for GHG inversions

        cmip7_scenariomip_global_historical_emissions_file
            File containing the historical emissions used for harmonisation

        ur
            Unit registry to use

            If not supplied, `openscm_units.unit_registry` is used.

        run_checks
            Should checks of the input and output data be performed?

            If this is turned off, things are faster,
            but error messages are much less clear if things go wrong.

        Returns
        -------
        :
            Initialised CMIP7ScenarioMIPInfiller
        """
        # Hardcode as we are matching CMIP7 ScenarioMIP exactly.
        # Users can copy and modify themselves if they wish
        # (or we can introduce a lower layer if lots of users want it)
        PI_YEAR = 1750
        HARMONISATION_YEAR = 2023

        if ur is None:
            try:
                import openscm_units

                ur = openscm_units.unit_registry
            except ImportError as exc:
                raise MissingOptionalDependencyError(
                    "openscm_units",
                    requirement="openscm_units",
                ) from exc

        # Still embargoed
        infilling_db = load_cmip7_scenariomip_infilling_db(
            filepath=cmip7_scenariomip_infilling_leader_emissions_file,
            check_hash=False,  # TODO: update when available
        )

        # CMIP7 GHG inversions
        cmip7_ghg_inversions = load_cmip7_scenariomip_ghg_inversions(
            filepath=cmip7_ghg_inversions_file,
        )
        # History
        historical_emissions = load_cmip7_scenariomip_historical_emissions(
            filepath=cmip7_scenariomip_global_historical_emissions_file,
            check_hash=True,
        )

        # Use gcages naming convention.
        infilling_db = update_index_levels_func(
            infilling_db,
            {
                "variable": lambda x: convert_variable_name(
                    x,
                    from_convention=SupportedNamingConventions.CMIP7_SCENARIOMIP,
                    to_convention=SupportedNamingConventions.GCAGES,
                )
            },
            copy=False,
        )
        cmip7_ghg_inversions = update_index_levels_func(
            cmip7_ghg_inversions,
            {
                "variable": lambda x: convert_variable_name(
                    x,
                    from_convention=SupportedNamingConventions.OPENSCM_RUNNER,
                    to_convention=SupportedNamingConventions.GCAGES,
                )
            },
            copy=False,
        )
        historical_emissions = update_index_levels_func(
            historical_emissions,
            {
                "variable": lambda x: convert_variable_name(
                    x,
                    from_convention=SupportedNamingConventions.CMIP7_SCENARIOMIP,
                    to_convention=SupportedNamingConventions.GCAGES,
                )
            },
            copy=False,
        )

        if run_checks:
            assert_harmonised(
                infilling_db,
                history=historical_emissions.reset_index(
                    level=[
                        lvl
                        for lvl in ["model", "scenario"]
                        if lvl in historical_emissions.index.names
                    ],
                    drop=True,
                ),
                harmonisation_time=HARMONISATION_YEAR,
                history_unit_level="unit",
                ur=ur,
            )

        return cls(
            infilling_db=infilling_db,
            historical_emissions=historical_emissions,
            cmip7_ghg_inversions=cmip7_ghg_inversions,
            harmonisation_year=HARMONISATION_YEAR,
            pre_industrial_year=PI_YEAR,
            run_checks=run_checks,
            ur=ur,
        )
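
The way `__call__` partitions `infilling_db` on the `model` index level can be sketched in plain Python. This is only an illustration: the rows below are made-up placeholders rather than real database entries, `split_by_source` is a hypothetical helper, and the source itself uses pandas boolean masks instead of list comprehensions.

```python
# Hypothetical (model, variable) rows standing in for the infilling database index.
rows = [
    ("WMO 2022", "Emissions|CFC11"),
    ("Velders et al., 2022", "Emissions|HFC134a"),
    ("Some IAM 1.0", "Emissions|CH4"),
    ("Some IAM 1.0", "Emissions|CO2|Fossil"),
]


def split_by_source(rows):
    """Split rows into WMO rows and rows left for silicone-based infilling."""
    wmo = [row for row in rows if "WMO" in row[0]]
    # Anything that is neither WMO nor Velders is left for silicone.
    silicone = [
        row for row in rows if "WMO" not in row[0] and "Velders" not in row[0]
    ]
    return wmo, silicone


wmo_rows, silicone_rows = split_by_source(rows)
```

Note that, with this split, rows whose model name contains "Velders" end up in neither subset.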

cmip7_ghg_inversions instance-attribute #

cmip7_ghg_inversions: DataFrame

Greenhouse gas inversions DataFrame.

harmonisation_year class-attribute instance-attribute #

harmonisation_year: int = 2023

Year in which the data was harmonised

historical_emissions instance-attribute #

historical_emissions: DataFrame

Historical emissions used for harmonisation

infilling_db instance-attribute #

infilling_db: DataFrame

Infilling leaders database for each variable.

pre_industrial_year class-attribute instance-attribute #

pre_industrial_year: int = 1750

Pre-industrial year

run_checks class-attribute instance-attribute #

run_checks: bool = True

If True, run checks on both input and output data

If you are sure about your workflow, you can disable the checks to speed things up (but we don't recommend this unless you really are confident about what you're doing).

ur class-attribute instance-attribute #

ur: UnitRegistry | None = None

Unit registry to use. If None, openscm_units.unit_registry is loaded when the infiller is called.

__call__ #

__call__(in_emissions: DataFrame) -> DataFrame

Create an infilled DataFrame for CMIP7 ScenarioMIP's simple climate model run.

Parameters:

Name Type Description Default
in_emissions DataFrame

Emissions to infill

required

Returns:

Type Description
DataFrame

Infilled emissions DataFrame
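
Internally, the call applies several infilling stages in sequence, combining the result of each stage with its input before moving on. A minimal sketch of that pattern, using plain dictionaries instead of DataFrames, might look as follows. The `infill` and `get_complete` functions here are simplified stand-ins written for this example (they are not the gcages implementations), and the variables and values are invented.

```python
def infill(emissions, infillers):
    """Return only the variables that are missing from `emissions`."""
    return {
        variable: infiller(emissions)
        for variable, infiller in infillers.items()
        if variable not in emissions
    }


def get_complete(emissions, infilled):
    """Combine the input with the infilled variables (if there are any)."""
    if infilled is None:
        return dict(emissions)
    return {**emissions, **infilled}


# Toy scenario: one reported variable with two timesteps.
scenario = {"Emissions|CO2|Fossil": [10.0, 8.0]}

# Stage 1: a follower derived from the lead variable (stand-in for silicone).
stage_1 = {"Emissions|CH4": lambda e: [0.5 * v for v in e["Emissions|CO2|Fossil"]]}
# Stage 2: a direct copy from a fixed source (stand-in for the WMO data).
stage_2 = {"Emissions|CFC11": lambda e: [1.0, 0.9]}

complete = scenario
for infillers in (stage_1, stage_2):
    complete = get_complete(complete, infill(complete, infillers))
```

Because each stage only fills what is still missing, variables reported by the scenario are never overwritten in this sketch.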

Source code in src/gcages/cmip7_scenariomip/infilling.py
def __call__(self, in_emissions: pd.DataFrame) -> pd.DataFrame:
    """
    Create an infilled DataFrame for CMIP7 ScenarioMIP's simple climate model run.

    Parameters
    ----------
    in_emissions
        Emissions to infill

    Returns
    -------
    :
        Infilled emissions DataFrame
    """
    if self.ur is None:
        try:
            import openscm_units

            self.ur = openscm_units.unit_registry
        except ImportError as exc:
            raise MissingOptionalDependencyError(
                "openscm_units",
                requirement="openscm_units",
            ) from exc

    try:
        import silicone.database_crunchers  # type: ignore # silicone has no type hints
    except ImportError as exc:
        raise MissingOptionalDependencyError(
            "get_silicone_based_infiller", requirement="silicone"
        ) from exc

    if self.run_checks:
        assert_index_is_multiindex(in_emissions)
        assert_data_is_all_numeric(in_emissions)
        assert_has_index_levels(
            in_emissions, ["variable", "unit", "model", "scenario"]
        )
        # Check that the scenario data is harmonised
        # to the same history as the infilling database
        history = self.historical_emissions.reset_index(
            level=[
                lvl
                for lvl in ["model", "scenario"]
                if lvl in self.historical_emissions.index.names
            ],
            drop=True,
        )
        assert_harmonised(
            in_emissions,
            history=history,
            harmonisation_time=self.harmonisation_year,
        )

    infilling_wmo = self.infilling_db[
        self.infilling_db.index.get_level_values("model").str.contains("WMO")
    ]

    infilling_silicone = self.infilling_db[
        ~self.infilling_db.index.get_level_values("model").str.contains("WMO")
        & ~self.infilling_db.index.get_level_values("model").str.contains("Velders")
    ]

    # Infill

    # TODO: split this out somehow
    ### Very low marker should use F-gas emissions in line with Kigali
    # We get these from [Velders et al., 2022](https://zenodo.org/records/6520707)

    vl_model, vl_scenario = ("REMIND-MAgPIE 3.5-4.11", "SSP1 - Very Low Emissions")

    mask = in_emissions.index.get_level_values("model").str.contains(
        vl_model
    ) & in_emissions.index.get_level_values("scenario").str.contains(vl_scenario)

    vl_marker = in_emissions[mask]
    unique_var = infilling_silicone.index.get_level_values("variable").unique()
    if not vl_marker.empty:
        lead_vl_marker = "Emissions|CO2|Fossil"
        infillers_silicone_vl_marker = {}
        for variable in [v for v in unique_var if v != lead_vl_marker]:
            infillers_silicone_vl_marker[variable] = get_silicone_based_infiller(
                infilling_db=infilling_silicone,
                follower_variable=variable,
                lead_variables=[lead_vl_marker],
                silicone_db_cruncher=silicone.database_crunchers.RMSClosest,
            )

        infilled_vl_exception = infill(
            vl_marker,
            infillers_silicone_vl_marker,
        )

    else:
        infilled_vl_exception = None

    # TODO: fix this. The infiller should only return infilled emissions,
    # not complete emissions.
    complete_vl_exception = get_complete(in_emissions, infilled_vl_exception)

    # Silicone
    lead = "Emissions|CO2|Fossil"
    infillers_silicone = {}
    for variable in [v for v in unique_var if v != lead]:
        infillers_silicone[variable] = get_silicone_based_infiller(
            infilling_db=infilling_silicone,
            follower_variable=variable,
            lead_variables=[lead],
            silicone_db_cruncher=silicone.database_crunchers.RMSClosest,
        )

    infilled_silicone = infill(
        complete_vl_exception,
        infillers_silicone,
    )
    complete_silicone = get_complete(complete_vl_exception, infilled_silicone)

    # WMO (direct copy)

    infillers_wmo = {}
    unique_var = infilling_wmo.index.get_level_values("variable").unique()
    for wmo_var in unique_var:
        infillers_wmo[wmo_var] = get_direct_copy_infiller(
            variable=wmo_var,
            copy_from=infilling_wmo,
        )

    infilled_wmo = infill(complete_silicone, infillers_wmo)
    complete_wmo = get_complete(complete_silicone, infilled_wmo)

    # Scale timeseries
    #
    # Surprisingly, this is the most mucking around of all.
    # The hard part here is that the scaling needs to be aware
    # of the fact that the pre-industrial value is different for each timeseries.
    # The naming mucking around also adds to the fun of course.

    scaling_leaders = {
        "Emissions|C3F8": "Emissions|C2F6",
        "Emissions|C4F10": "Emissions|C2F6",
        "Emissions|C5F12": "Emissions|C2F6",
        "Emissions|C7F16": "Emissions|C2F6",
        "Emissions|C8F18": "Emissions|C2F6",
        "Emissions|cC4F8": "Emissions|CF4",
        "Emissions|SO2F2": "Emissions|CF4",
        "Emissions|HFC236fa": "Emissions|HFC245fa",
        "Emissions|HFC152a": "Emissions|HFC4310mee",
        "Emissions|HFC365mfc": "Emissions|HFC134a",
        "Emissions|CH2Cl2": "Emissions|HFC134a",
        "Emissions|CHCl3": "Emissions|C2F6",
        "Emissions|NF3": "Emissions|SF6",
    }

    infillers_scaling = get_pre_industrial_aware_direct_scaling_infiller(
        historical_emissions=self.historical_emissions,
        cmip7_ghg_inversions_reporting_names=self.cmip7_ghg_inversions,
        scaling_leaders=scaling_leaders,
        harmonisation_year=self.harmonisation_year,
        pre_industrial_year=self.pre_industrial_year,
    )

    infilled_scaling = infill(complete_wmo, infillers_scaling)
    infilled = get_complete(complete_wmo, infilled_scaling)
    infilled.columns.name = "year"

    if self.run_checks:
        pd.testing.assert_index_equal(infilled.columns, in_emissions.columns)

        assert_harmonised(
            infilled,
            history=history,
            harmonisation_time=self.harmonisation_year,
            rounding=5,  # typical precision of stored historical data
        )
        ## Check completeness
        assert_all_groups_are_complete(infilled, complete_index_gcages_names)

    return infilled
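
The final stage scales each follower from a lead timeseries while accounting for timeseries-specific pre-industrial values. The source of `get_pre_industrial_aware_direct_scaling_infiller` is not shown here, so the formula below is an assumption about what such a scaling could look like: a linear map that sends the lead's pre-industrial value to the follower's pre-industrial value and the lead's harmonisation-year value to the follower's harmonisation-year value. The function name and all numbers are hypothetical.

```python
def scale_follower(lead, lead_harm, lead_pi, follower_harm, follower_pi):
    """
    Scale a lead timeseries into a follower timeseries.

    Assumed form: the anomaly of the lead relative to its pre-industrial
    value is scaled so that the follower matches its own harmonisation-year
    value and its own pre-industrial value.
    """
    factor = (follower_harm - follower_pi) / (lead_harm - lead_pi)
    return [(value - lead_pi) * factor + follower_pi for value in lead]


# Invented lead values: harmonisation year, a later year, back at pre-industrial.
lead_timeseries = [3.0, 5.0, 1.0]
follower_timeseries = scale_follower(
    lead_timeseries,
    lead_harm=3.0,
    lead_pi=1.0,
    follower_harm=0.75,
    follower_pi=0.25,
)
```

In this sketch the follower equals `follower_harm` wherever the lead equals `lead_harm`, and falls back to `follower_pi` when the lead returns to its pre-industrial level.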

from_cmip7_scenariomip_config classmethod #

from_cmip7_scenariomip_config(
    cmip7_scenariomip_infilling_leader_emissions_file: Path,
    cmip7_ghg_inversions_file: Path,
    cmip7_scenariomip_global_historical_emissions_file: Path,
    ur: UnitRegistry | None = None,
    run_checks: bool = True,
) -> CMIP7ScenarioMIPInfiller

Initialise from the config used in CMIP7 ScenarioMIP

Parameters:

Name Type Description Default
cmip7_scenariomip_infilling_leader_emissions_file Path

File containing the infilling leaders database

This is for all emissions except GHGs.

required
cmip7_ghg_inversions_file Path

File containing the infilling database for GHG inversions

required
cmip7_scenariomip_global_historical_emissions_file Path

File containing the historical emissions used for harmonisation

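required
ur UnitRegistry | None

Unit registry to use

If not supplied, openscm_units.unit_registry is used.

None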
run_checks bool

Should checks of the input and output data be performed?

If this is turned off, things are faster, but error messages are much less clear if things go wrong.

True

Returns:

Type Description
CMIP7ScenarioMIPInfiller

Initialised CMIP7ScenarioMIPInfiller
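
When `run_checks` is enabled, the infilling database is checked for harmonisation against the historical emissions in the harmonisation year. The idea behind such a check can be sketched with plain dictionaries. The helper below is hypothetical and much simpler than `assert_harmonised`: it ignores units, skips variables that have no history (an assumption made for this example), and compares values after rounding.

```python
def assert_harmonised_sketch(scenario, history, harmonisation_year, rounding=5):
    """Raise if a variable differs from history in the harmonisation year."""
    for variable, timeseries in scenario.items():
        if variable not in history:
            # Assumption for this sketch: nothing to compare against, so skip.
            continue
        scenario_value = round(timeseries[harmonisation_year], rounding)
        history_value = round(history[variable][harmonisation_year], rounding)
        if scenario_value != history_value:
            msg = (
                f"{variable} is not harmonised in {harmonisation_year}: "
                f"{scenario_value} != {history_value}"
            )
            raise AssertionError(msg)


# Invented values: the scenario matches history to within the rounding.
history = {"Emissions|CH4": {2023: 10.000001}}
harmonised = {"Emissions|CH4": {2023: 10.000002, 2030: 9.0}}
assert_harmonised_sketch(harmonised, history, harmonisation_year=2023)
```

Comparing rounded values avoids spurious failures when the historical data has been stored with limited precision.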

Source code in src/gcages/cmip7_scenariomip/infilling.py
@classmethod
def from_cmip7_scenariomip_config(
    cls,
    cmip7_scenariomip_infilling_leader_emissions_file: Path,
    cmip7_ghg_inversions_file: Path,
    cmip7_scenariomip_global_historical_emissions_file: Path,
    ur: UnitRegistry | None = None,
    run_checks: bool = True,
) -> CMIP7ScenarioMIPInfiller:
    """
    Initialise from the config used in CMIP7 ScenarioMIP

    Parameters
    ----------
    cmip7_scenariomip_infilling_leader_emissions_file
        File containing the infilling leaders database

        This is for all emissions except GHGs.

    cmip7_ghg_inversions_file
        File containing the infilling database for GHG inversions

    cmip7_scenariomip_global_historical_emissions_file
        File containing the historical emissions used for harmonisation

    ur
        Unit registry to use

        If not supplied, `openscm_units.unit_registry` is used.

    run_checks
        Should checks of the input and output data be performed?

        If this is turned off, things are faster,
        but error messages are much less clear if things go wrong.

    Returns
    -------
    :
        Initialised CMIP7ScenarioMIPInfiller
    """
    # Hardcode as we are matching CMIP7 ScenarioMIP exactly.
    # Users can copy and modify themselves if they wish
    # (or we can introduce a lower layer if lots of users want it)
    PI_YEAR = 1750
    HARMONISATION_YEAR = 2023

    if ur is None:
        try:
            import openscm_units

            ur = openscm_units.unit_registry
        except ImportError as exc:
            raise MissingOptionalDependencyError(
                "openscm_units",
                requirement="openscm_units",
            ) from exc

    # Still embargoed
    infilling_db = load_cmip7_scenariomip_infilling_db(
        filepath=cmip7_scenariomip_infilling_leader_emissions_file,
        check_hash=False,  # TODO: update when available
    )

    # CMIP7 GHG inversions
    cmip7_ghg_inversions = load_cmip7_scenariomip_ghg_inversions(
        filepath=cmip7_ghg_inversions_file,
    )
    # History
    historical_emissions = load_cmip7_scenariomip_historical_emissions(
        filepath=cmip7_scenariomip_global_historical_emissions_file,
        check_hash=True,
    )

    # Use gcages naming convention.
    infilling_db = update_index_levels_func(
        infilling_db,
        {
            "variable": lambda x: convert_variable_name(
                x,
                from_convention=SupportedNamingConventions.CMIP7_SCENARIOMIP,
                to_convention=SupportedNamingConventions.GCAGES,
            )
        },
        copy=False,
    )
    cmip7_ghg_inversions = update_index_levels_func(
        cmip7_ghg_inversions,
        {
            "variable": lambda x: convert_variable_name(
                x,
                from_convention=SupportedNamingConventions.OPENSCM_RUNNER,
                to_convention=SupportedNamingConventions.GCAGES,
            )
        },
        copy=False,
    )
    historical_emissions = update_index_levels_func(
        historical_emissions,
        {
            "variable": lambda x: convert_variable_name(
                x,
                from_convention=SupportedNamingConventions.CMIP7_SCENARIOMIP,
                to_convention=SupportedNamingConventions.GCAGES,
            )
        },
        copy=False,
    )

    if run_checks:
        assert_harmonised(
            infilling_db,
            history=historical_emissions.reset_index(
                level=[
                    lvl
                    for lvl in ["model", "scenario"]
                    if lvl in historical_emissions.index.names
                ],
                drop=True,
            ),
            harmonisation_time=HARMONISATION_YEAR,
            history_unit_level="unit",
            ur=ur,
        )

    return cls(
        infilling_db=infilling_db,
        historical_emissions=historical_emissions,
        cmip7_ghg_inversions=cmip7_ghg_inversions,
        harmonisation_year=HARMONISATION_YEAR,
        pre_industrial_year=PI_YEAR,
        run_checks=run_checks,
        ur=ur,
    )
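
Both `__call__` and `from_cmip7_scenariomip_config` import their optional dependencies lazily and raise a descriptive error if an import fails. A minimal, self-contained sketch of that pattern is shown below. The exception class and the `import_optional` helper are hypothetical names written for this example; they are not the gcages `MissingOptionalDependencyError` API.

```python
import importlib


class MissingOptionalDependencySketch(ImportError):
    """Raised when an optional dependency is needed but not installed."""

    def __init__(self, callable_name, requirement):
        super().__init__(
            f"`{callable_name}` requires {requirement} to be installed"
        )


def import_optional(module_name, callable_name):
    """Import `module_name`, raising a clearer error if it is unavailable."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise MissingOptionalDependencySketch(
            callable_name, requirement=module_name
        ) from exc


# A standard-library module is used here so the example always runs.
json_module = import_optional("json", callable_name="example")
```

Deferring the import to the point of use keeps the package importable even when the optional dependency is absent.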

CMIP7ScenarioMIPPostProcessor #

CMIP7 ScenarioMIP fast-track post-processor

Methods:

Name Description
__call__

Do the post-processing

from_cmip7_scenariomip_config

Initialise from the config used in CMIP7 ScenarioMIP

Attributes:

Name Type Description
exceedance_global_warming_levels tuple[float, ...]

Global-warming levels against which to calculate exceedance probabilities

gsat_assessment_median float

Median of the GSAT assessment

gsat_assessment_pre_industrial_period tuple[int, ...]

Pre-industrial time period used for the GSAT assessment

gsat_assessment_time_period tuple[int, ...]

Time period over which the GSAT assessment applies

gsat_in_line_with_assessment_variable_name str

The name of the GSAT variable once it has been aligned with the assessment

gsat_variable_name str

The name of the GSAT variable

n_processes int

Number of processes to use for parallel processing.

percentiles_to_calculate tuple[float, ...]

Percentiles to calculate and include in the output

run_checks bool

If True, run checks on both input and output data
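
To illustrate what `exceedance_global_warming_levels` is used for, the sketch below computes peak warming per ensemble member and the fraction of members whose peak exceeds each level. It is a plain-Python illustration under stated assumptions: the ensemble values are invented, the helper names are hypothetical, and the use of a strict "greater than" comparison expressed as a fraction (rather than a percentage) is an assumption, not taken from the gcages source.

```python
def peak_warming(run):
    """Return (peak value, year of peak) for one ensemble member."""
    year = max(run, key=run.get)
    return run[year], year


def exceedance_probability(runs, threshold):
    """Fraction of ensemble members whose peak warming exceeds `threshold`."""
    peaks = [peak_warming(run)[0] for run in runs]
    return sum(peak > threshold for peak in peaks) / len(peaks)


# Four made-up ensemble members, each mapping year -> GSAT (K).
runs = [
    {2050: 1.4, 2100: 1.3},
    {2050: 1.6, 2100: 1.5},
    {2050: 1.8, 2100: 2.1},
    {2050: 1.2, 2100: 1.1},
]

probabilities = {
    level: exceedance_probability(runs, level) for level in (1.5, 2.0, 2.5)
}
```

The post-processor performs the equivalent grouping over the `run_id` index level, so each model, scenario and climate model combination gets its own set of probabilities.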

Source code in src/gcages/cmip7_scenariomip/post_processing.py
@define
class CMIP7ScenarioMIPPostProcessor:
    """
    CMIP7 ScenarioMIP fast-track post-processor
    """

    gsat_variable_name: str
    """The name of the GSAT variable"""

    gsat_in_line_with_assessment_variable_name: str
    """The name of the GSAT variable once it has been aligned with the assessment"""

    gsat_assessment_median: float
    """
    Median of the GSAT assessment
    """

    gsat_assessment_time_period: tuple[int, ...]
    """
    Time period over which the GSAT assessment applies
    """

    gsat_assessment_pre_industrial_period: tuple[int, ...]
    """
    Pre-industrial time period used for the GSAT assessment
    """

    percentiles_to_calculate: tuple[float, ...] = (0.05, 0.33, 0.5, 0.67, 0.95)
    """Percentiles to calculate and include in the output"""

    exceedance_global_warming_levels: tuple[float, ...] = (1.5, 2.0, 2.5)
    """
    Global-warming levels against which to calculate exceedance probabilities
    """

    run_checks: bool = True
    """
    If `True`, run checks on both input and output data

    If you are sure about your workflow,
    you can disable the checks to speed things up
    (but we don't recommend this unless you really
    are confident about what you're doing).
    """

    n_processes: int = multiprocessing.cpu_count()
    """
    Number of processes to use for parallel processing.

    Set to 1 to process in serial.
    """

    def __call__(self, in_df: pd.DataFrame) -> PostProcessingResult:
        """
        Do the post-processing

        Parameters
        ----------
        in_df
            Data to post-process

        Returns
        -------
        :
            Post-processed results
        """
        if self.run_checks:
            self._check_in_df(in_df)

        temperatures_in_line_with_assessment = update_index_levels_func(
            get_temperatures_in_line_with_assessment(
                in_df.loc[
                    in_df.index.get_level_values("variable") == self.gsat_variable_name
                ],
                assessment_median=self.gsat_assessment_median,
                assessment_time_period=self.gsat_assessment_time_period,
                assessment_pre_industrial_period=self.gsat_assessment_pre_industrial_period,
                group_cols=["climate_model", "model", "scenario"],
            ),
            {"variable": lambda x: self.gsat_in_line_with_assessment_variable_name},
        )

        # Quantiles
        temperatures_in_line_with_assessment_quantiles = (
            fix_index_name_after_groupby_quantile(
                groupby_except(
                    temperatures_in_line_with_assessment,
                    "run_id",
                ).quantile(list(self.percentiles_to_calculate)),  # type: ignore # pandas-stubs confused
                new_name="quantile",
            )
        )

        # Exceedance probabilities, peak warming and categorisation
        exceedance_probabilities_over_time = get_exceedance_probabilities_over_time(
            temperatures_in_line_with_assessment,
            exceedance_thresholds_of_interest=self.exceedance_global_warming_levels,
            group_cols=["model", "scenario", "climate_model"],
            unit_col="unit",
            groupby_except_levels="run_id",
        )
        exceedance_probabilities = get_exceedance_probabilities(
            temperatures_in_line_with_assessment,
            exceedance_thresholds_of_interest=self.exceedance_global_warming_levels,
            group_cols=["model", "scenario", "climate_model"],
            unit_col="unit",
            groupby_except_levels="run_id",
        )

        # Peak Warming
        peak_warming_df = set_index_levels_func(
            temperatures_in_line_with_assessment.max(axis="columns").to_frame("value"),
            {"metric": "max"},
        )
        peak_warming_quantiles_df = fix_index_name_after_groupby_quantile(
            groupby_except(peak_warming_df, "run_id").quantile(
                np.array(self.percentiles_to_calculate)
            ),
            new_name="quantile",
        )
        # Extract Series for categorization and final result
        peak_warming_quantiles = peak_warming_quantiles_df["value"]

        # EOC Warming
        eoc_warming_df = set_index_levels_func(
            temperatures_in_line_with_assessment[2100].to_frame("value"),
            {"metric": 2100},
        )
        eoc_warming_quantiles_df = fix_index_name_after_groupby_quantile(
            groupby_except(eoc_warming_df, "run_id").quantile(
                np.array(self.percentiles_to_calculate)
            ),
            new_name="quantile",
        )
        eoc_warming_quantiles = eoc_warming_quantiles_df["value"]

        # Peak Year
        peak_warming_year_df = set_index_levels_func(
            update_index_levels_func(
                temperatures_in_line_with_assessment.idxmax(axis="columns").to_frame(
                    "value"
                ),
                {"unit": lambda x: "yr"},
            ),
            {"metric": "max_year"},
        )
        peak_warming_year_quantiles_df = fix_index_name_after_groupby_quantile(
            groupby_except(peak_warming_year_df, "run_id").quantile(
                np.array(self.percentiles_to_calculate)
            ),
            new_name="quantile",
        )
        peak_warming_year_quantiles = peak_warming_year_quantiles_df["value"]

        # Categorisation
        categories = categorise_scenarios(
            peak_warming_quantiles=peak_warming_quantiles,
            eoc_warming_quantiles=eoc_warming_quantiles,
            group_levels=["climate_model", "model", "scenario"],
            quantile_level="quantile",
        )

        # Metadata Compilation
        metadata_run_id = pd.concat(
            [
                peak_warming_df["value"],
                eoc_warming_df["value"],
                peak_warming_year_df["value"],
            ]
        )
        metadata_quantile = pd.concat(
            [peak_warming_quantiles, eoc_warming_quantiles, peak_warming_year_quantiles]
        )

        # Compile climate output result
        timeseries_run_id = pd.concat([temperatures_in_line_with_assessment])
        timeseries_quantile = pd.concat(
            [temperatures_in_line_with_assessment_quantiles]
        )
        timeseries_exceedance_probabilities = pd.concat(
            [exceedance_probabilities_over_time]
        )

        metadata_exceedance_probabilities = exceedance_probabilities
        metadata_categories = categories

        res = PostProcessingResult(
            timeseries_run_id=timeseries_run_id,
            timeseries_quantile=timeseries_quantile,
            timeseries_exceedance_probabilities=timeseries_exceedance_probabilities,
            metadata_run_id=metadata_run_id,
            metadata_quantile=metadata_quantile,
            metadata_exceedance_probabilities=metadata_exceedance_probabilities,
            metadata_categories=metadata_categories,
        )

        return res

    @classmethod
    def from_cmip7_scenariomip_config(cls) -> CMIP7ScenarioMIPPostProcessor:
        """
        Initialise from the config used in CMIP7 ScenarioMIP

        Returns
        -------
        :
            Initialised post-processor
        """
        return cls(
            gsat_variable_name="Surface Air Temperature Change",
            gsat_in_line_with_assessment_variable_name="Surface Temperature (GSAT)",
            gsat_assessment_median=0.85,
            gsat_assessment_time_period=tuple(range(1995, 2014 + 1)),
            gsat_assessment_pre_industrial_period=tuple(range(1850, 1900 + 1)),
            percentiles_to_calculate=(
                0.05,
                0.10,
                1.0 / 6.0,
                0.33,
                0.5,
                0.67,
                5.0 / 6.0,
                0.90,
                0.95,
            ),
            exceedance_global_warming_levels=(1.0, 4.01, 0.5),
            run_checks=True,
        )

    def _check_in_df(self, in_df: pd.DataFrame) -> None:
        """
        Perform checks on the input DataFrame
        """
        # Check for known variable names
        # Ensure that the variable we expect to process is actually present
        available_vars = in_df.index.get_level_values("variable").unique()
        if self.gsat_variable_name not in available_vars:
            msg_tuple = (
                f"Required variable '{self.gsat_variable_name}' not found in input. "
                f"Available variables: {available_vars.tolist()}"
            )
            raise ValueError(msg_tuple)

        # Check for usable time axis
        # Ensure columns are integers (years) and not empty
        if in_df.columns.empty:
            msg = "Input DataFrame has no time columns."
            raise ValueError(msg)

        try:
            # Check if all columns can be treated as integers
            years = in_df.columns.astype(int)
        except (ValueError, TypeError):
            msg_tuple = (
                f"Input columns must be integer years. Found: {in_df.columns.tolist()}"
            )
            raise ValueError(msg_tuple)

        # Ensure the time axis covers the required assessment periods
        required_years = set(self.gsat_assessment_time_period) | set(
            self.gsat_assessment_pre_industrial_period
        )
        missing_years = required_years - set(years)
        if missing_years:
            msg_years = (
                "Input data is missing years required for assessment: "
                f"{sorted(list(missing_years))}"
            )
            raise ValueError(msg_years)

        # Check if metadata is appropriate/usable
        # Check for required index levels that are used in grouping/processing
        required_levels = ["model", "scenario", "climate_model", "run_id", "unit"]
        missing_levels = [
            level for level in required_levels if level not in in_df.index.names
        ]
        if missing_levels:
            msg_l = f"Input index is missing required metadata levels: {missing_levels}"
            raise ValueError(msg_l)

        # Ensure there are no NaNs in the essential grouping metadata
        for level in ["model", "scenario", "run_id"]:
            if pd.isna(in_df.index.get_level_values(level)).any():
                msg_level = f"Found NaN values in required metadata level: '{level}'"
                raise ValueError(msg_level)

exceedance_global_warming_levels class-attribute instance-attribute #

exceedance_global_warming_levels: tuple[float, ...] = (
    1.5,
    2.0,
    2.5,
)

Global-warming levels against which to calculate exceedance probabilities
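
As a rough illustration of what an exceedance probability against a global-warming level means, the sketch below takes a small made-up ensemble and computes the fraction of runs whose peak temperature lies above each level. The data and the `peak_exceedance_probability` helper are hypothetical; this is not the implementation behind `get_exceedance_probabilities`, and the library may report the result in different units (e.g. percent).

```python
import pandas as pd

# Hypothetical ensemble: one row per run, one column per year
temperatures = pd.DataFrame(
    [
        [1.2, 1.6, 1.4],
        [1.3, 2.1, 1.9],
        [1.1, 1.4, 1.3],
        [1.4, 2.6, 2.2],
    ],
    index=pd.Index([0, 1, 2, 3], name="run_id"),
    columns=pd.Index([2030, 2060, 2100], name="year"),
)


def peak_exceedance_probability(df: pd.DataFrame, level: float) -> float:
    """Fraction of runs whose peak value is above `level` (illustrative only)."""
    peaks = df.max(axis="columns")
    return float((peaks > level).mean())


# One probability per global-warming level of interest
exceedance = {
    level: peak_exceedance_probability(temperatures, level)
    for level in (1.5, 2.0, 2.5)
}
```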

gsat_assessment_median instance-attribute #

gsat_assessment_median: float

Median of the GSAT assessment

gsat_assessment_pre_industrial_period instance-attribute #

gsat_assessment_pre_industrial_period: tuple[int, ...]

Pre-industrial time period used for the GSAT assessment

gsat_assessment_time_period instance-attribute #

gsat_assessment_time_period: tuple[int, ...]

Time period over which the GSAT assessment applies
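
The three assessment attributes work together. One plausible reading, sketched below under stated assumptions, is that each run is first expressed relative to its own pre-industrial mean, and the whole ensemble is then shifted so that the median of the assessment-period means equals the assessed median. The data and the shortened periods are made up, and this is an assumption about the approach rather than the code of `get_temperatures_in_line_with_assessment`.

```python
import pandas as pd

# Hypothetical raw output: two runs, with shortened periods for brevity
raw = pd.DataFrame(
    [
        [0.0, 0.2, 0.9, 1.1, 1.8],
        [0.1, 0.1, 1.2, 1.4, 2.4],
    ],
    index=pd.Index([0, 1], name="run_id"),
    columns=[1850, 1851, 2000, 2001, 2100],
)
pre_industrial_period = (1850, 1851)
assessment_period = (2000, 2001)
assessment_median = 0.85

# Step 1: express each run relative to its own pre-industrial mean
pre_industrial_mean = raw[list(pre_industrial_period)].mean(axis="columns")
rel_pi = raw.sub(pre_industrial_mean, axis="index")

# Step 2: shift all runs by a single offset so that the ensemble median
# of the assessment-period means equals the assessed median
ensemble_median = rel_pi[list(assessment_period)].mean(axis="columns").median()
aligned = rel_pi - ensemble_median + assessment_median
```

Because every run receives the same offset in step 2, the spread between runs is preserved; only the ensemble's central estimate over the assessment period is pinned.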

gsat_in_line_with_assessment_variable_name instance-attribute #

gsat_in_line_with_assessment_variable_name: str

The name of the GSAT variable once it's been aligned with the assessment

gsat_variable_name instance-attribute #

gsat_variable_name: str

The name of the GSAT variable

n_processes class-attribute instance-attribute #

n_processes: int = cpu_count()

Number of processes to use for parallel processing.

Set to 1 to process in serial.

percentiles_to_calculate class-attribute instance-attribute #

percentiles_to_calculate: tuple[float, ...] = (
    0.05,
    0.33,
    0.5,
    0.67,
    0.95,
)

Percentiles to calculate and include in the output
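
To show what calculating these percentiles involves, here is a minimal pandas sketch that groups an ensemble by every index level except `run_id` and takes quantiles across the runs. The data is made up, and the plain `groupby` call stands in for the library's `groupby_except` and `fix_index_name_after_groupby_quantile` helpers, whose exact behaviour is assumed here.

```python
import pandas as pd

# Hypothetical ensemble of five runs for one model-scenario pair
index = pd.MultiIndex.from_tuples(
    [("m", "s", run_id) for run_id in range(5)],
    names=["model", "scenario", "run_id"],
)
temperatures = pd.DataFrame(
    {2050: [1.0, 1.2, 1.4, 1.6, 1.8], 2100: [1.5, 1.7, 1.9, 2.1, 2.3]},
    index=index,
)

# Group by every index level except "run_id", then take quantiles
group_levels = [name for name in temperatures.index.names if name != "run_id"]
quantiles = temperatures.groupby(level=group_levels).quantile([0.05, 0.5, 0.95])

# pandas adds the quantile values as a new, unnamed, innermost index level
quantiles.index = quantiles.index.set_names("quantile", level=-1)
```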

run_checks class-attribute instance-attribute #

run_checks: bool = True

If True, run checks on both input and output data

If you are sure about your workflow, you can disable the checks to speed things up (but we don't recommend this unless you really are confident about what you're doing).

__call__ #

__call__(in_df: DataFrame) -> PostProcessingResult

Do the post-processing

Parameters:

Name Type Description Default
in_df DataFrame

Data to post-process

required

Returns:

Type Description
PostProcessingResult

Post-processed results
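
The sketch below builds a minimal input frame with the shape that the checks in `_check_in_df` look for: the GSAT variable in a `variable` level, the metadata levels used for grouping, and integer year columns covering both assessment periods. The model, scenario, climate model and unit values are placeholders and the temperatures are made up; 2100 is included because `__call__` reads that column for end-of-century warming.

```python
import numpy as np
import pandas as pd

gsat_variable_name = "Surface Air Temperature Change"
years = list(range(1850, 2100 + 1))
n_runs = 3

index = pd.MultiIndex.from_tuples(
    [
        ("model_a", "scenario_a", "MAGICC7", run_id, gsat_variable_name, "K")
        for run_id in range(n_runs)
    ],
    names=["model", "scenario", "climate_model", "run_id", "variable", "unit"],
)

# Made-up linear warming, with a different end point for each run
scales = np.array([1.5, 2.0, 2.5])
values = np.outer(scales, np.linspace(0.0, 1.0, len(years)))
in_df = pd.DataFrame(values, index=index, columns=years)

# Mirror the documented checks: metadata levels and required years
required_levels = {"model", "scenario", "climate_model", "run_id", "unit", "variable"}
missing_levels = required_levels - set(in_df.index.names)
required_years = set(range(1850, 1900 + 1)) | set(range(1995, 2014 + 1)) | {2100}
missing_years = required_years - {int(year) for year in in_df.columns}
```

With a configured post-processor, a frame like this would then be passed as `post_processor(in_df)`.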

Source code in src/gcages/cmip7_scenariomip/post_processing.py
def __call__(self, in_df: pd.DataFrame) -> PostProcessingResult:
    """
    Do the post-processing

    Parameters
    ----------
    in_df
        Data to post-process

    Returns
    -------
    :
        Post-processed results
    """
    if self.run_checks:
        self._check_in_df(in_df)

    temperatures_in_line_with_assessment = update_index_levels_func(
        get_temperatures_in_line_with_assessment(
            in_df.loc[
                in_df.index.get_level_values("variable") == self.gsat_variable_name
            ],
            assessment_median=self.gsat_assessment_median,
            assessment_time_period=self.gsat_assessment_time_period,
            assessment_pre_industrial_period=self.gsat_assessment_pre_industrial_period,
            group_cols=["climate_model", "model", "scenario"],
        ),
        {"variable": lambda x: self.gsat_in_line_with_assessment_variable_name},
    )

    # Quantiles
    temperatures_in_line_with_assessment_quantiles = (
        fix_index_name_after_groupby_quantile(
            groupby_except(
                temperatures_in_line_with_assessment,
                "run_id",
            ).quantile(list(self.percentiles_to_calculate)),  # type: ignore # pandas-stubs confused
            new_name="quantile",
        )
    )

    # Exceedance probabilities, peak warming and categorisation
    exceedance_probabilities_over_time = get_exceedance_probabilities_over_time(
        temperatures_in_line_with_assessment,
        exceedance_thresholds_of_interest=self.exceedance_global_warming_levels,
        group_cols=["model", "scenario", "climate_model"],
        unit_col="unit",
        groupby_except_levels="run_id",
    )
    exceedance_probabilities = get_exceedance_probabilities(
        temperatures_in_line_with_assessment,
        exceedance_thresholds_of_interest=self.exceedance_global_warming_levels,
        group_cols=["model", "scenario", "climate_model"],
        unit_col="unit",
        groupby_except_levels="run_id",
    )

    # Peak Warming
    peak_warming_df = set_index_levels_func(
        temperatures_in_line_with_assessment.max(axis="columns").to_frame("value"),
        {"metric": "max"},
    )
    peak_warming_quantiles_df = fix_index_name_after_groupby_quantile(
        groupby_except(peak_warming_df, "run_id").quantile(
            np.array(self.percentiles_to_calculate)
        ),
        new_name="quantile",
    )
    # Extract Series for categorization and final result
    peak_warming_quantiles = peak_warming_quantiles_df["value"]

    # EOC Warming
    eoc_warming_df = set_index_levels_func(
        temperatures_in_line_with_assessment[2100].to_frame("value"),
        {"metric": 2100},
    )
    eoc_warming_quantiles_df = fix_index_name_after_groupby_quantile(
        groupby_except(eoc_warming_df, "run_id").quantile(
            np.array(self.percentiles_to_calculate)
        ),
        new_name="quantile",
    )
    eoc_warming_quantiles = eoc_warming_quantiles_df["value"]

    # Peak Year
    peak_warming_year_df = set_index_levels_func(
        update_index_levels_func(
            temperatures_in_line_with_assessment.idxmax(axis="columns").to_frame(
                "value"
            ),
            {"unit": lambda x: "yr"},
        ),
        {"metric": "max_year"},
    )
    peak_warming_year_quantiles_df = fix_index_name_after_groupby_quantile(
        groupby_except(peak_warming_year_df, "run_id").quantile(
            np.array(self.percentiles_to_calculate)
        ),
        new_name="quantile",
    )
    peak_warming_year_quantiles = peak_warming_year_quantiles_df["value"]

    # Categorisation
    categories = categorise_scenarios(
        peak_warming_quantiles=peak_warming_quantiles,
        eoc_warming_quantiles=eoc_warming_quantiles,
        group_levels=["climate_model", "model", "scenario"],
        quantile_level="quantile",
    )

    # Metadata Compilation
    metadata_run_id = pd.concat(
        [
            peak_warming_df["value"],
            eoc_warming_df["value"],
            peak_warming_year_df["value"],
        ]
    )
    metadata_quantile = pd.concat(
        [peak_warming_quantiles, eoc_warming_quantiles, peak_warming_year_quantiles]
    )

    # Compile climate output result
    timeseries_run_id = pd.concat([temperatures_in_line_with_assessment])
    timeseries_quantile = pd.concat(
        [temperatures_in_line_with_assessment_quantiles]
    )
    timeseries_exceedance_probabilities = pd.concat(
        [exceedance_probabilities_over_time]
    )

    metadata_exceedance_probabilities = exceedance_probabilities
    metadata_categories = categories

    res = PostProcessingResult(
        timeseries_run_id=timeseries_run_id,
        timeseries_quantile=timeseries_quantile,
        timeseries_exceedance_probabilities=timeseries_exceedance_probabilities,
        metadata_run_id=metadata_run_id,
        metadata_quantile=metadata_quantile,
        metadata_exceedance_probabilities=metadata_exceedance_probabilities,
        metadata_categories=metadata_categories,
    )

    return res
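
The peak warming, peak year and end-of-century steps in the listing above reduce to three pandas operations along the time axis. A minimal sketch with made-up data (the index handling and quantile steps are left out):

```python
import pandas as pd

# Hypothetical aligned temperatures: one row per run, one column per year
temperatures = pd.DataFrame(
    [[1.2, 1.9, 1.7], [1.3, 1.6, 2.0]],
    index=pd.Index([0, 1], name="run_id"),
    columns=[2030, 2060, 2100],
)

# Highest value reached by each run
peak_warming = temperatures.max(axis="columns")
# Column label (year) at which each run reaches that value
peak_warming_year = temperatures.idxmax(axis="columns")
# End-of-century value of each run
eoc_warming = temperatures[2100]
```

Note that `idxmax` returns the first year at which the maximum occurs, so a run that plateaus at its peak is assigned the start of the plateau.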

from_cmip7_scenariomip_config classmethod #

from_cmip7_scenariomip_config() -> (
    CMIP7ScenarioMIPPostProcessor
)

Initialise from the config used in CMIP7 ScenarioMIP

Returns:

Type Description
CMIP7ScenarioMIPPostProcessor

Initialised post-processor
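
For orientation, the short sketch below expands the period and percentile expressions that appear in this configuration (the values are copied from the source listing). It only illustrates that both `range` calls are inclusive of their end year and that the percentiles include the one-sixth and five-sixths levels.

```python
# Values copied from the configuration; both periods include their end year
assessment_time_period = tuple(range(1995, 2014 + 1))
pre_industrial_period = tuple(range(1850, 1900 + 1))
percentiles = (0.05, 0.10, 1.0 / 6.0, 0.33, 0.5, 0.67, 5.0 / 6.0, 0.90, 0.95)

print(assessment_time_period[0], assessment_time_period[-1])
print(pre_industrial_period[0], pre_industrial_period[-1])
print(len(assessment_time_period), len(pre_industrial_period))
print([round(p, 4) for p in percentiles])
```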

Source code in src/gcages/cmip7_scenariomip/post_processing.py
@classmethod
def from_cmip7_scenariomip_config(cls) -> CMIP7ScenarioMIPPostProcessor:
    """
    Initialise from the config used in CMIP7 ScenarioMIP

    Returns
    -------
    :
        Initialised post-processor
    """
    return cls(
        gsat_variable_name="Surface Air Temperature Change",
        gsat_in_line_with_assessment_variable_name="Surface Temperature (GSAT)",
        gsat_assessment_median=0.85,
        gsat_assessment_time_period=tuple(range(1995, 2014 + 1)),
        gsat_assessment_pre_industrial_period=tuple(range(1850, 1900 + 1)),
        percentiles_to_calculate=(
            0.05,
            0.10,
            1.0 / 6.0,
            0.33,
            0.5,
            0.67,
            5.0 / 6.0,
            0.90,
            0.95,
        ),
        exceedance_global_warming_levels=(1.0, 4.01, 0.5),
        run_checks=True,
    )

CMIP7ScenarioMIPPreProcessingResult #

Result of pre-processing with CMIP7ScenarioMIPPreProcessor

This has more components than normal, because we need to support both the 'normal' global path and harmonising at the region-sector level.

Attributes:

Name Type Description
assumed_zero_emissions DataFrame | None

Emissions that were assumed to be zero during the processing

global_workflow_emissions DataFrame

Emissions that can be used with the 'normal' global workflow

global_workflow_emissions_raw_names DataFrame

Emissions consistent with those that can be used with the 'normal' global workflow

gridding_workflow_emissions DataFrame

Emissions that can be used with the gridding workflow

Source code in src/gcages/cmip7_scenariomip/pre_processing/pre_processor.py
@define
class CMIP7ScenarioMIPPreProcessingResult:
    """
    Result of pre-processing with [CMIP7ScenarioMIPPreProcessor][(m).]

    This has more components than normal,
    because we need to support both the 'normal' global path
    and harmonising at the region-sector level.
    """

    assumed_zero_emissions: pd.DataFrame | None
    """
    Emissions that were assumed to be zero during the processing
    """

    gridding_workflow_emissions: pd.DataFrame
    """
    Emissions that can be used with the gridding workflow
    """

    global_workflow_emissions: pd.DataFrame
    """
    Emissions that can be used with the 'normal' global workflow
    """

    global_workflow_emissions_raw_names: pd.DataFrame
    """
    Emissions consistent with those that can be used with the 'normal' global workflow

    The difference is that these are reported with CMIP7 ScenarioMIP naming,
    which isn't compatible with our SCM runners (for example),
    so is probably not what you want to use,
    but perhaps helpful for plotting and direct comparisons.
    """

assumed_zero_emissions instance-attribute #

assumed_zero_emissions: DataFrame | None

Emissions that were assumed to be zero during the processing

global_workflow_emissions instance-attribute #

global_workflow_emissions: DataFrame

Emissions that can be used with the 'normal' global workflow

global_workflow_emissions_raw_names instance-attribute #

global_workflow_emissions_raw_names: DataFrame

Emissions consistent with those that can be used with the 'normal' global workflow

The difference is that these are reported with CMIP7 ScenarioMIP naming, which isn't compatible with our SCM runners (for example), so is probably not what you want to use, but perhaps helpful for plotting and direct comparisons.

gridding_workflow_emissions instance-attribute #

gridding_workflow_emissions: DataFrame

Emissions that can be used with the gridding workflow

CMIP7ScenarioMIPPreProcessor #

Pre-processor for CMIP7's ScenarioMIP

For more details of the logic, see gcages.cmip7_scenariomip.pre_processing.

Methods:

Name Description
__call__

Pre-process

Attributes:

Name Type Description
co2_biosphere_sectors tuple[str, ...]

Gridding sectors that are assumed to come from the biosphere CO2 reservoir

co2_fossil_sectors tuple[str, ...]

Gridding sectors that are assumed to come from the fossil CO2 reservoir

co2_name str

Name used for CO2 in variable names

level_separator str

The separator between levels in variable names

n_processes int | None

Number of processes to use for parallel processing.

progress bool

Should progress bars be shown?

reaggregator ReaggregatorLike | None

Re-aggregator to use when converting raw data to gridding sectors

run_checks bool

If True, run checks on both input and output data

table str

The value used for the top level of variable names

world_gridding_sectors tuple[str, ...]

Sectors that are only used for gridding at the world (i.e. regional sum) level

Source code in src/gcages/cmip7_scenariomip/pre_processing/pre_processor.py
@define
class CMIP7ScenarioMIPPreProcessor:
    """
    Pre-processor for CMIP7's ScenarioMIP

    For more details of the logic, see [gcages.cmip7_scenariomip.pre_processing][].
    """

    reaggregator: ReaggregatorLike | None = None
    """
    Re-aggregator to use when converting raw data to gridding sectors

    If not supplied, we guess the re-aggregator during processing
    """

    run_checks: bool = True
    """
    If `True`, run checks on both input and output data

    If you are sure about your workflow,
    you can disable the checks to speed things up
    (but we don't recommend this unless you really
    are confident about what you're doing).
    """

    world_gridding_sectors: tuple[str, ...] = ("Aircraft", "International Shipping")
    """
    Sectors that are only used for gridding at the world (i.e. regional sum) level
    """

    co2_fossil_sectors: tuple[str, ...] = CO2_FOSSIL_SECTORS_GRIDDING
    """
    Gridding sectors that are assumed to come from the fossil CO2 reservoir
    """

    co2_biosphere_sectors: tuple[str, ...] = CO2_BIOSPHERE_SECTORS_GRIDDING
    """
    Gridding sectors that are assumed to come from the biosphere CO2 reservoir
    """

    co2_name: str = "CO2"
    """
    Name used for CO2 in variable names
    """

    table: str = "Emissions"
    """
    The value used for the top level of variable names
    """

    level_separator: str = "|"
    """
    The separator between levels in variable names
    """

    progress: bool = True
    """
    Should progress bars be shown?
    """

    n_processes: int | None = multiprocessing.cpu_count()
    """
    Number of processes to use for parallel processing.

    Set to `None` to process in serial.
    """

    def __call__(
        self, in_emissions: pd.DataFrame
    ) -> CMIP7ScenarioMIPPreProcessingResult:
        """
        Pre-process

        Parameters
        ----------
        in_emissions
            Emissions to pre-process

        Returns
        -------
        :
            Pre-processed emissions
        """
        if self.run_checks:
            assert_index_is_multiindex(in_emissions)
            assert_data_is_all_numeric(in_emissions)

            if in_emissions.columns.name != "year":
                msg = "The input emissions' column name should be 'year'"
                raise AssertionError(msg)

        res_g = apply_op_parallel_progress(
            func_to_call=do_pre_processing,
            reaggregator=self.reaggregator,
            time_name="year",
            run_checks=self.run_checks,
            world_gridding_sectors=self.world_gridding_sectors,
            table=self.table,
            level_separator=self.level_separator,
            co2_fossil_sectors=self.co2_fossil_sectors,
            co2_biosphere_sectors=self.co2_biosphere_sectors,
            co2_name=self.co2_name,
            iterable_input=(
                gdf for _, gdf in in_emissions.groupby(["model", "scenario"])
            ),
            parallel_op_config=ParallelOpConfig.from_user_facing(
                progress=self.progress,
                max_workers=self.n_processes,
            ),
        )

        res_d = defaultdict(list)
        for res_ms in res_g:
            for k, v in asdict(res_ms).items():
                if v is not None:
                    res_d[k].append(v)

        result_initialiser = {k: pd.concat(v) for k, v in res_d.items()}
        if "assumed_zero_emissions" not in result_initialiser:
            result_initialiser["assumed_zero_emissions"] = None

        res = CMIP7ScenarioMIPPreProcessingResult(**result_initialiser)

        return res

co2_biosphere_sectors class-attribute instance-attribute #

co2_biosphere_sectors: tuple[str, ...] = (
    CO2_BIOSPHERE_SECTORS_GRIDDING
)

Gridding sectors that are assumed to come from the biosphere CO2 reservoir

co2_fossil_sectors class-attribute instance-attribute #

co2_fossil_sectors: tuple[str, ...] = (
    CO2_FOSSIL_SECTORS_GRIDDING
)

Gridding sectors that are assumed to come from the fossil CO2 reservoir

co2_name class-attribute instance-attribute #

co2_name: str = 'CO2'

Name used for CO2 in variable names

level_separator class-attribute instance-attribute #

level_separator: str = '|'

The separator between levels in variable names

n_processes class-attribute instance-attribute #

n_processes: int | None = cpu_count()

Number of processes to use for parallel processing.

Set to None to process in serial.

progress class-attribute instance-attribute #

progress: bool = True

Should progress bars be shown?

reaggregator class-attribute instance-attribute #

reaggregator: ReaggregatorLike | None = None

Re-aggregator to use when converting raw data to gridding sectors

If not supplied, we guess the re-aggregator during processing

run_checks class-attribute instance-attribute #

run_checks: bool = True

If True, run checks on both input and output data

If you are sure about your workflow, you can disable the checks to speed things up (but we don't recommend this unless you really are confident about what you're doing).

table class-attribute instance-attribute #

table: str = 'Emissions'

The value used for the top level of variable names
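
To make the roles of `table`, `level_separator` and `co2_name` concrete, the sketch below assumes variable names are built by joining levels with the separator, with the table as the first level (for example `Emissions|CO2|Aircraft`). Both helper functions are hypothetical and are not part of the library.

```python
def build_variable_name(
    *levels: str, table: str = "Emissions", level_separator: str = "|"
) -> str:
    """Join a table name and further levels into one variable name."""
    return level_separator.join((table, *levels))


def split_variable_name(variable: str, level_separator: str = "|") -> list:
    """Split a variable name back into its levels."""
    return variable.split(level_separator)


# Assumed layout: table, then species, then sector
name = build_variable_name("CO2", "Aircraft")
levels = split_variable_name(name)
```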

world_gridding_sectors class-attribute instance-attribute #

world_gridding_sectors: tuple[str, ...] = (
    "Aircraft",
    "International Shipping",
)

Sectors that are only used for gridding at the world (i.e. regional sum) level

__call__ #

__call__(
    in_emissions: DataFrame,
) -> CMIP7ScenarioMIPPreProcessingResult

Pre-process

Parameters:

Name Type Description Default
in_emissions DataFrame

Emissions to pre-process

required

Returns:

Type Description
CMIP7ScenarioMIPPreProcessingResult

Pre-processed emissions
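
The sketch below shows the basic shape of input that the up-front checks in `__call__` look for: a `MultiIndex`, all-numeric data and a column axis named `year`. The index levels beyond `model` and `scenario`, the unit string and the values are assumptions for illustration; a frame this small shows the shape only and would not be expected to get through the full pre-processing.

```python
import pandas as pd

index = pd.MultiIndex.from_tuples(
    [
        ("model_a", "scenario_a", "World", "Emissions|CO2|Aircraft", "Mt CO2/yr"),
        ("model_a", "scenario_b", "World", "Emissions|CO2|Aircraft", "Mt CO2/yr"),
    ],
    names=["model", "scenario", "region", "variable", "unit"],
)
in_emissions = pd.DataFrame(
    [[900.0, 950.0], [900.0, 700.0]],
    index=index,
    columns=pd.Index([2020, 2030], name="year"),
)

# Conditions checked before any processing starts
assert isinstance(in_emissions.index, pd.MultiIndex)
assert in_emissions.columns.name == "year"
assert all(pd.api.types.is_numeric_dtype(dtype) for dtype in in_emissions.dtypes)

# The work is then split into one chunk per (model, scenario) pair
groups = [key for key, _ in in_emissions.groupby(["model", "scenario"])]
```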

Source code in src/gcages/cmip7_scenariomip/pre_processing/pre_processor.py
def __call__(
    self, in_emissions: pd.DataFrame
) -> CMIP7ScenarioMIPPreProcessingResult:
    """
    Pre-process

    Parameters
    ----------
    in_emissions
        Emissions to pre-process

    Returns
    -------
    :
        Pre-processed emissions
    """
    if self.run_checks:
        assert_index_is_multiindex(in_emissions)
        assert_data_is_all_numeric(in_emissions)

        if in_emissions.columns.name != "year":
            msg = "The input emissions' column name should be 'year'"
            raise AssertionError(msg)

    res_g = apply_op_parallel_progress(
        func_to_call=do_pre_processing,
        reaggregator=self.reaggregator,
        time_name="year",
        run_checks=self.run_checks,
        world_gridding_sectors=self.world_gridding_sectors,
        table=self.table,
        level_separator=self.level_separator,
        co2_fossil_sectors=self.co2_fossil_sectors,
        co2_biosphere_sectors=self.co2_biosphere_sectors,
        co2_name=self.co2_name,
        iterable_input=(
            gdf for _, gdf in in_emissions.groupby(["model", "scenario"])
        ),
        parallel_op_config=ParallelOpConfig.from_user_facing(
            progress=self.progress,
            max_workers=self.n_processes,
        ),
    )

    res_d = defaultdict(list)
    for res_ms in res_g:
        for k, v in asdict(res_ms).items():
            if v is not None:
                res_d[k].append(v)

    result_initialiser = {k: pd.concat(v) for k, v in res_d.items()}
    if "assumed_zero_emissions" not in result_initialiser:
        result_initialiser["assumed_zero_emissions"] = None

    res = CMIP7ScenarioMIPPreProcessingResult(**result_initialiser)

    return res
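
The final part of the listing above collects per-scenario results field by field, skips fields that are `None`, concatenates the rest, and sets the optional field to `None` if no scenario supplied it. A self-contained sketch of that pattern follows; the `Result` dataclass and its field names are hypothetical stand-ins for the attrs-based result class.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional

import pandas as pd


@dataclass
class Result:  # hypothetical stand-in for a per-scenario result
    emissions: pd.DataFrame
    assumed_zero: Optional[pd.DataFrame]


per_scenario = [
    Result(pd.DataFrame({2020: [1.0]}, index=["a"]), None),
    Result(pd.DataFrame({2020: [2.0]}, index=["b"]), None),
]

collected = defaultdict(list)
for res in per_scenario:
    for name, value in vars(res).items():
        if value is not None:  # optional pieces may be missing for a scenario
            collected[name].append(value)

combined = {name: pd.concat(values) for name, values in collected.items()}
# If no scenario supplied the optional piece, record that explicitly
combined.setdefault("assumed_zero", None)
```

Skipping `None` before concatenating keeps the combined frame limited to scenarios that actually produced the optional piece, while the final step ensures that every field of the result class still receives a value.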

CMIP7ScenarioMIPSCMRunner #

Simple climate model runner

It follows the same logic as was used in CMIP7 ScenarioMIP

If you want exactly the same behaviour as in CMIP7 ScenarioMIP, initialise using from_cmip7_scenariomip_config

Methods:

Name Description
__call__

Run the simple climate model

from_cmip7_scenariomip_config

Initialise from the config used in CMIP7 ScenarioMIP

Attributes:

Name Type Description
batch_size_scenarios int | None

The number of scenarios to run at a time

climate_models_cfgs dict[str, list[dict[str, Any]]]

Climate models to run and the configuration to use with them

db OpenSCMDB | None

Database in which to store the output of the runs

harmonisation_year int | None

Year in which the data was harmonised

historical_emissions DataFrame | None

Historical emissions used for harmonisation

n_processes int | None

Number of processes to use for parallel processing.

output_variables tuple[str, ...]

Variables to include in the output

progress bool

Should progress bars be shown for each operation?

res_column_type type

Type to cast the result's column type to

run_checks bool

If True, run checks on both input and output data

verbose bool

Should verbose messages be printed?
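
To illustrate the `batch_size_scenarios` trade-off, the sketch below splits a list of scenarios into batches, with `None` meaning that everything goes in a single batch. The `batch_scenarios` helper is hypothetical and only shows the idea; it is not how the runner itself is implemented.

```python
def batch_scenarios(scenarios, batch_size):
    """Split `scenarios` into lists of at most `batch_size` items.

    `batch_size=None` returns a single batch holding every scenario.
    """
    if batch_size is None:
        return [list(scenarios)]
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [
        list(scenarios[start : start + batch_size])
        for start in range(0, len(scenarios), batch_size)
    ]


scenarios = ["a", "b", "c", "d", "e"]
small_batches = batch_scenarios(scenarios, 2)
single_batch = batch_scenarios(scenarios, None)
```

Smaller batches mean fewer scenarios are held in memory at once, at the cost of more separate runs.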

Source code in src/gcages/cmip7_scenariomip/scm_running.py
@define
class CMIP7ScenarioMIPSCMRunner:
    """
    Simple climate model runner

    It follows the same logic as was used in CMIP7 ScenarioMIP

    If you want exactly the same behaviour as in CMIP7 ScenarioMIP,
    initialise using [`from_cmip7_scenariomip_config`][(c)]
    """

    climate_models_cfgs: dict[str, list[dict[str, Any]]] = field(
        repr=lambda x: ", ".join(
            (
                f"{climate_model}: {len(cfgs)} configurations"
                for climate_model, cfgs in x.items()
            )
        )
    )
    """
    Climate models to run and the configuration to use with them
    """

    output_variables: tuple[str, ...]
    """
    Variables to include in the output
    """

    batch_size_scenarios: int | None = None
    """
    The number of scenarios to run at a time

    Smaller batch sizes use less memory, but take longer overall
    (all else being equal).

    If not supplied, all scenarios are run simultaneously.
    """

    db: OpenSCMDB | None = None
    """
    Database in which to store the output of the runs

    If not supplied, output of the runs is not stored.
    """

    res_column_type: type = int
    """
    Type to cast the result's column type to
    """

    historical_emissions: pd.DataFrame | None = None
    """
    Historical emissions used for harmonisation

    Only required if `run_checks` is `True` to check
    that the data to run is harmonised.
    """

    harmonisation_year: int | None = None
    """
    Year in which the data was harmonised

    Only required if `run_checks` is `True` to check
    that the data to run is harmonised.
    """

    verbose: bool = True
    """
    Should verbose messages be printed?

    This is a temporary hack while we think about how to handle logging
    """

    run_checks: bool = True
    """
    If `True`, run checks on both input and output data

    If you are sure about your workflow,
    you can disable the checks to speed things up
    (but we don't recommend this unless you really
    are confident about what you're doing).
    """

    progress: bool = True
    """
    Should progress bars be shown for each operation?
    """

    n_processes: int | None = multiprocessing.cpu_count()
    """
    Number of processes to use for parallel processing.

    Set to `None` to process in serial.
    """

    def __call__(  # noqa: PLR0912
        self, in_emissions: pd.DataFrame, force_rerun: bool = False
    ) -> pd.DataFrame:
        """
        Run the simple climate model

        Parameters
        ----------
        in_emissions
            Emissions to run

        force_rerun
            Force scenarios to re-run (i.e. disable caching).

        Returns
        -------
        :
            Raw results from the simple climate model
        """
        if self.run_checks:
            assert_index_is_multiindex(in_emissions)
            assert_has_index_levels(
                in_emissions, ["variable", "unit", "model", "scenario"]
            )
            assert_has_no_pint_incompatible_characters(
                in_emissions.index.get_level_values("unit").unique()
            )
            assert_data_is_all_numeric(in_emissions)

            if self.historical_emissions is None:
                msg = "`self.historical_emissions` must be set to check the infilling"
                raise AssertionError(msg)

            if self.harmonisation_year is None:
                msg = "`self.harmonisation_year` must be set to check the infilling"
                raise AssertionError(msg)

            assert_has_data_for_times(
                in_emissions,
                name="in_emissions",
                times=[self.harmonisation_year, 2100],
                allow_nan=False,
            )

            assert_harmonised(
                in_emissions,
                history=self.historical_emissions,
                harmonisation_time=self.harmonisation_year,
                rounding=5,  # level of data storage in historical data often
            )
            assert_all_groups_are_complete(
                # The combo of the input and infilled should be complete
                in_emissions,
                complete_index=self.historical_emissions.index.droplevel("unit"),
            )

        if "MAGICC7" in self.climate_models_cfgs:
            if self.historical_emissions is None:
                # No history provided: assume emissions are already complete
                complete_emissions = in_emissions
                complete_emissions.columns = complete_emissions.columns.astype(int)
                # Validate MAGICC requirement
                magicc_start_year = 2015
                if int(min(complete_emissions.columns.to_numpy())) != magicc_start_year:
                    msg = "Emissions starting year must be set to `2015`"
                    raise AssertionError(msg)
            else:
                # History provided: merge it with the scenarios
                complete_emissions = get_complete_scenarios_for_magicc(
                    scenarios=in_emissions,
                    history=self.historical_emissions,
                )
                complete_emissions.columns = complete_emissions.columns.astype(int)
        else:
            # Not running MAGICC, use emissions as-is
            complete_emissions = in_emissions

        openscm_runner_emissions = update_index_levels_func(
            complete_emissions,
            {
                "variable": partial(
                    convert_variable_name,
                    from_convention=SupportedNamingConventions.GCAGES,
                    to_convention=SupportedNamingConventions.OPENSCM_RUNNER,
                )
            },
        )

        # if self.force_interpolate_to_yearly:
        #     # TODO: put interpolate to annual steps in pandas-openscm
        #     # Interpolate to ensure no nans.
        #     for y in range(
        #         openscm_runner_emissions.columns.min(),
        #         openscm_runner_emissions.columns.max() + 1,
        #     ):
        #         if y not in openscm_runner_emissions:
        #             openscm_runner_emissions[y] = np.nan
        #
        #     openscm_runner_emissions = (
        #         openscm_runner_emissions.sort_index(axis="columns")
        #         .T.interpolate("index")
        #         .T
        #     )
        scm_results_maybe = run_scms(
            scenarios=openscm_runner_emissions,
            climate_models_cfgs=self.climate_models_cfgs,
            output_variables=self.output_variables,
            scenario_group_levels=["model", "scenario"],
            n_processes=self.n_processes if self.n_processes is not None else 1,
            db=self.db,
            verbose=self.verbose,
            batch_size_scenarios=self.batch_size_scenarios,
            force_rerun=force_rerun,
        )

        if self.db is not None:
            # Results aren't kept in memory during running, so have to load them now.
            # User can use `run_scms` directly if they want to process differently.
            out_maybe = self.db.load()
            if out_maybe is None:
                raise TypeError(out_maybe)

            out: pd.DataFrame = out_maybe

        else:
            if scm_results_maybe is None:
                raise TypeError(scm_results_maybe)

            out = scm_results_maybe

        out.columns = out.columns.astype(self.res_column_type)

        if self.run_checks:
            # All scenarios have output
            pd.testing.assert_index_equal(  # type: ignore # pandas-stubs out of date
                out.index.droplevel(
                    out.index.names.difference(["model", "scenario"])  # type: ignore # pandas-stubs out of date
                ).drop_duplicates(),
                in_emissions.index.droplevel(
                    in_emissions.index.names.difference(["model", "scenario"])  # type: ignore # pandas-stubs out of date
                ).drop_duplicates(),
                check_order=False,
            )
            # Expected output is provided
            assert_all_groups_are_complete(
                out,
                complete_index=pd.MultiIndex.from_arrays(
                    [list(self.output_variables)], names=["variable"]
                ),
            )

        return out

    @classmethod
    def from_cmip7_scenariomip_config(  # noqa: PLR0913
        cls,
        magicc_exe_path: Path,
        magicc_prob_distribution_path: Path,
        output_variables: tuple[str, ...] = SCM_OUTPUT_VARIABLES_DEFAULT,
        batch_size_scenarios: int | None = None,
        db: OpenSCMDB | None = None,
        historical_emissions_path: Path | None = None,
        harmonisation_year: int = 2023,
        verbose: bool = True,
        run_checks: bool = True,
        progress: bool = True,
        n_processes: int | None = multiprocessing.cpu_count(),
    ) -> CMIP7ScenarioMIPSCMRunner:
        """
        Initialise from the config used in CMIP7 ScenarioMIP

        Parameters
        ----------
        magicc_exe_path
            Path to the MAGICC executable to use.

            This should be a MAGICC v7.6.0a3 executable.

        magicc_prob_distribution_path
            Path to the MAGICC probabilistic distribution.

            This should be the CMIP7 ScenarioMIP probabilistic distribution.

        output_variables
            Variables to include in the output

        batch_size_scenarios
            The number of scenarios to run at a time

        db
            Database to use for storing results.

            If not supplied, raw outputs are not stored.

        historical_emissions_path
            Historical emissions used for harmonisation

            Only required if `run_checks` is `True` to check
            that the data is harmonised before running the SCMs.

        harmonisation_year
            Year in which the data was harmonised

            Only required if `run_checks` is `True` to check
            that the data is harmonised before running the SCMs.

        verbose
            Should verbose messages be printed?

            This is a temporary hack while we think about how to handle logging

        run_checks
            Should checks of the input and output data be performed?

            If this is turned off, things are faster,
            but error messages are much less clear if things go wrong.

        progress
            Should progress bars be shown for each operation?

        n_processes
            Number of processes to use for parallel processing.

            Set to `None` to process in serial.

        Returns
        -------
        :
            Initialised SCM runner
        """
        os.environ["MAGICC_EXECUTABLE_7"] = str(magicc_exe_path)
        check_cmip7_scenariomip_magicc7_version()

        if historical_emissions_path is not None:
            # Load history
            historical_emissions = load_cmip7_scenariomip_historical_emissions(
                filepath=historical_emissions_path,
                check_hash=True,
            )
            historical_emissions = update_index_levels_func(
                historical_emissions,
                {
                    "variable": lambda x: convert_variable_name(
                        x,
                        from_convention=SupportedNamingConventions.CMIP7_SCENARIOMIP,
                        to_convention=SupportedNamingConventions.GCAGES,
                    )
                },
                copy=False,
            )

            historical_emissions = historical_emissions.reset_index(
                level=[
                    lvl
                    for lvl in ["model", "scenario"]
                    if lvl in historical_emissions.index.names
                ],
                drop=True,
            )
        else:
            historical_emissions = None

        magicc_prob_cfg = load_magicc_cfgs(
            prob_distribution_path=magicc_prob_distribution_path,
            output_variables=output_variables,
            startyear=1750,
        )

        return cls(
            climate_models_cfgs=magicc_prob_cfg,
            output_variables=output_variables,
            batch_size_scenarios=batch_size_scenarios,
            db=db,
            historical_emissions=historical_emissions,
            harmonisation_year=harmonisation_year,
            verbose=verbose,
            run_checks=run_checks,
            progress=progress,
            n_processes=n_processes,
            res_column_type=int,  # annual output by default
        )

batch_size_scenarios class-attribute instance-attribute #

batch_size_scenarios: int | None = None

The number of scenarios to run at a time

Smaller batch sizes use less memory, but take longer overall (all else being equal).

If not supplied, all scenarios are run simultaneously.
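The memory/time trade-off described above can be illustrated with a minimal batching sketch. It is not the library's implementation: `iter_scenario_batches` and the example (model, scenario) pairs are hypothetical names introduced only for illustration.

```python
from __future__ import annotations

from collections.abc import Iterator, Sequence


def iter_scenario_batches(
    scenarios: Sequence[tuple[str, str]], batch_size: int | None
) -> Iterator[list[tuple[str, str]]]:
    """Yield (model, scenario) pairs in batches (hypothetical helper)."""
    if batch_size is None:
        # Mirrors the documented default: everything is run in one go
        yield list(scenarios)
        return
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer or None")
    for start in range(0, len(scenarios), batch_size):
        yield list(scenarios[start : start + batch_size])


# Illustrative scenario identifiers, not real data
scenarios = [("model_a", "low"), ("model_a", "high"), ("model_b", "low")]
batches_of_two = list(iter_scenario_batches(scenarios, batch_size=2))
single_batch = list(iter_scenario_batches(scenarios, batch_size=None))
```

Only one batch needs to be held in memory at a time, which is why smaller batches reduce peak memory at the cost of more round trips.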

climate_models_cfgs class-attribute instance-attribute #

climate_models_cfgs: dict[str, list[dict[str, Any]]] = (
    field(
        repr=lambda x: ", ".join(
            f"{climate_model}: {len(cfgs)} configurations"
            for (climate_model, cfgs) in x.items()
        )
    )
)

Climate models to run and the configuration to use with them

db class-attribute instance-attribute #

db: OpenSCMDB | None = None

Database in which to store the output of the runs

If not supplied, output of the runs is not stored.

harmonisation_year class-attribute instance-attribute #

harmonisation_year: int | None = None

Year in which the data was harmonised

Only required if run_checks is True to check that the data to run is harmonised.

historical_emissions class-attribute instance-attribute #

historical_emissions: DataFrame | None = None

Historical emissions used for harmonisation

Only required if run_checks is True to check that the data to run is harmonised.
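As a rough illustration of what "harmonised" means for these two attributes, the sketch below compares scenario and historical values in the harmonisation year after rounding. It is an assumption-laden stand-in for the real check: `find_unharmonised`, the plain-dict data layout, and the numbers are all hypothetical.

```python
from __future__ import annotations


def find_unharmonised(
    scenario: dict[str, dict[int, float]],
    history: dict[str, dict[int, float]],
    harmonisation_year: int,
    rounding: int = 5,
) -> list[str]:
    """Return variables whose scenario value differs from history (hypothetical helper)."""
    mismatches = []
    for variable, timeseries in scenario.items():
        scenario_value = round(timeseries[harmonisation_year], rounding)
        history_value = round(history[variable][harmonisation_year], rounding)
        if scenario_value != history_value:
            mismatches.append(variable)
    return mismatches


# Illustrative values only
history = {"Emissions|CO2": {2023: 40000.123456}, "Emissions|CH4": {2023: 350.5}}
scenario = {
    "Emissions|CO2": {2023: 40000.123457, 2030: 38000.0},
    "Emissions|CH4": {2023: 351.0, 2030: 300.0},
}
unharmonised = find_unharmonised(scenario, history, harmonisation_year=2023)
```

Here CO2 passes because the two values agree to five decimal places, while CH4 is reported as not harmonised.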

n_processes class-attribute instance-attribute #

n_processes: int | None = cpu_count()

Number of processes to use for parallel processing.

Set to None to process in serial.
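The source listing for `__call__` passes `1` to `run_scms` when `n_processes` is `None`. A minimal sketch of that convention follows. It uses a thread pool from the standard library purely as a stand-in for process-based parallelism, and `resolve_n_workers` and `run_tasks` are hypothetical names.

```python
from __future__ import annotations

from collections.abc import Callable, Sequence
from concurrent.futures import ThreadPoolExecutor


def resolve_n_workers(n_processes: int | None) -> int:
    """Map the documented `None` (serial) setting onto a single worker."""
    return 1 if n_processes is None else n_processes


def run_tasks(tasks: Sequence[Callable[[], int]], n_processes: int | None) -> list[int]:
    """Run tasks serially or with a pool, keeping the input order (hypothetical helper)."""
    n_workers = resolve_n_workers(n_processes)
    if n_workers == 1:
        return [task() for task in tasks]
    # Threads are used here only to keep the sketch self-contained
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        return list(pool.map(lambda task: task(), tasks))


tasks = [lambda i=i: i * i for i in range(5)]
serial_results = run_tasks(tasks, n_processes=None)
parallel_results = run_tasks(tasks, n_processes=2)
```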

output_variables instance-attribute #

output_variables: tuple[str, ...]

Variables to include in the output

progress class-attribute instance-attribute #

progress: bool = True

Should progress bars be shown for each operation?

res_column_type class-attribute instance-attribute #

res_column_type: type = int

Type to cast the result's column type to
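The cast itself is a one-liner on the column index, as in the `__call__` source. A small sketch of its effect, assuming pandas is installed and using a made-up one-row result:

```python
import pandas as pd

# Made-up result whose year labels came back as floats
raw = pd.DataFrame([[1.1, 1.2]], index=["example_variable"], columns=[2015.0, 2016.0])

res_column_type = int  # the documented default
raw.columns = raw.columns.astype(res_column_type)

# Integer labels can now be used to select years
value_2015 = raw.loc["example_variable", 2015]
```

With integer labels, downstream selections such as `raw[2015]` do not depend on how the model wrote its time axis.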

run_checks class-attribute instance-attribute #

run_checks: bool = True

If True, run checks on both input and output data

If you are sure about your workflow, you can disable the checks to speed things up (but we don't recommend this unless you really are confident about what you're doing).

verbose class-attribute instance-attribute #

verbose: bool = True

Should verbose messages be printed?

This is a temporary hack while we think about how to handle logging

__call__ #

__call__(
    in_emissions: DataFrame, force_rerun: bool = False
) -> DataFrame

Run the simple climate model

Parameters:

Name Type Description Default
in_emissions DataFrame

Emissions to run

required
force_rerun bool

Force scenarios to re-run (i.e. disable caching).

False

Returns:

Type Description
DataFrame

Raw results from the simple climate model
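For orientation, the input checks in the source require a `MultiIndex` with `variable`, `unit`, `model` and `scenario` levels and data in the harmonisation year and 2100. The sketch below builds a toy frame of that shape with plain pandas; it does not call gcages, and the variable names, units and numbers are illustrative assumptions.

```python
import pandas as pd

REQUIRED_LEVELS = ["variable", "unit", "model", "scenario"]

index = pd.MultiIndex.from_tuples(
    [
        ("Emissions|CO2", "Mt CO2/yr", "model_a", "scenario_a"),
        ("Emissions|CH4", "Mt CH4/yr", "model_a", "scenario_a"),
    ],
    names=REQUIRED_LEVELS,
)
# Integer year columns; values are made up
in_emissions = pd.DataFrame(
    [[40000.0, 20000.0], [350.0, 200.0]], index=index, columns=[2023, 2100]
)

# Rough equivalents of the level and time checks
missing_levels = [lvl for lvl in REQUIRED_LEVELS if lvl not in in_emissions.index.names]
has_end_points = not in_emissions[[2023, 2100]].isna().any().any()

# The (model, scenario) groups that should each produce output
scenario_groups = in_emissions.index.droplevel(["variable", "unit"]).drop_duplicates()
```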

Source code in src/gcages/cmip7_scenariomip/scm_running.py
def __call__(  # noqa: PLR0912
    self, in_emissions: pd.DataFrame, force_rerun: bool = False
) -> pd.DataFrame:
    """
    Run the simple climate model

    Parameters
    ----------
    in_emissions
        Emissions to run

    force_rerun
        Force scenarios to re-run (i.e. disable caching).

    Returns
    -------
    :
        Raw results from the simple climate model
    """
    if self.run_checks:
        assert_index_is_multiindex(in_emissions)
        assert_has_index_levels(
            in_emissions, ["variable", "unit", "model", "scenario"]
        )
        assert_has_no_pint_incompatible_characters(
            in_emissions.index.get_level_values("unit").unique()
        )
        assert_data_is_all_numeric(in_emissions)

        if self.historical_emissions is None:
            msg = "`self.historical_emissions` must be set to check the harmonisation"
            raise AssertionError(msg)

        if self.harmonisation_year is None:
            msg = "`self.harmonisation_year` must be set to check the harmonisation"
            raise AssertionError(msg)

        assert_has_data_for_times(
            in_emissions,
            name="in_emissions",
            times=[self.harmonisation_year, 2100],
            allow_nan=False,
        )

        assert_harmonised(
            in_emissions,
            history=self.historical_emissions,
            harmonisation_time=self.harmonisation_year,
            rounding=5,  # historical data is often stored to this precision
        )
        assert_all_groups_are_complete(
            # The input should contain every timeseries that is in the history
            in_emissions,
            complete_index=self.historical_emissions.index.droplevel("unit"),
        )

    if "MAGICC7" in self.climate_models_cfgs:
        if self.historical_emissions is None:
            # No history provided: assume emissions are already complete
            complete_emissions = in_emissions
            complete_emissions.columns = complete_emissions.columns.astype(int)
            # Validate MAGICC requirement
            magicc_start_year = 2015
            if int(min(complete_emissions.columns.to_numpy())) != magicc_start_year:
                msg = "Emissions starting year must be set to `2015`"
                raise AssertionError(msg)
        else:
            # History provided: merge it with the scenarios
            complete_emissions = get_complete_scenarios_for_magicc(
                scenarios=in_emissions,
                history=self.historical_emissions,
            )
            complete_emissions.columns = complete_emissions.columns.astype(int)
    else:
        # Not running MAGICC, use emissions as-is
        complete_emissions = in_emissions

    openscm_runner_emissions = update_index_levels_func(
        complete_emissions,
        {
            "variable": partial(
                convert_variable_name,
                from_convention=SupportedNamingConventions.GCAGES,
                to_convention=SupportedNamingConventions.OPENSCM_RUNNER,
            )
        },
    )

    # if self.force_interpolate_to_yearly:
    #     # TODO: put interpolate to annual steps in pandas-openscm
    #     # Interpolate to ensure no nans.
    #     for y in range(
    #         openscm_runner_emissions.columns.min(),
    #         openscm_runner_emissions.columns.max() + 1,
    #     ):
    #         if y not in openscm_runner_emissions:
    #             openscm_runner_emissions[y] = np.nan
    #
    #     openscm_runner_emissions = (
    #         openscm_runner_emissions.sort_index(axis="columns")
    #         .T.interpolate("index")
    #         .T
    #     )
    scm_results_maybe = run_scms(
        scenarios=openscm_runner_emissions,
        climate_models_cfgs=self.climate_models_cfgs,
        output_variables=self.output_variables,
        scenario_group_levels=["model", "scenario"],
        n_processes=self.n_processes if self.n_processes is not None else 1,
        db=self.db,
        verbose=self.verbose,
        batch_size_scenarios=self.batch_size_scenarios,
        force_rerun=force_rerun,
    )

    if self.db is not None:
        # Results aren't kept in memory during running, so have to load them now.
        # User can use `run_scms` directly if they want to process differently.
        out_maybe = self.db.load()
        if out_maybe is None:
            raise TypeError(out_maybe)

        out: pd.DataFrame = out_maybe

    else:
        if scm_results_maybe is None:
            raise TypeError(scm_results_maybe)

        out = scm_results_maybe

    out.columns = out.columns.astype(self.res_column_type)

    if self.run_checks:
        # All scenarios have output
        pd.testing.assert_index_equal(  # type: ignore # pandas-stubs out of date
            out.index.droplevel(
                out.index.names.difference(["model", "scenario"])  # type: ignore # pandas-stubs out of date
            ).drop_duplicates(),
            in_emissions.index.droplevel(
                in_emissions.index.names.difference(["model", "scenario"])  # type: ignore # pandas-stubs out of date
            ).drop_duplicates(),
            check_order=False,
        )
        # Expected output is provided
        assert_all_groups_are_complete(
            out,
            complete_index=pd.MultiIndex.from_arrays(
                [list(self.output_variables)], names=["variable"]
            ),
        )

    return out

from_cmip7_scenariomip_config classmethod #

from_cmip7_scenariomip_config(
    magicc_exe_path: Path,
    magicc_prob_distribution_path: Path,
    output_variables: tuple[
        str, ...
    ] = SCM_OUTPUT_VARIABLES_DEFAULT,
    batch_size_scenarios: int | None = None,
    db: OpenSCMDB | None = None,
    historical_emissions_path: Path | None = None,
    harmonisation_year: int = 2023,
    verbose: bool = True,
    run_checks: bool = True,
    progress: bool = True,
    n_processes: int | None = cpu_count(),
) -> CMIP7ScenarioMIPSCMRunner

Initialise from the config used in CMIP7 ScenarioMIP

Parameters:

Name Type Description Default
magicc_exe_path Path

Path to the MAGICC executable to use.

This should be a MAGICC v7.6.0a3 executable.

required
magicc_prob_distribution_path Path

Path to the MAGICC probabilistic distribution.

This should be the CMIP7 ScenarioMIP probabilistic distribution.

required
output_variables tuple[str, ...]

Variables to include in the output

SCM_OUTPUT_VARIABLES_DEFAULT
batch_size_scenarios int | None

The number of scenarios to run at a time

None
db OpenSCMDB | None

Database to use for storing results.

If not supplied, raw outputs are not stored.

None
historical_emissions_path Path | None

Historical emissions used for harmonisation

Only required if run_checks is True to check that the data is harmonised before running the SCMs.

None
harmonisation_year int

Year in which the data was harmonised

Only required if run_checks is True to check that the data is harmonised before running the SCMs.

2023
verbose bool

Should verbose messages be printed?

This is a temporary hack while we think about how to handle logging

True
run_checks bool

Should checks of the input and output data be performed?

If this is turned off, things are faster, but error messages are much less clear if things go wrong.

True
progress bool

Should progress bars be shown for each operation?

True
n_processes int | None

Number of processes to use for parallel processing.

Set to None to process in serial.

cpu_count()

Returns:

Type Description
CMIP7ScenarioMIPSCMRunner

Initialised SCM runner
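One step of this constructor is easy to overlook: when a history file is given, any `model` and `scenario` levels are dropped from its index so that only the remaining levels identify each historical timeseries. A sketch of that step with plain pandas, using a made-up history frame:

```python
import pandas as pd

# Made-up history with identifying levels that are not needed for comparison
history = pd.DataFrame(
    [[1.0, 2.0]],
    index=pd.MultiIndex.from_tuples(
        [("Emissions|CO2", "Mt CO2/yr", "history_model", "historical")],
        names=["variable", "unit", "model", "scenario"],
    ),
    columns=[2022, 2023],
)

# Only drop levels that are actually present, as the source does
levels_to_drop = [lvl for lvl in ["model", "scenario"] if lvl in history.index.names]
history_stripped = history.reset_index(level=levels_to_drop, drop=True)
```

Filtering the level list first means the same code works whether or not the history file carries those levels.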

Source code in src/gcages/cmip7_scenariomip/scm_running.py
@classmethod
def from_cmip7_scenariomip_config(  # noqa: PLR0913
    cls,
    magicc_exe_path: Path,
    magicc_prob_distribution_path: Path,
    output_variables: tuple[str, ...] = SCM_OUTPUT_VARIABLES_DEFAULT,
    batch_size_scenarios: int | None = None,
    db: OpenSCMDB | None = None,
    historical_emissions_path: Path | None = None,
    harmonisation_year: int = 2023,
    verbose: bool = True,
    run_checks: bool = True,
    progress: bool = True,
    n_processes: int | None = multiprocessing.cpu_count(),
) -> CMIP7ScenarioMIPSCMRunner:
    """
    Initialise from the config used in CMIP7 ScenarioMIP

    Parameters
    ----------
    magicc_exe_path
        Path to the MAGICC executable to use.

        This should be a MAGICC v7.6.0a3 executable.

    magicc_prob_distribution_path
        Path to the MAGICC probabilistic distribution.

        This should be the CMIP7 ScenarioMIP probabilistic distribution.

    output_variables
        Variables to include in the output

    batch_size_scenarios
        The number of scenarios to run at a time

    db
        Database to use for storing results.

        If not supplied, raw outputs are not stored.

    historical_emissions_path
        Historical emissions used for harmonisation

        Only required if `run_checks` is `True` to check
        that the data is harmonised before running the SCMs.

    harmonisation_year
        Year in which the data was harmonised

        Only required if `run_checks` is `True` to check
        that the data is harmonised before running the SCMs.

    verbose
        Should verbose messages be printed?

        This is a temporary hack while we think about how to handle logging

    run_checks
        Should checks of the input and output data be performed?

        If this is turned off, things are faster,
        but error messages are much less clear if things go wrong.

    progress
        Should progress bars be shown for each operation?

    n_processes
        Number of processes to use for parallel processing.

        Set to `None` to process in serial.

    Returns
    -------
    :
        Initialised SCM runner
    """
    os.environ["MAGICC_EXECUTABLE_7"] = str(magicc_exe_path)
    check_cmip7_scenariomip_magicc7_version()

    if historical_emissions_path is not None:
        # Load history
        historical_emissions = load_cmip7_scenariomip_historical_emissions(
            filepath=historical_emissions_path,
            check_hash=True,
        )
        historical_emissions = update_index_levels_func(
            historical_emissions,
            {
                "variable": lambda x: convert_variable_name(
                    x,
                    from_convention=SupportedNamingConventions.CMIP7_SCENARIOMIP,
                    to_convention=SupportedNamingConventions.GCAGES,
                )
            },
            copy=False,
        )

        historical_emissions = historical_emissions.reset_index(
            level=[
                lvl
                for lvl in ["model", "scenario"]
                if lvl in historical_emissions.index.names
            ],
            drop=True,
        )
    else:
        historical_emissions = None

    magicc_prob_cfg = load_magicc_cfgs(
        prob_distribution_path=magicc_prob_distribution_path,
        output_variables=output_variables,
        startyear=1750,
    )

    return cls(
        climate_models_cfgs=magicc_prob_cfg,
        output_variables=output_variables,
        batch_size_scenarios=batch_size_scenarios,
        db=db,
        historical_emissions=historical_emissions,
        harmonisation_year=harmonisation_year,
        verbose=verbose,
        run_checks=run_checks,
        progress=progress,
        n_processes=n_processes,
        res_column_type=int,  # annual output by default
    )

ReaggregatorBasic #

Reaggregator that follows this module's logic

Methods:

Name Description
assert_has_all_required_timeseries

Assert that the data has all the required timeseries

assert_is_internally_consistent

Assert that the data is internally consistent

default_tols_internal_consistency

Get default tolerances for internal consistency checks

get_internal_consistency_checking_index

Get the index which selects only data relevant for checking internal consistency

to_complete

Convert the raw data to complete data

to_gridding_sectors

Re-aggregate data to the sectors used for gridding

Attributes:

Name Type Description
internal_consistency_tolerances Mapping[str, Mapping[str, float]] | Mapping[str, Mapping[str, PINT_SCALAR]]

Tolerances to apply when checking the internal consistency of the data

model_regions tuple[str, ...]

Model regions to use while reaggregating

region_level str

Region level in the data index

unit_level str

Unit level in the data index

variable_level str

Variable level in the data index

world_region str

The value used when the data represents the sum over all regions

Source code in src/gcages/cmip7_scenariomip/pre_processing/reaggregation/basic.py
@define
class ReaggregatorBasic:
    """
    Reaggregator that follows this module's logic
    """

    model_regions: tuple[str, ...]
    """Model regions to use while reaggregating"""

    region_level: str = "region"
    """Region level in the data index"""

    unit_level: str = "unit"
    """Unit level in the data index"""

    variable_level: str = "variable"
    """Variable level in the data index"""

    world_region: str = "World"
    """
    The value used when the data represents the sum over all regions

    (Having a value for this is odd,
    there should really just be no region level when data is the sum,
    but this is the data format used so we have to follow this convention.)
    """

    internal_consistency_tolerances: (
        Mapping[str, Mapping[str, float]] | Mapping[str, Mapping[str, PINT_SCALAR]]
    ) = field()
    """
    Tolerances to apply when checking the internal consistency of the data
    """

    @internal_consistency_tolerances.default
    def default_tols_internal_consistency(
        self,
    ) -> Mapping[str, Mapping[str, float]] | Mapping[str, Mapping[str, PINT_SCALAR]]:
        """
        Get default tolerances for internal consistency checks
        """
        return get_default_internal_conistency_checking_tolerances()

    def assert_has_all_required_timeseries(self, indf: pd.DataFrame) -> None:
        """
        Assert that the data has all the required timeseries

        Parameters
        ----------
        indf
            Data to check

        Raises
        ------
        NotCompleteError
            `indf` is not complete
        """
        assert_has_all_required_timeseries(
            indf,
            model_regions=self.model_regions,
            world_region=self.world_region,
            region_level=self.region_level,
            variable_level=self.variable_level,
        )

    def assert_is_internally_consistent(self, indf: pd.DataFrame) -> None:
        """
        Assert that the data is internally consistent

        Parameters
        ----------
        indf
            Data to check

        Raises
        ------
        InternalConsistencyError
            The data is not internally consistent
        """
        assert_is_internally_consistent(
            indf,
            model_regions=self.model_regions,
            tolerances=self.internal_consistency_tolerances,
            world_region=self.world_region,
            region_level=self.region_level,
            unit_level=self.unit_level,
            variable_level=self.variable_level,
        )

    def get_internal_consistency_checking_index(self) -> pd.MultiIndex:
        """
        Get the index which selects only data relevant for checking internal consistency

        Returns
        -------
        :
            Internal consistency checking index
        """
        return get_internal_consistency_checking_index(
            model_regions=self.model_regions,
            world_region=self.world_region,
            region_level=self.region_level,
            variable_level=self.variable_level,
        )

    def to_complete(self, raw: pd.DataFrame) -> ToCompleteResult:
        """
        Convert the raw data to complete data

        Parameters
        ----------
        raw
            Raw data

        Returns
        -------
        :
            Result of converting the raw data to complete data
        """
        return to_complete(
            indf=raw,
            model_regions=self.model_regions,
            unit_level=self.unit_level,
            variable_level=self.variable_level,
            region_level=self.region_level,
            world_region=self.world_region,
        )

    def to_gridding_sectors(self, indf: pd.DataFrame) -> pd.DataFrame:
        """
        Re-aggregate data to the sectors used for gridding

        Parameters
        ----------
        indf
            Data to re-aggregate

        Returns
        -------
        :
            Data re-aggregated to the gridding sectors
        """
        return to_gridding_sectors(
            indf=indf, region_level=self.region_level, world_region=self.world_region
        )

internal_consistency_tolerances class-attribute instance-attribute #

internal_consistency_tolerances: (
    Mapping[str, Mapping[str, float]]
    | Mapping[str, Mapping[str, PINT_SCALAR]]
) = field()

Tolerances to apply when checking the internal consistency of the data

model_regions instance-attribute #

model_regions: tuple[str, ...]

Model regions to use while reaggregating

region_level class-attribute instance-attribute #

region_level: str = 'region'

Region level in the data index

unit_level class-attribute instance-attribute #

unit_level: str = 'unit'

Unit level in the data index

variable_level class-attribute instance-attribute #

variable_level: str = 'variable'

Variable level in the data index

world_region class-attribute instance-attribute #

world_region: str = 'World'

The value used when the data represents the sum over all regions

(Having a value for this is odd, there should really just be no region level when data is the sum, but this is the data format used so we have to follow this convention.)

assert_has_all_required_timeseries #

assert_has_all_required_timeseries(indf: DataFrame) -> None

Assert that the data has all the required timeseries

Parameters:

Name Type Description Default
indf DataFrame

Data to check

required

Raises:

Type Description
NotCompleteError

indf is not complete
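Conceptually, a completeness check of this kind compares the timeseries that are present against a required set. The sketch below shows the idea with plain sets. The required variables, the regions and `find_missing_timeseries` are hypothetical, and the real function derives its requirements from the module's own definitions.

```python
from __future__ import annotations

from itertools import product


def find_missing_timeseries(
    present: set[tuple[str, str]],
    required_variables: tuple[str, ...],
    regions: tuple[str, ...],
) -> list[tuple[str, str]]:
    """Return required (variable, region) pairs that are absent (hypothetical helper)."""
    required = set(product(required_variables, regions))
    return sorted(required - present)


# Illustrative names only
required_variables = ("Emissions|CO2|Sector A", "Emissions|CO2|Sector B")
regions = ("region_a", "region_b")
present = {
    ("Emissions|CO2|Sector A", "region_a"),
    ("Emissions|CO2|Sector A", "region_b"),
    ("Emissions|CO2|Sector B", "region_a"),
}
missing = find_missing_timeseries(present, required_variables, regions)
```

A non-empty result corresponds to the situation in which the real method raises `NotCompleteError`.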

Source code in src/gcages/cmip7_scenariomip/pre_processing/reaggregation/basic.py
def assert_has_all_required_timeseries(self, indf: pd.DataFrame) -> None:
    """
    Assert that the data has all the required timeseries

    Parameters
    ----------
    indf
        Data to check

    Raises
    ------
    NotCompleteError
        `indf` is not complete
    """
    assert_has_all_required_timeseries(
        indf,
        model_regions=self.model_regions,
        world_region=self.world_region,
        region_level=self.region_level,
        variable_level=self.variable_level,
    )

assert_is_internally_consistent #

assert_is_internally_consistent(indf: DataFrame) -> None

Assert that the data is internally consistent

Parameters:

Name Type Description Default
indf DataFrame

Data to check

required

Raises:

Type Description
InternalConsistencyError

The data is not internally consistent
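One kind of internal consistency is that the model regions sum to the world total within a tolerance. The sketch below checks only that relationship, using `math.isclose`. The `rtol`/`atol` parameter names, the default values and `find_inconsistent_years` are assumptions, and the real check may cover further relationships.

```python
from __future__ import annotations

import math


def find_inconsistent_years(
    regional: dict[str, dict[int, float]],
    world: dict[int, float],
    rtol: float = 1e-3,
    atol: float = 1e-6,
) -> list[int]:
    """Return years where the regional sum and world total disagree (hypothetical helper)."""
    inconsistent = []
    for year, world_value in world.items():
        regional_sum = sum(values[year] for values in regional.values())
        if not math.isclose(regional_sum, world_value, rel_tol=rtol, abs_tol=atol):
            inconsistent.append(year)
    return inconsistent


# Illustrative values: 2030 is deliberately inconsistent (8 + 4 != 13)
regional = {"region_a": {2023: 10.0, 2030: 8.0}, "region_b": {2023: 5.0, 2030: 4.0}}
world = {2023: 15.0, 2030: 13.0}
inconsistent_years = find_inconsistent_years(regional, world)
```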

Source code in src/gcages/cmip7_scenariomip/pre_processing/reaggregation/basic.py
def assert_is_internally_consistent(self, indf: pd.DataFrame) -> None:
    """
    Assert that the data is internally consistent

    Parameters
    ----------
    indf
        Data to check

    Raises
    ------
    InternalConsistencyError
        The data is not internally consistent
    """
    assert_is_internally_consistent(
        indf,
        model_regions=self.model_regions,
        tolerances=self.internal_consistency_tolerances,
        world_region=self.world_region,
        region_level=self.region_level,
        unit_level=self.unit_level,
        variable_level=self.variable_level,
    )

default_tols_internal_consistency #

default_tols_internal_consistency() -> (
    Mapping[str, Mapping[str, float]]
    | Mapping[str, Mapping[str, PINT_SCALAR]]
)

Get default tolerances for internal consistency checks

Source code in src/gcages/cmip7_scenariomip/pre_processing/reaggregation/basic.py
@internal_consistency_tolerances.default
def default_tols_internal_consistency(
    self,
) -> Mapping[str, Mapping[str, float]] | Mapping[str, Mapping[str, PINT_SCALAR]]:
    """
    Get default tolerances for internal consistency checks
    """
    return get_default_internal_conistency_checking_tolerances()

get_internal_consistency_checking_index #

get_internal_consistency_checking_index() -> MultiIndex

Get the index which selects only data relevant for checking internal consistency

Returns:

Type Description
MultiIndex

Internal consistency checking index

Source code in src/gcages/cmip7_scenariomip/pre_processing/reaggregation/basic.py
def get_internal_consistency_checking_index(self) -> pd.MultiIndex:
    """
    Get the index which selects only data relevant for checking internal consistency

    Returns
    -------
    :
        Internal consistency checking index
    """
    return get_internal_consistency_checking_index(
        model_regions=self.model_regions,
        world_region=self.world_region,
        region_level=self.region_level,
        variable_level=self.variable_level,
    )

to_complete #

to_complete(raw: DataFrame) -> ToCompleteResult

Convert the raw data to complete data

Parameters:

Name Type Description Default
raw DataFrame

Raw data

required

Returns:

Type Description
ToCompleteResult

Result of converting the raw data to complete data

Source code in src/gcages/cmip7_scenariomip/pre_processing/reaggregation/basic.py
def to_complete(self, raw: pd.DataFrame) -> ToCompleteResult:
    """
    Convert the raw data to complete data

    Parameters
    ----------
    raw
        Raw data

    Returns
    -------
    :
        Result of converting the raw data to complete data
    """
    return to_complete(
        indf=raw,
        model_regions=self.model_regions,
        unit_level=self.unit_level,
        variable_level=self.variable_level,
        region_level=self.region_level,
        world_region=self.world_region,
    )

to_gridding_sectors #

to_gridding_sectors(indf: DataFrame) -> DataFrame

Re-aggregate data to the sectors used for gridding

Parameters:

Name Type Description Default
indf DataFrame

Data to re-aggregate

required

Returns:

Type Description
DataFrame

Data re-aggregated to the gridding sectors

Source code in src/gcages/cmip7_scenariomip/pre_processing/reaggregation/basic.py
def to_gridding_sectors(self, indf: pd.DataFrame) -> pd.DataFrame:
    """
    Re-aggregate data to the sectors used for gridding

    Parameters
    ----------
    indf
        Data to re-aggregate

    Returns
    -------
    :
        Data re-aggregated to the gridding sectors
    """
    return to_gridding_sectors(
        indf=indf, region_level=self.region_level, world_region=self.world_region
    )

ReaggregatorLike #

Bases: Protocol

Interface that can be used for re-aggregation

Methods:

Name Description
assert_has_all_required_timeseries

Assert that the data has all the required timeseries

assert_is_internally_consistent

Assert that the data is internally consistent

get_internal_consistency_checking_index

Get the index which selects only data relevant for checking internal consistency

to_complete

Convert the raw data to complete data

to_gridding_sectors

Re-aggregate data to the sectors used for gridding

Attributes:

Name Type Description
model_regions tuple[str, ...]

Model regions to use while reaggregating

region_level str

Region level in the data index

unit_level str

Unit level in the data index

variable_level str

Variable level in the data index

world_region str

The value used when the data represents the sum over all regions
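
Because `ReaggregatorLike` is a `Protocol`, a class can satisfy it structurally, without inheriting from it. The sketch below illustrates the idea with a hypothetical, trimmed-down protocol (`MiniReaggregatorLike`) and a toy implementation (`PassThroughReaggregator`). Neither is part of gcages, the `"World"` default is only illustrative, and plain dictionaries stand in for `DataFrame` to keep the example dependency-free. `runtime_checkable` is used here only so that conformance can be shown with `isinstance`; nothing in this page indicates that `ReaggregatorLike` itself is runtime checkable.

```python
from typing import Dict, Protocol, runtime_checkable


@runtime_checkable
class MiniReaggregatorLike(Protocol):
    """Hypothetical, trimmed-down stand-in for ReaggregatorLike."""

    world_region: str

    def to_gridding_sectors(self, indf: Dict[str, float]) -> Dict[str, float]:
        """Re-aggregate data to the sectors used for gridding"""
        ...


class PassThroughReaggregator:
    """Toy implementation: note that it does not inherit from the protocol."""

    def __init__(self, world_region: str = "World") -> None:
        # "World" is an illustrative value, not a documented default.
        self.world_region = world_region

    def to_gridding_sectors(self, indf: Dict[str, float]) -> Dict[str, float]:
        # A real implementation would re-aggregate; this one only copies.
        return dict(indf)


class NotAReaggregator:
    """Lacks both the attribute and the method, so it does not conform."""


reaggregator = PassThroughReaggregator()
conforms = isinstance(reaggregator, MiniReaggregatorLike)
does_not_conform = isinstance(NotAReaggregator(), MiniReaggregatorLike)
```

In practice a static type checker such as mypy performs the same structural check at type-checking time, so the `isinstance` calls are only needed when conformance must be verified at runtime.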

Source code in src/gcages/cmip7_scenariomip/pre_processing/pre_processor.py
class ReaggregatorLike(Protocol):
    """
    Interface that can be used for re-aggregation
    """

    model_regions: tuple[str, ...]
    """Model regions to use while reaggregating"""

    region_level: str
    """Region level in the data index"""

    unit_level: str
    """Unit level in the data index"""

    variable_level: str
    """Variable level in the data index"""

    world_region: str
    """
    The value used when the data represents the sum over all regions

    (Having a value for this is odd:
    there should really be no region level when the data is the sum.
    However, this is the data format used, so we have to follow this convention.)
    """

    def assert_has_all_required_timeseries(self, indf: pd.DataFrame) -> None:
        """
        Assert that the data has all the required timeseries

        Parameters
        ----------
        indf
            Data to check

        Raises
        ------
        NotCompleteError
            `indf` is not complete
        """

    def assert_is_internally_consistent(self, indf: pd.DataFrame) -> None:
        """
        Assert that the data is internally consistent

        Parameters
        ----------
        indf
            Data to check

        Raises
        ------
        InternalConsistencyError
            The data is not internally consistent
        """

    def get_internal_consistency_checking_index(self) -> pd.MultiIndex:
        """
        Get the index which selects only data relevant for checking internal consistency

        Returns
        -------
        :
            Internal consistency checking index
        """

    def to_complete(self, raw: pd.DataFrame) -> ToCompleteResult:
        """
        Convert the raw data to complete data

        Parameters
        ----------
        raw
            Raw data

        Returns
        -------
        :
            Result of converting the raw data to complete data
        """

    def to_gridding_sectors(self, indf: pd.DataFrame) -> pd.DataFrame:
        """
        Re-aggregate data to the sectors used for gridding

        Parameters
        ----------
        indf
            Data to re-aggregate

        Returns
        -------
        :
            Data re-aggregated to the gridding sectors
        """

model_regions instance-attribute #

model_regions: tuple[str, ...]

Model regions to use while reaggregating

region_level instance-attribute #

region_level: str

Region level in the data index

unit_level instance-attribute #

unit_level: str

Unit level in the data index

variable_level instance-attribute #

variable_level: str

Variable level in the data index

world_region instance-attribute #

world_region: str

The value used when the data represents the sum over all regions

(Having a value for this is odd: there should really be no region level when the data is the sum. However, this is the data format used, so we have to follow this convention.)
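
To make the convention concrete, here is a minimal sketch in which the world total is stored under a dedicated region label alongside the model regions. The region names, variable name, values, and the helper `world_matches_regional_sum` are all hypothetical. A plain dictionary keyed by `(variable, region)` stands in for the indexed `DataFrame`, and the sketch ignores any variables that might only be reported at the world level.

```python
import math

# Hypothetical labels; in practice these come from `world_region`
# and `model_regions` on the reaggregator.
WORLD_REGION = "World"
MODEL_REGIONS = ("model_a|Region 1", "model_a|Region 2")

# Toy single-year emissions keyed by (variable, region).
data = {
    ("Emissions|CO2", "model_a|Region 1"): 10.0,
    ("Emissions|CO2", "model_a|Region 2"): 5.5,
    # The sum over all regions is stored under its own region label
    # rather than having no region at all.
    ("Emissions|CO2", WORLD_REGION): 15.5,
}


def world_matches_regional_sum(data, variable, rel_tol=1e-9):
    """Check that the world-region value equals the sum of the model regions."""
    regional_sum = sum(data[(variable, region)] for region in MODEL_REGIONS)
    return math.isclose(data[(variable, WORLD_REGION)], regional_sum, rel_tol=rel_tol)


is_consistent = world_matches_regional_sum(data, "Emissions|CO2")
```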

assert_has_all_required_timeseries #

assert_has_all_required_timeseries(indf: DataFrame) -> None

Assert that the data has all the required timeseries

Parameters:

Name Type Description Default
indf DataFrame

Data to check

required

Raises:

Type Description
NotCompleteError

indf is not complete

Source code in src/gcages/cmip7_scenariomip/pre_processing/pre_processor.py
def assert_has_all_required_timeseries(self, indf: pd.DataFrame) -> None:
    """
    Assert that the data has all the required timeseries

    Parameters
    ----------
    indf
        Data to check

    Raises
    ------
    NotCompleteError
        `indf` is not complete
    """

assert_is_internally_consistent #

assert_is_internally_consistent(indf: DataFrame) -> None

Assert that the data is internally consistent

Parameters:

Name Type Description Default
indf DataFrame

Data to check

required

Raises:

Type Description
InternalConsistencyError

The data is not internally consistent

Source code in src/gcages/cmip7_scenariomip/pre_processing/pre_processor.py
def assert_is_internally_consistent(self, indf: pd.DataFrame) -> None:
    """
    Assert that the data is internally consistent

    Parameters
    ----------
    indf
        Data to check

    Raises
    ------
    InternalConsistencyError
        The data is not internally consistent
    """

get_internal_consistency_checking_index #

get_internal_consistency_checking_index() -> MultiIndex

Get the index which selects only data relevant for checking internal consistency

Returns:

Type Description
MultiIndex

Internal consistency checking index

Source code in src/gcages/cmip7_scenariomip/pre_processing/pre_processor.py
def get_internal_consistency_checking_index(self) -> pd.MultiIndex:
    """
    Get the index which selects only data relevant for checking internal consistency

    Returns
    -------
    :
        Internal consistency checking index
    """

to_complete #

to_complete(raw: DataFrame) -> ToCompleteResult

Convert the raw data to complete data

Parameters:

Name Type Description Default
raw DataFrame

Raw data

required

Returns:

Type Description
ToCompleteResult

Result of converting the raw data to complete data

Source code in src/gcages/cmip7_scenariomip/pre_processing/pre_processor.py
def to_complete(self, raw: pd.DataFrame) -> ToCompleteResult:
    """
    Convert the raw data to complete data

    Parameters
    ----------
    raw
        Raw data

    Returns
    -------
    :
        Result of converting the raw data to complete data
    """

to_gridding_sectors #

to_gridding_sectors(indf: DataFrame) -> DataFrame

Re-aggregate data to the sectors used for gridding

Parameters:

Name Type Description Default
indf DataFrame

Data to re-aggregate

required

Returns:

Type Description
DataFrame

Data re-aggregated to the gridding sectors

Source code in src/gcages/cmip7_scenariomip/pre_processing/pre_processor.py
def to_gridding_sectors(self, indf: pd.DataFrame) -> pd.DataFrame:
    """
    Re-aggregate data to the sectors used for gridding

    Parameters
    ----------
    indf
        Data to re-aggregate

    Returns
    -------
    :
        Data re-aggregated to the gridding sectors
    """

create_cmip7_scenariomip_global_harmoniser #

create_cmip7_scenariomip_global_harmoniser(
    cmip7_scenariomip_global_historical_emissions_file: Path,
    aneris_global_overrides_file: Path,
    run_checks: bool = True,
    progress: bool = True,
    n_processes: int | None = cpu_count(),
) -> AnerisHarmoniser

Create an Aneris harmoniser configured for CMIP7 ScenarioMIP global emissions.

Parameters:

Name Type Description Default
cmip7_scenariomip_global_historical_emissions_file Path

File containing CMIP7 ScenarioMIP historical emissions.

required
aneris_global_overrides_file Path

File containing aneris overrides for the global workflow.

required
run_checks bool

Should checks of input and output data be performed?

True
progress bool

Should progress bars be shown?

True
n_processes int | None

Number of processes to use for parallel processing.

cpu_count()

Returns:

Type Description
AnerisHarmoniser

Harmoniser that will behave in line with CMIP7 ScenarioMIP's global workflow
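
One detail of the signature is worth spelling out: the default for `n_processes` is written as a call to `cpu_count()`, and Python evaluates default values once, when the function is defined, rather than on every call. The sketch below demonstrates this with a hypothetical stand-in function (`make_runner_config`, not part of gcages) that simply records the value it receives.

```python
import multiprocessing
from typing import Optional


def make_runner_config(
    n_processes: Optional[int] = multiprocessing.cpu_count(),
) -> dict:
    """Hypothetical stand-in that just records the value it received."""
    return {"n_processes": n_processes}


# The default was computed once, at definition time, and stored on the function.
stored_default = make_runner_config.__defaults__[0]

default_config = make_runner_config()
explicit_config = make_runner_config(n_processes=2)
none_config = make_runner_config(n_processes=None)
```

Passing `n_processes` explicitly is therefore the way to control the degree of parallelism, for example on shared machines where using every CPU is undesirable.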

Source code in src/gcages/cmip7_scenariomip/harmonisation.py
def create_cmip7_scenariomip_global_harmoniser(
    cmip7_scenariomip_global_historical_emissions_file: Path,
    aneris_global_overrides_file: Path,
    run_checks: bool = True,
    progress: bool = True,
    n_processes: int | None = multiprocessing.cpu_count(),
) -> AnerisHarmoniser:
    """
    Create an Aneris harmoniser configured for CMIP7 ScenarioMIP global emissions.

    Parameters
    ----------
    cmip7_scenariomip_global_historical_emissions_file
        File containing CMIP7 ScenarioMIP historical emissions.

    aneris_global_overrides_file
        File containing aneris overrides for the global workflow.

    run_checks
        Should checks of input and output data be performed?

    progress
        Should progress bars be shown?

    n_processes
        Number of processes to use for parallel processing.

    Returns
    -------
    :
        Harmoniser that will behave in line with CMIP7 ScenarioMIP's global workflow
    """
    historical_emissions = load_cmip7_scenariomip_historical_emissions(
        filepath=cmip7_scenariomip_global_historical_emissions_file,
        check_hash=True,
    )

    # Drop every index level except variable, region and unit
    # (i.e. the model and scenario levels)
    historical_emissions = historical_emissions.reset_index(
        historical_emissions.index.names.difference(["variable", "region", "unit"]),  # type: ignore # pandas-stubs out of date
        drop=True,
    )

    # Use gcages naming to match pre-processed outputs.
    historical_emissions = update_index_levels_func(
        historical_emissions,
        {
            "variable": lambda x: convert_variable_name(
                x,
                from_convention=SupportedNamingConventions.CMIP7_SCENARIOMIP,
                to_convention=SupportedNamingConventions.GCAGES,
            )
        },
        copy=False,
    )

    aneris_overrides = load_aneris_overrides_file(aneris_global_overrides_file)
    # Type juggling for mypy: from series to dataframe back to series
    # TODO: remove this as it isn't needed for pandas-openscm 0.8.1
    aneris_overrides_df = aneris_overrides.to_frame(name="method")
    updated_df = update_index_levels_func(
        aneris_overrides_df,
        {
            "variable": lambda x: convert_variable_name(
                x,
                from_convention=SupportedNamingConventions.CMIP7_SCENARIOMIP,
                to_convention=SupportedNamingConventions.GCAGES,
            )
        },
        copy=False,
    )
    aneris_overrides = updated_df["method"]

    return AnerisHarmoniser(
        historical_emissions=historical_emissions,
        # Hard-coded because this is the harmonisation year that was used.
        # If people want a different year, we can change the interface,
        # but that also requires thinking about the historical emissions,
        # so we deliberately hard-code it here.
        harmonisation_year=2023,
        aneris_overrides=aneris_overrides,
        run_checks=run_checks,
        progress=progress,
        n_processes=n_processes,
    )