Skip to content

gcages.harmonisation#

General harmonisation tools

Modules:

Name Description
aneris

Harmonisation using aneris

common

Common tools across different approaches

Classes:

Name Description
AnerisHarmoniser

Harmoniser that uses aneris

NotHarmonisedError

Raised when a pd.DataFrame is not harmonised

Functions:

Name Description
assert_harmonised

Assert that the input is harmonised

AnerisHarmoniser #

Harmoniser that uses aneris

Methods:

Name Description
__call__

Harmonise

validate_aneris_overrides

Validate the aneris overrides value

validate_historical_emissions

Validate the historical emissions value

Attributes:

Name Type Description
aneris_overrides Series[str] | None

Overrides to supply to aneris.convenience.harmonise_all

harmonisation_year int

Year in which to harmonise

historical_emissions DataFrame

Historical emissions to use for harmonisation

n_processes int | None

Number of processes to use for parallel processing.

progress bool

Should progress bars be shown for each operation?

region_level str

Level in data indexes that represents the region of the timeseries

run_checks bool

If True, run checks on both input and output data

scenario_group_levels list[str]

Levels in data indexes to use to group data into scenarios

unit_level str

Level in data indexes that represents the unit of the timeseries

variable_level str

Level in data indexes that represents the variable of the timeseries

Source code in src/gcages/harmonisation/aneris.py
@define
class AnerisHarmoniser:
    """
    Harmoniser that uses [aneris](https://aneris.readthedocs.io/)

    Instances are callable. Calling an instance splits the input emissions
    into groups using `scenario_group_levels`,
    passes each group to `gcages.aneris_helpers.harmonise_scenario`
    (together with the historical emissions, harmonisation year and overrides)
    and concatenates the results.
    """

    # Validated by `validate_historical_emissions` (only if `run_checks` is `True`).
    historical_emissions: pd.DataFrame = field()
    """
    Historical emissions to use for harmonisation
    """

    harmonisation_year: int
    """
    Year in which to harmonise
    """

    # Passed through unchanged as `overrides` to the per-scenario harmonisation.
    aneris_overrides: pd.Series[str] | None = field(default=None)
    """
    Overrides to supply to `aneris.convenience.harmonise_all`

    For source code and docs,
    see e.g. [https://github.com/iiasa/aneris/blob/v0.4.2/src/aneris/convenience.py]().
    """

    # Read by both validators and by `__call__`; `False` skips every check.
    run_checks: bool = True
    """
    If `True`, run checks on both input and output data

    If you are sure about your workflow,
    you can disable the checks to speed things up
    (but we don't recommend this unless you really
    are confident about what you're doing).
    """

    variable_level: str = "variable"
    """
    Level in data indexes that represents the variable of the timeseries
    """

    region_level: str = "region"
    """
    Level in data indexes that represents the region of the timeseries
    """

    unit_level: str = "unit"
    """
    Level in data indexes that represents the unit of the timeseries
    """

    # A factory is used so that each instance gets its own list.
    scenario_group_levels: list[str] = field(factory=lambda: ["model", "scenario"])
    """
    Levels in data indexes to use to group data into scenarios

    Here, 'scenarios' means groups of timeseries
    that will be run through a climate model.
    """

    progress: bool = True
    """
    Should progress bars be shown for each operation?
    """

    # NOTE: this default is computed once, when the class body is executed.
    # The value is handed to `ParallelOpConfig.from_user_facing` as `max_workers`.
    n_processes: int | None = multiprocessing.cpu_count()
    """
    Number of processes to use for parallel processing.

    Set to `None` to process in serial.
    """

    @aneris_overrides.validator
    def validate_aneris_overrides(
        self, attribute: attr.Attribute[Any], value: pd.Series[str] | None
    ) -> None:
        """
        Validate the aneris overrides value

        If `value` is `None` or `self.run_checks` is `False`,
        then this is a no-op.

        Parameters
        ----------
        attribute
            Attribute being validated (not used in the body)

        value
            Value to validate
        """
        if value is None:
            return

        if not self.run_checks:
            return

        # No validation is actually performed yet, see the TODO below.
        # TODO: implement a `assert_aneris_overrides_align_with_historical` function

    @historical_emissions.validator
    def validate_historical_emissions(
        self, attribute: attr.Attribute[Any], value: pd.DataFrame
    ) -> None:
        """
        Validate the historical emissions value

        If `self.run_checks` is `False`, then this is a no-op

        Parameters
        ----------
        attribute
            Attribute being validated (not used in the body)

        value
            Value to validate
        """
        if not self.run_checks:
            return

        assert_index_is_multiindex(value)
        assert_data_is_all_numeric(value)
        # Unlike the input emissions checked in `__call__`,
        # the scenario grouping levels are not required here.
        assert_has_index_levels(
            value, [self.variable_level, self.region_level, self.unit_level]
        )
        assert_has_data_for_times(
            value,
            name="historical_emissions",
            times=[self.harmonisation_year],
            allow_nan=False,
        )

    def __call__(self, in_emissions: pd.DataFrame) -> pd.DataFrame:
        """
        Harmonise

        If `self.run_checks` is `True`,
        the input is checked before harmonising
        and the output is checked afterwards.

        Parameters
        ----------
        in_emissions
            Emissions to harmonise

        Returns
        -------
        :
            Harmonised emissions

        Raises
        ------
        ValueError
            `self.run_checks` is `True` and `in_emissions` contains values
            in its `self.variable_level` level
            that do not appear in `self.historical_emissions`

        AssertionError
            `self.run_checks` is `True` and the type of the columns
            of the output differs from the type of the columns of `in_emissions`
        """
        if self.run_checks:
            assert_index_is_multiindex(in_emissions)
            assert_data_is_all_numeric(in_emissions)
            assert_has_index_levels(
                in_emissions,
                [
                    self.variable_level,
                    self.region_level,
                    self.unit_level,
                    # Needed for parallelisation
                    *self.scenario_group_levels,
                ],
            )
            assert_has_data_for_times(
                in_emissions,
                name="in_emissions",
                times=[self.harmonisation_year],
                allow_nan=False,
            )

            # Every variable in the input must also appear in history.
            # The helper's own error is re-raised as a `ValueError`,
            # with the original kept as the cause.
            try:
                assert_metadata_values_all_allowed(
                    in_emissions,
                    metadata_key=self.variable_level,
                    allowed_values=self.historical_emissions.index.get_level_values(
                        self.variable_level
                    ).unique(),
                )
            except NotAllowedMetadataValuesError as exc:
                msg = "The input emissions contains values that aren't in history"
                raise ValueError(msg) from exc

        # One call to `harmonise_scenario` per group of `scenario_group_levels`,
        # with the results stitched back together into a single frame.
        harmonised_df = pd.concat(
            apply_op_parallel_progress(
                func_to_call=gcages.aneris_helpers.harmonise_scenario,
                iterable_input=(
                    gdf for _, gdf in in_emissions.groupby(self.scenario_group_levels)
                ),
                parallel_op_config=ParallelOpConfig.from_user_facing(
                    progress=self.progress,
                    max_workers=self.n_processes,
                    progress_results_kwargs=dict(desc="Scenarios to harmonise"),
                ),
                history=self.historical_emissions,
                year=self.harmonisation_year,
                overrides=self.aneris_overrides,
            )
        )

        if self.run_checks:
            assert_harmonised(
                harmonised_df,
                history=self.historical_emissions,
                harmonisation_time=self.harmonisation_year,
            )

            # The output must have the same index entries as the input,
            # although their order is allowed to differ.
            pd.testing.assert_index_equal(  # type: ignore # pandas-stubs doesn't know about check_order
                harmonised_df.index,
                in_emissions.index,
                check_order=False,
            )
            if harmonised_df.columns.dtype != in_emissions.columns.dtype:
                msg = (
                    "Column type has changed: "
                    f"{harmonised_df.columns.dtype=} {in_emissions.columns.dtype=}"
                )
                raise AssertionError(msg)

        return harmonised_df

aneris_overrides class-attribute instance-attribute #

aneris_overrides: Series[str] | None = field(default=None)

Overrides to supply to aneris.convenience.harmonise_all

For source code and docs, see e.g. https://github.com/iiasa/aneris/blob/v0.4.2/src/aneris/convenience.py.

harmonisation_year instance-attribute #

harmonisation_year: int

Year in which to harmonise

historical_emissions class-attribute instance-attribute #

historical_emissions: DataFrame = field()

Historical emissions to use for harmonisation

n_processes class-attribute instance-attribute #

n_processes: int | None = cpu_count()

Number of processes to use for parallel processing.

Set to None to process in serial.

progress class-attribute instance-attribute #

progress: bool = True

Should progress bars be shown for each operation?

region_level class-attribute instance-attribute #

region_level: str = 'region'

Level in data indexes that represents the region of the timeseries

run_checks class-attribute instance-attribute #

run_checks: bool = True

If True, run checks on both input and output data

If you are sure about your workflow, you can disable the checks to speed things up (but we don't recommend this unless you really are confident about what you're doing).

scenario_group_levels class-attribute instance-attribute #

scenario_group_levels: list[str] = field(
    factory=lambda: ["model", "scenario"]
)

Levels in data indexes to use to group data into scenarios

Here, 'scenarios' means groups of timeseries that will be run through a climate model.

unit_level class-attribute instance-attribute #

unit_level: str = 'unit'

Level in data indexes that represents the unit of the timeseries

variable_level class-attribute instance-attribute #

variable_level: str = 'variable'

Level in data indexes that represents the variable of the timeseries

__call__ #

__call__(in_emissions: DataFrame) -> DataFrame

Harmonise

Parameters:

Name Type Description Default
in_emissions DataFrame

Emissions to harmonise

required

Returns:

Type Description
DataFrame

Harmonised emissions

Source code in src/gcages/harmonisation/aneris.py
def __call__(self, in_emissions: pd.DataFrame) -> pd.DataFrame:
    """
    Harmonise the supplied emissions, one scenario group at a time

    When `self.run_checks` is `True`, the input is checked before harmonising
    and the result is checked afterwards.

    Parameters
    ----------
    in_emissions
        Emissions to harmonise

    Returns
    -------
    :
        Harmonised emissions
    """
    if self.run_checks:
        assert_index_is_multiindex(in_emissions)
        assert_data_is_all_numeric(in_emissions)

        required_levels = [
            self.variable_level,
            self.region_level,
            self.unit_level,
            # Needed for parallelisation
            *self.scenario_group_levels,
        ]
        assert_has_index_levels(in_emissions, required_levels)

        assert_has_data_for_times(
            in_emissions,
            name="in_emissions",
            times=[self.harmonisation_year],
            allow_nan=False,
        )

        # Only variables that are present in history can be harmonised
        history_variables = self.historical_emissions.index.get_level_values(
            self.variable_level
        ).unique()
        try:
            assert_metadata_values_all_allowed(
                in_emissions,
                metadata_key=self.variable_level,
                allowed_values=history_variables,
            )
        except NotAllowedMetadataValuesError as exc:
            raise ValueError(
                "The input emissions contains values that aren't in history"
            ) from exc

    # Each group of `scenario_group_levels` is harmonised independently
    scenario_groups = (
        group for _, group in in_emissions.groupby(self.scenario_group_levels)
    )
    parallel_config = ParallelOpConfig.from_user_facing(
        progress=self.progress,
        max_workers=self.n_processes,
        progress_results_kwargs={"desc": "Scenarios to harmonise"},
    )
    per_scenario_results = apply_op_parallel_progress(
        func_to_call=gcages.aneris_helpers.harmonise_scenario,
        iterable_input=scenario_groups,
        parallel_op_config=parallel_config,
        history=self.historical_emissions,
        year=self.harmonisation_year,
        overrides=self.aneris_overrides,
    )
    harmonised_df = pd.concat(per_scenario_results)

    if not self.run_checks:
        return harmonised_df

    assert_harmonised(
        harmonised_df,
        history=self.historical_emissions,
        harmonisation_time=self.harmonisation_year,
    )

    # Same index entries as the input, in any order
    pd.testing.assert_index_equal(  # type: ignore # pandas-stubs doesn't know about check_order
        harmonised_df.index,
        in_emissions.index,
        check_order=False,
    )
    if harmonised_df.columns.dtype != in_emissions.columns.dtype:
        raise AssertionError(
            "Column type has changed: "
            f"{harmonised_df.columns.dtype=} {in_emissions.columns.dtype=}"
        )

    return harmonised_df

validate_aneris_overrides #

validate_aneris_overrides(
    attribute: Attribute[Any], value: Series[str] | None
) -> None

Validate the aneris overrides value

If the value is None or self.run_checks is False, then this is a no-op

Source code in src/gcages/harmonisation/aneris.py
@aneris_overrides.validator
def validate_aneris_overrides(
    self, attribute: attr.Attribute[Any], value: pd.Series[str] | None
) -> None:
    """
    Validate the aneris overrides value

    Nothing is done if `value` is `None` or if `self.run_checks` is `False`.
    """
    # `self.run_checks` is only consulted when a value was actually supplied
    if value is None or not self.run_checks:
        return

validate_historical_emissions #

validate_historical_emissions(
    attribute: Attribute[Any], value: DataFrame
) -> None

Validate the historical emissions value

If self.run_checks is False, then this is a no-op

Source code in src/gcages/harmonisation/aneris.py
@historical_emissions.validator
def validate_historical_emissions(
    self, attribute: attr.Attribute[Any], value: pd.DataFrame
) -> None:
    """
    Validate the historical emissions value

    Nothing is done if `self.run_checks` is `False`.
    """
    if self.run_checks:
        assert_index_is_multiindex(value)
        assert_data_is_all_numeric(value)

        required_levels = [self.variable_level, self.region_level, self.unit_level]
        assert_has_index_levels(value, required_levels)

        # History must have a non-NaN value in the harmonisation year
        assert_has_data_for_times(
            value,
            name="historical_emissions",
            times=[self.harmonisation_year],
            allow_nan=False,
        )

NotHarmonisedError #

Bases: ValueError

Raised when a pd.DataFrame is not harmonised

Methods:

Name Description
__init__

Initialise the error

Source code in src/gcages/harmonisation/common.py
class NotHarmonisedError(ValueError):
    """
    Raised when a [pd.DataFrame][pandas.DataFrame] is not harmonised

    The values given to the initialiser are kept on the instance
    as `comparison` and `harmonisation_time`,
    so they can be inspected by whoever catches the error.
    """

    def __init__(
        self,
        comparison: pd.DataFrame,
        harmonisation_time: TIME_POINT,
    ) -> None:
        """
        Initialise the error

        Parameters
        ----------
        comparison
            Results of comparing the data and history

        harmonisation_time
            Expected harmonisation time
        """
        # Keep the inputs: they are needed by `__reduce__`
        # and are useful for handling the error programmatically.
        self.comparison = comparison
        self.harmonisation_time = harmonisation_time

        error_msg = (
            f"The DataFrame is not harmonised in {harmonisation_time}. "
            f"comparison=\n{comparison}"
        )
        super().__init__(error_msg)

    def __reduce__(
        self,
    ) -> tuple[type[ValueError], tuple[pd.DataFrame, TIME_POINT]]:
        """
        Describe how to re-create the error when unpickling

        By default, exceptions are re-created from `self.args`.
        Here `self.args` holds only the formatted message,
        which does not match the signature of `__init__`,
        so unpickling (e.g. when an error is sent between processes)
        would fail with a `TypeError`.

        Returns
        -------
        :
            The class to call and the arguments to call it with
        """
        return (type(self), (self.comparison, self.harmonisation_time))

__init__ #

__init__(
    comparison: DataFrame, harmonisation_time: TIME_POINT
) -> None

Initialise the error

Parameters:

Name Type Description Default
comparison DataFrame

Results of comparing the data and history

required
harmonisation_time TIME_POINT

Expected harmonisation time

required
Source code in src/gcages/harmonisation/common.py
def __init__(
    self,
    comparison: pd.DataFrame,
    harmonisation_time: TIME_POINT,
) -> None:
    """
    Initialise the error

    The message states the harmonisation time
    and then shows the comparison.

    Parameters
    ----------
    comparison
        Results of comparing the data and history

    harmonisation_time
        Expected harmonisation time
    """
    headline = f"The DataFrame is not harmonised in {harmonisation_time}. "
    details = f"comparison=\n{comparison}"
    super().__init__(headline + details)

assert_harmonised #

assert_harmonised(
    df: TimeseriesDataFrame,
    *,
    history: TimeseriesDataFrame,
    harmonisation_time: TIME_POINT,
    rounding: int = 10,
    df_unit_level: str = "unit",
    history_unit_level: str | None = None,
    ur: UnitRegistry | None = None,
) -> None

Assert that the input is harmonised

Parameters:

Name Type Description Default
df TimeseriesDataFrame

Data to check

required
history TimeseriesDataFrame

History to which df should be harmonised

required
harmonisation_time TIME_POINT

Time at which df should be harmonised to history

required
rounding int

Rounding to apply to the data before comparing

10
df_unit_level str

Level in df's index which has unit information

Only used if unit conversion is required

'unit'
history_unit_level str | None

Level in history's index which has unit information

If not provided, we assume this is the same as df_unit_level

Only used if unit conversion is required

None
ur UnitRegistry | None

Unit registry to use for determining unit conversions

Passed to gcages.units_helpers.convert_unit_like

Only used if unit conversion is required

None

Raises:

Type Description
NotHarmonisedError

df is not harmonised to history

Source code in src/gcages/harmonisation/common.py
def assert_harmonised(  # noqa: PLR0913
    df: TimeseriesDataFrame,
    *,
    history: TimeseriesDataFrame,
    harmonisation_time: TIME_POINT,
    rounding: int = 10,
    df_unit_level: str = "unit",
    history_unit_level: str | None = None,
    ur: pint.UnitRegistry | None = None,
) -> None:
    """
    Assert that the input is harmonised

    The data and history are compared at `harmonisation_time`,
    after rounding both to `rounding` decimal places.

    Parameters
    ----------
    df
        Data to check

    history
        History to which `df` should be harmonised

    harmonisation_time
        Time at which `df` should be harmonised to `history`

    rounding
        Rounding to apply to the data before comparing

    df_unit_level
        Level in `df`'s index which has unit information

        Only used if unit conversion is required

    history_unit_level
        Level in `history`'s index which has unit information

        If not provided, we assume this is the same as `df_unit_level`

        Only used if unit conversion is required

    ur
        Unit registry to use for determining unit conversions

        Passed to [gcages.units_helpers.convert_unit_like][]

        Only used if unit conversion is required

    Raises
    ------
    NotHarmonisedError
        `df` is not harmonised to `history`
    """
    df_in_history_units = convert_unit_like(
        df,
        target=history,
        df_unit_level=df_unit_level,
        target_unit_level=history_unit_level,
        ur=ur,
    )

    aligned = align_history_to_data_at_time(
        df_in_history_units, history=history, time=harmonisation_time
    )
    df_at_time, history_at_time = aligned

    rounded_df = df_at_time.round(rounding)
    rounded_history = history_at_time.round(rounding)
    comparison = rounded_df.compare(  # type: ignore # pandas-stubs confused
        rounded_history, result_names=("df", "history")
    )

    # An empty comparison means there were no differences to report
    if comparison.empty:
        return

    raise NotHarmonisedError(
        comparison=comparison, harmonisation_time=harmonisation_time
    )