gcages.ar6#

AR6 components

Modules:

Name: Description

harmonisation: Harmonisation part of the AR6 workflow

infilling: Infilling part of the AR6 workflow

post_processing: Post-processing part of the AR6 workflow

pre_processing: Pre-processing part of the workflow

scm_running: Simple climate model (SCM) running part of the AR6 workflow

Classes:

Name: Description

AR6Harmoniser: Harmoniser that follows the same logic as was used in AR6

AR6Infiller: Infiller that follows the same logic as was used in AR6

AR6PostProcessor: Post-processor that follows the same logic as was used in AR6

AR6PreProcessor: Pre-processor that follows the same logic as was used in AR6

AR6SCMRunner: Simple climate model runner that follows the same logic as was used in AR6

Functions:

Name: Description

get_ar6_full_historical_emissions: Get the full AR6 historical emissions

AR6Harmoniser #

Harmoniser that follows the same logic as was used in AR6

If you want exactly the same behaviour as in AR6, initialise using from_ar6_config

Methods:

Name: Description

__call__: Harmonise

from_ar6_config: Initialise from the config used in AR6

validate_aneris_overrides: Validate the aneris overrides value

validate_historical_emissions: Validate the historical emissions value

Attributes:

Name (Type): Description

aneris_overrides (Series[str] | None): Overrides to supply to aneris.convenience.harmonise_all

calc_scaling_year (int): Year to use for calculating a scaling factor from historical

harmonisation_year (int): Year in which to harmonise

historical_emissions (DataFrame): Historical emissions to use for harmonisation

n_processes (int): Number of processes to use for parallel processing.

progress (bool): Should progress bars be shown for each operation?

run_checks (bool): If True, run checks on both input and output data

Source code in src/gcages/ar6/harmonisation.py
@define
class AR6Harmoniser:
    """
    Harmoniser that follows the same logic as was used in AR6

    If you want exactly the same behaviour as in AR6,
    initialise using [`from_ar6_config`][(c)]
    """

    historical_emissions: pd.DataFrame = field()
    """
    Historical emissions to use for harmonisation
    """

    harmonisation_year: int
    """
    Year in which to harmonise
    """

    calc_scaling_year: int
    """
    Year to use for calculating a scaling factor from historical

    This is only needed if `self.harmonisation_year`
    is not in the emissions to be harmonised.

    For example, if `self.harmonisation_year` is 2015
    and `self.calc_scaling_year` is 2010
    and we have a scenario without 2015 data,
    then we will use the difference from historical in 2010
    to infer a value for 2015.

    This logic was peculiar to AR6; it may not be repeated.
    """

    aneris_overrides: pd.Series[str] | None = field()
    """
    Overrides to supply to `aneris.convenience.harmonise_all`

    For source code and docs,
    see e.g. [https://github.com/iiasa/aneris/blob/v0.4.2/src/aneris/convenience.py]().
    """

    run_checks: bool = True
    """
    If `True`, run checks on both input and output data

    If you are sure about your workflow,
    you can disable the checks to speed things up
    (but we don't recommend this unless you really
    are confident about what you're doing).
    """

    progress: bool = True
    """
    Should progress bars be shown for each operation?
    """

    n_processes: int = multiprocessing.cpu_count()
    """
    Number of processes to use for parallel processing.

    Set to 1 to process in serial.
    """

    @aneris_overrides.validator
    def validate_aneris_overrides(
        self, attribute: attr.Attribute[Any], value: pd.Series[str] | None
    ) -> None:
        """
        Validate the aneris overrides value

        If `self.run_checks` is `False`, then this is a no-op
        """
        if value is None:
            return

        if not self.run_checks:
            return

        # TODO: implement a `assert_aneris_overrides_align_with_historical` function

    @historical_emissions.validator
    def validate_historical_emissions(
        self, attribute: attr.Attribute[Any], value: pd.DataFrame
    ) -> None:
        """
        Validate the historical emissions value

        If `self.run_checks` is `False`, then this is a no-op
        """
        if not self.run_checks:
            return

        assert_index_is_multiindex(value)
        assert_data_is_all_numeric(value)
        assert_has_index_levels(value, ["variable", "unit"])
        assert_has_data_for_times(
            value,
            name="historical_emissions",
            times=[self.harmonisation_year],
            allow_nan=False,
        )

    def __call__(self, in_emissions: pd.DataFrame) -> pd.DataFrame:
        """
        Harmonise

        Parameters
        ----------
        in_emissions
            Emissions to harmonise

        Returns
        -------
        :
            Harmonised emissions
        """
        if self.run_checks:
            assert_index_is_multiindex(in_emissions)
            assert_data_is_all_numeric(in_emissions)
            # Needed for parallelisation
            assert_has_index_levels(
                in_emissions, ["variable", "unit", "model", "scenario"]
            )
            try:
                assert_has_data_for_times(
                    in_emissions,
                    name="in_emissions",
                    times=[self.harmonisation_year],
                    allow_nan=False,
                )
            except MissingDataForTimesError as exc_hy:
                try:
                    assert_has_data_for_times(
                        in_emissions,
                        name="in_emissions",
                        times=[self.calc_scaling_year],
                        allow_nan=False,
                    )
                except MissingDataForTimesError as exc_csy:
                    msg = (
                        f"We require data for either {self.harmonisation_year} "
                        f"or {self.calc_scaling_year} "
                        "but neither had the required data. "
                        f"Error from checking for {self.harmonisation_year} data: "
                        f"{exc_hy}. "
                        f"Error from checking for {self.calc_scaling_year} data: "
                        f"{exc_csy}. "
                    )
                    raise KeyError(msg)

            assert_metadata_values_all_allowed(
                in_emissions,
                metadata_key="variable",
                allowed_values=self.historical_emissions.index.get_level_values(
                    "variable"
                ).unique(),
            )

        harmonised_df = pd.concat(
            apply_op_parallel_progress(
                func_to_call=harmonise_single_scenario,
                iterable_input=(
                    gdf for _, gdf in in_emissions.groupby(["model", "scenario"])
                ),
                parallel_op_config=ParallelOpConfig.from_user_facing(
                    progress=self.progress,
                    max_workers=self.n_processes,
                ),
                history=self.historical_emissions,
                harmonisation_year=self.harmonisation_year,
                overrides=self.aneris_overrides,
                calc_scaling_year=self.calc_scaling_year,
            )
        )

        if self.run_checks:
            assert_harmonised(
                harmonised_df,
                history=self.historical_emissions,
                harmonisation_time=self.harmonisation_year,
            )

            pd.testing.assert_index_equal(  # type: ignore # pandas-stubs doesn't know about check_order
                harmonised_df.index,
                in_emissions.index,
                check_order=False,
            )
            if harmonised_df.columns.dtype != in_emissions.columns.dtype:
                msg = (
                    "Column type has changed: "
                    f"{harmonised_df.columns.dtype=} {in_emissions.columns.dtype=}"
                )
                raise AssertionError(msg)

        return harmonised_df

    @classmethod
    def from_ar6_config(
        cls,
        ar6_historical_emissions_file: Path,
        run_checks: bool = True,
        progress: bool = True,
        n_processes: int = multiprocessing.cpu_count(),
    ) -> AR6Harmoniser:
        """
        Initialise from the config used in AR6

        Parameters
        ----------
        ar6_historical_emissions_file
            File containing the AR6 historical emissions

        run_checks
            Should checks of the input and output data be performed?

            If this is turned off, things are faster,
            but error messages are much less clear if things go wrong.

        progress
            Should a progress bar be shown for each operation?

        n_processes
            Number of processes to use for parallel processing.

            Set to 1 to process in serial.

        Returns
        -------
        :
            Initialised harmoniser
        """
        historical_emissions = load_ar6_historical_emissions(
            ar6_historical_emissions_file
        )

        # Drop out all metadata except region, variable and unit
        historical_emissions = historical_emissions.reset_index(
            historical_emissions.index.names.difference(["variable", "region", "unit"]),  # type: ignore # pandas-stubs out of date
            drop=True,
        )

        # Strip off prefix
        historical_emissions = update_index_levels_func(
            historical_emissions,
            {
                "variable": lambda x: x.replace("AR6 climate diagnostics|", "").replace(
                    "|Unharmonized", ""
                )
            },
            copy=False,
        )

        # Drop down to only the variables we care about
        historical_emissions = historical_emissions.loc[
            historical_emissions.index.get_level_values("variable").isin(
                [
                    "Emissions|BC",
                    "Emissions|PFC|C2F6",
                    "Emissions|PFC|C6F14",
                    "Emissions|PFC|CF4",
                    "Emissions|CO",
                    "Emissions|CO2",
                    "Emissions|CO2|AFOLU",
                    "Emissions|CO2|Energy and Industrial Processes",
                    "Emissions|CH4",
                    # "Emissions|F-Gases",  # Not used
                    # "Emissions|HFC",  # Not used
                    "Emissions|HFC|HFC125",
                    "Emissions|HFC|HFC134a",
                    "Emissions|HFC|HFC143a",
                    "Emissions|HFC|HFC227ea",
                    "Emissions|HFC|HFC23",
                    # 'Emissions|HFC|HFC245ca',  # all nan in historical dataset (RCMIP)
                    # "Emissions|HFC|HFC245fa",  # not in historical dataset (RCMIP)
                    "Emissions|HFC|HFC32",
                    "Emissions|HFC|HFC43-10",
                    "Emissions|N2O",
                    "Emissions|NH3",
                    "Emissions|NOx",
                    "Emissions|OC",
                    # "Emissions|PFC",  # Not used
                    "Emissions|SF6",
                    "Emissions|Sulfur",
                    "Emissions|VOC",
                ]
            )
        ]

        # Update variable names
        historical_emissions = rename_variables(
            historical_emissions,
            from_convention=SupportedNamingConventions.IAMC,
            to_convention=SupportedNamingConventions.GCAGES,
            index_level="variable",
            copy=False,
        )

        # Strip out any units that won't play nice with pint
        historical_emissions = strip_pint_incompatible_characters_from_units(
            historical_emissions, units_index_level="unit"
        )

        # Drop out rows with all NaNs
        historical_emissions = historical_emissions.dropna(how="all")

        # We don't need historical emissions before 1990
        # (we could probably start even later, but this is fine).
        historical_emissions = historical_emissions.loc[:, 1990:]

        # All variables not mentioned here use aneris' default decision tree
        aneris_overrides_ar6_df = pd.DataFrame(
            [
                # Not used
                # {
                #     # high historical variance (cov=16.2)
                #     "method": "reduce_ratio_2150_cov",
                #     "variable": "Emissions|PFC",
                # },
                {
                    # high historical variance (cov=16.2)
                    "method": "reduce_ratio_2150_cov",
                    "variable": "Emissions|PFC|C2F6",
                },
                {
                    # high historical variance (cov=15.4)
                    "method": "reduce_ratio_2150_cov",
                    "variable": "Emissions|PFC|C6F14",
                },
                {
                    # high historical variance (cov=11.2)
                    "method": "reduce_ratio_2150_cov",
                    "variable": "Emissions|PFC|CF4",
                },
                {
                    # high historical variance (cov=15.4)
                    "method": "reduce_ratio_2150_cov",
                    "variable": "Emissions|CO",
                },
                {
                    # always ratio method by choice
                    "method": "reduce_ratio_2080",
                    "variable": "Emissions|CO2",
                },
                {
                    # high historical variance,
                    # but using offset method to prevent diff
                    # from increasing when going negative rapidly (cov=23.2)
                    "method": "reduce_offset_2150_cov",
                    "variable": "Emissions|CO2|AFOLU",
                },
                {
                    # always ratio method by choice
                    "method": "reduce_ratio_2080",
                    "variable": "Emissions|CO2|Energy and Industrial Processes",
                },
                # Not used
                # {
                #     # basket not used in infilling
                #     # (sum of f-gases with low model reporting confidence)
                #     "method": "constant_ratio",
                #     "variable": "Emissions|F-Gases",
                # },
                # Not used
                # {
                #     # basket not used in infilling
                #     # (sum of subset of f-gases with low model reporting confidence)
                #     "method": "constant_ratio",
                #     "variable": "Emissions|HFC",
                # },
                {
                    # minor f-gas with low model reporting confidence
                    "method": "constant_ratio",
                    "variable": "Emissions|HFC|HFC125",
                },
                {
                    # minor f-gas with low model reporting confidence
                    "method": "constant_ratio",
                    "variable": "Emissions|HFC|HFC134a",
                },
                {
                    # minor f-gas with low model reporting confidence
                    "method": "constant_ratio",
                    "variable": "Emissions|HFC|HFC143a",
                },
                {
                    # minor f-gas with low model reporting confidence
                    "method": "constant_ratio",
                    "variable": "Emissions|HFC|HFC227ea",
                },
                {
                    # minor f-gas with low model reporting confidence
                    "method": "constant_ratio",
                    "variable": "Emissions|HFC|HFC23",
                },
                {
                    # minor f-gas with low model reporting confidence
                    "method": "constant_ratio",
                    "variable": "Emissions|HFC|HFC32",
                },
                {
                    # minor f-gas with low model reporting confidence
                    "method": "constant_ratio",
                    "variable": "Emissions|HFC|HFC43-10",
                },
                {
                    # high historical variance (cov=18.5)
                    "method": "reduce_ratio_2150_cov",
                    "variable": "Emissions|OC",
                },
                {
                    # minor f-gas with low model reporting confidence
                    "method": "constant_ratio",
                    "variable": "Emissions|SF6",
                },
                {
                    # high historical variance (cov=12.0)
                    "method": "reduce_ratio_2150_cov",
                    "variable": "Emissions|VOC",
                },
            ]
        )
        aneris_overrides_ar6_df["variable"] = aneris_overrides_ar6_df["variable"].map(
            partial(
                convert_variable_name,
                from_convention=SupportedNamingConventions.IAMC,
                to_convention=SupportedNamingConventions.GCAGES,
            )
        )
        aneris_overrides_ar6 = aneris_overrides_ar6_df.set_index("variable")["method"]

        return cls(
            historical_emissions=historical_emissions,
            harmonisation_year=2015,
            calc_scaling_year=2010,
            aneris_overrides=aneris_overrides_ar6,
            run_checks=run_checks,
            n_processes=n_processes,
            progress=progress,
        )

aneris_overrides class-attribute instance-attribute #

aneris_overrides: Series[str] | None = field()

Overrides to supply to aneris.convenience.harmonise_all

For source code and docs, see e.g. https://github.com/iiasa/aneris/blob/v0.4.2/src/aneris/convenience.py.

calc_scaling_year instance-attribute #

calc_scaling_year: int

Year to use for calculating a scaling factor from historical

This is only needed if self.harmonisation_year is not in the emissions to be harmonised.

For example, if self.harmonisation_year is 2015 and self.calc_scaling_year is 2010 and we have a scenario without 2015 data, then we will use the difference from historical in 2010 to infer a value for 2015.

This logic was peculiar to AR6; it may not be repeated.
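
A minimal sketch of the fallback described above, assuming the scaling factor is the ratio of scenario to historical emissions in calc_scaling_year. The function name and the dictionary-based data layout are hypothetical and only for illustration; the actual logic lives in gcages' harmonisation code and may differ in detail.

```python
def infer_harmonisation_year_value(
    scenario: dict[int, float],
    history: dict[int, float],
    harmonisation_year: int = 2015,
    calc_scaling_year: int = 2010,
) -> float:
    """Return the scenario value in the harmonisation year, inferring it if absent."""
    if harmonisation_year in scenario:
        # Nothing to infer: the scenario already reports the harmonisation year.
        return scenario[harmonisation_year]

    # Assumption: the "scaling factor" is scenario / history in the scaling year.
    scaling_factor = scenario[calc_scaling_year] / history[calc_scaling_year]

    # Apply the same relative deviation to history in the harmonisation year.
    return scaling_factor * history[harmonisation_year]


history = {2010: 100.0, 2015: 110.0}
scenario = {2010: 105.0, 2020: 120.0}  # no 2015 data

inferred = infer_harmonisation_year_value(scenario, history)
print(inferred)  # roughly 115.5, i.e. 5 % above history, as in 2010
```

With these made-up numbers, the scenario sits 5 % above history in 2010, so the inferred 2015 value also sits 5 % above the 2015 historical value.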

harmonisation_year instance-attribute #

harmonisation_year: int

Year in which to harmonise

historical_emissions class-attribute instance-attribute #

historical_emissions: DataFrame = field()

Historical emissions to use for harmonisation

n_processes class-attribute instance-attribute #

n_processes: int = cpu_count()

Number of processes to use for parallel processing.

Set to 1 to process in serial.
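
As a rough illustration of what n_processes controls, the sketch below splits rows by (model, scenario), processes each group independently and concatenates the results. It is an assumption-laden stand-in: it uses threads from the standard library rather than whatever gcages uses internally, and apply_per_scenario and double_values are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import groupby


def apply_per_scenario(rows, func, n_processes):
    """Apply func to each (model, scenario) group, serially or with a worker pool."""

    def key(row):
        return (row["model"], row["scenario"])

    # groupby needs sorted input to produce one group per key.
    groups = [list(g) for _, g in groupby(sorted(rows, key=key), key=key)]

    if n_processes == 1:
        # Serial path: simplest to debug.
        results = [func(group) for group in groups]
    else:
        # Worker-pool path (threads here; the real library may use processes).
        with ThreadPoolExecutor(max_workers=n_processes) as pool:
            results = list(pool.map(func, groups))

    # Concatenate the per-group results back into one flat list.
    return [row for result in results for row in result]


def double_values(group):
    """Toy per-scenario operation standing in for harmonisation."""
    return [{**row, "value": row["value"] * 2} for row in group]


rows = [
    {"model": "m1", "scenario": "s1", "value": 1.0},
    {"model": "m1", "scenario": "s2", "value": 2.0},
    {"model": "m2", "scenario": "s1", "value": 3.0},
]

serial = apply_per_scenario(rows, double_values, n_processes=1)
parallel = apply_per_scenario(rows, double_values, n_processes=2)
```

Because each group is processed independently, the serial and pooled paths give the same result; only the wall-clock time differs.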

progress class-attribute instance-attribute #

progress: bool = True

Should progress bars be shown for each operation?

run_checks class-attribute instance-attribute #

run_checks: bool = True

If True, run checks on both input and output data

If you are sure about your workflow, you can disable the checks to speed things up (but we don't recommend this unless you really are confident about what you're doing).

__call__ #

__call__(in_emissions: DataFrame) -> DataFrame

Harmonise

Parameters:

Name (Type, Default): Description

in_emissions (DataFrame, required): Emissions to harmonise

Returns:

Type: Description

DataFrame: Harmonised emissions
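
When run_checks is True, the input must have data for either the harmonisation year or the scaling year. The sketch below mirrors that nested fallback check in plain Python. MissingDataError, assert_has_data_for_time and check_harmonisable are hypothetical stand-ins for the gcages helpers, and the nested-dictionary layout is an assumption made to keep the example self-contained.

```python
class MissingDataError(Exception):
    """Stand-in for the library's missing-data exception."""


def assert_has_data_for_time(timeseries, time):
    """Raise if any timeseries lacks a value for the given time."""
    missing = [name for name, ts in timeseries.items() if ts.get(time) is None]
    if missing:
        raise MissingDataError(f"no data for {time} in: {missing}")


def check_harmonisable(timeseries, harmonisation_year, calc_scaling_year):
    """Require data in the harmonisation year or, failing that, the scaling year."""
    try:
        assert_has_data_for_time(timeseries, harmonisation_year)
    except MissingDataError as exc_hy:
        try:
            assert_has_data_for_time(timeseries, calc_scaling_year)
        except MissingDataError as exc_csy:
            msg = (
                f"We require data for either {harmonisation_year} "
                f"or {calc_scaling_year} but neither had the required data. "
                f"{exc_hy}. {exc_csy}."
            )
            raise KeyError(msg) from exc_csy


# Passes: 2015 is missing, but 2010 is available as a fallback.
check_harmonisable({"Emissions|CO2": {2010: 1.0, 2020: 2.0}}, 2015, 2010)

# Fails: neither 2015 nor 2010 is available.
try:
    check_harmonisable({"Emissions|CO2": {2020: 2.0}}, 2015, 2010)
    failed = False
except KeyError:
    failed = True
```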

Source code in src/gcages/ar6/harmonisation.py
def __call__(self, in_emissions: pd.DataFrame) -> pd.DataFrame:
    """
    Harmonise

    Parameters
    ----------
    in_emissions
        Emissions to harmonise

    Returns
    -------
    :
        Harmonised emissions
    """
    if self.run_checks:
        assert_index_is_multiindex(in_emissions)
        assert_data_is_all_numeric(in_emissions)
        # Needed for parallelisation
        assert_has_index_levels(
            in_emissions, ["variable", "unit", "model", "scenario"]
        )
        try:
            assert_has_data_for_times(
                in_emissions,
                name="in_emissions",
                times=[self.harmonisation_year],
                allow_nan=False,
            )
        except MissingDataForTimesError as exc_hy:
            try:
                assert_has_data_for_times(
                    in_emissions,
                    name="in_emissions",
                    times=[self.calc_scaling_year],
                    allow_nan=False,
                )
            except MissingDataForTimesError as exc_csy:
                msg = (
                    f"We require data for either {self.harmonisation_year} "
                    f"or {self.calc_scaling_year} "
                    "but neither had the required data. "
                    f"Error from checking for {self.harmonisation_year} data: "
                    f"{exc_hy}. "
                    f"Error from checking for {self.calc_scaling_year} data: "
                    f"{exc_csy}. "
                )
                raise KeyError(msg)

        assert_metadata_values_all_allowed(
            in_emissions,
            metadata_key="variable",
            allowed_values=self.historical_emissions.index.get_level_values(
                "variable"
            ).unique(),
        )

    harmonised_df = pd.concat(
        apply_op_parallel_progress(
            func_to_call=harmonise_single_scenario,
            iterable_input=(
                gdf for _, gdf in in_emissions.groupby(["model", "scenario"])
            ),
            parallel_op_config=ParallelOpConfig.from_user_facing(
                progress=self.progress,
                max_workers=self.n_processes,
            ),
            history=self.historical_emissions,
            harmonisation_year=self.harmonisation_year,
            overrides=self.aneris_overrides,
            calc_scaling_year=self.calc_scaling_year,
        )
    )

    if self.run_checks:
        assert_harmonised(
            harmonised_df,
            history=self.historical_emissions,
            harmonisation_time=self.harmonisation_year,
        )

        pd.testing.assert_index_equal(  # type: ignore # pandas-stubs doesn't know about check_order
            harmonised_df.index,
            in_emissions.index,
            check_order=False,
        )
        if harmonised_df.columns.dtype != in_emissions.columns.dtype:
            msg = (
                "Column type has changed: "
                f"{harmonised_df.columns.dtype=} {in_emissions.columns.dtype=}"
            )
            raise AssertionError(msg)

    return harmonised_df

from_ar6_config classmethod #

from_ar6_config(
    ar6_historical_emissions_file: Path,
    run_checks: bool = True,
    progress: bool = True,
    n_processes: int = cpu_count(),
) -> AR6Harmoniser

Initialise from the config used in AR6

Parameters:

Name (Type, Default): Description

ar6_historical_emissions_file (Path, required): File containing the AR6 historical emissions

run_checks (bool, default True): Should checks of the input and output data be performed? If this is turned off, things are faster, but error messages are much less clear if things go wrong.

progress (bool, default True): Should a progress bar be shown for each operation?

n_processes (int, default cpu_count()): Number of processes to use for parallel processing. Set to 1 to process in serial.

Returns:

Type: Description

AR6Harmoniser: Initialised harmoniser
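
Part of what this constructor does is tidy up variable names, keep only the variables of interest and pair them with harmonisation-method overrides. The sketch below illustrates those three steps on plain Python data. The raw variable names, the shortened keep-list and the "default" placeholder are illustrative assumptions; the real method works on pandas index levels and additionally renames variables to the gcages convention, which is not shown here.

```python
AR6_PREFIX = "AR6 climate diagnostics|"
AR6_SUFFIX = "|Unharmonized"


def strip_ar6_affixes(variable: str) -> str:
    """Remove the AR6 prefix and suffix, same string logic as the source."""
    return variable.replace(AR6_PREFIX, "").replace(AR6_SUFFIX, "")


# Illustrative raw names (assumed format, not read from the real file).
raw_variables = [
    "AR6 climate diagnostics|Emissions|CO2|Unharmonized",
    "AR6 climate diagnostics|Emissions|F-Gases|Unharmonized",
    "AR6 climate diagnostics|Emissions|CH4|Unharmonized",
]

# Shortened keep-list; the real one is much longer.
variables_to_keep = {"Emissions|CO2", "Emissions|CH4"}

# Overrides expressed as records, then turned into a variable -> method mapping.
override_records = [
    {"method": "reduce_ratio_2080", "variable": "Emissions|CO2"},
]
overrides = {rec["variable"]: rec["method"] for rec in override_records}

selected = [
    v for v in (strip_ar6_affixes(r) for r in raw_variables) if v in variables_to_keep
]

# Variables without an override fall back to aneris' default decision tree;
# "default" is just a placeholder label for that here.
methods = {v: overrides.get(v, "default") for v in selected}
print(methods)
```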

Source code in src/gcages/ar6/harmonisation.py
@classmethod
def from_ar6_config(
    cls,
    ar6_historical_emissions_file: Path,
    run_checks: bool = True,
    progress: bool = True,
    n_processes: int = multiprocessing.cpu_count(),
) -> AR6Harmoniser:
    """
    Initialise from the config used in AR6

    Parameters
    ----------
    ar6_historical_emissions_file
        File containing the AR6 historical emissions

    run_checks
        Should checks of the input and output data be performed?

        If this is turned off, things are faster,
        but error messages are much less clear if things go wrong.

    progress
        Should a progress bar be shown for each operation?

    n_processes
        Number of processes to use for parallel processing.

        Set to 1 to process in serial.

    Returns
    -------
    :
        Initialised harmoniser
    """
    historical_emissions = load_ar6_historical_emissions(
        ar6_historical_emissions_file
    )

    # Drop out all metadata except region, variable and unit
    historical_emissions = historical_emissions.reset_index(
        historical_emissions.index.names.difference(["variable", "region", "unit"]),  # type: ignore # pandas-stubs out of date
        drop=True,
    )

    # Strip off prefix
    historical_emissions = update_index_levels_func(
        historical_emissions,
        {
            "variable": lambda x: x.replace("AR6 climate diagnostics|", "").replace(
                "|Unharmonized", ""
            )
        },
        copy=False,
    )

    # Drop down to only the variables we care about
    historical_emissions = historical_emissions.loc[
        historical_emissions.index.get_level_values("variable").isin(
            [
                "Emissions|BC",
                "Emissions|PFC|C2F6",
                "Emissions|PFC|C6F14",
                "Emissions|PFC|CF4",
                "Emissions|CO",
                "Emissions|CO2",
                "Emissions|CO2|AFOLU",
                "Emissions|CO2|Energy and Industrial Processes",
                "Emissions|CH4",
                # "Emissions|F-Gases",  # Not used
                # "Emissions|HFC",  # Not used
                "Emissions|HFC|HFC125",
                "Emissions|HFC|HFC134a",
                "Emissions|HFC|HFC143a",
                "Emissions|HFC|HFC227ea",
                "Emissions|HFC|HFC23",
                # 'Emissions|HFC|HFC245ca',  # all nan in historical dataset (RCMIP)
                # "Emissions|HFC|HFC245fa",  # not in historical dataset (RCMIP)
                "Emissions|HFC|HFC32",
                "Emissions|HFC|HFC43-10",
                "Emissions|N2O",
                "Emissions|NH3",
                "Emissions|NOx",
                "Emissions|OC",
                # "Emissions|PFC",  # Not used
                "Emissions|SF6",
                "Emissions|Sulfur",
                "Emissions|VOC",
            ]
        )
    ]

    # Update variable names
    historical_emissions = rename_variables(
        historical_emissions,
        from_convention=SupportedNamingConventions.IAMC,
        to_convention=SupportedNamingConventions.GCAGES,
        index_level="variable",
        copy=False,
    )

    # Strip out any units that won't play nice with pint
    historical_emissions = strip_pint_incompatible_characters_from_units(
        historical_emissions, units_index_level="unit"
    )

    # Drop out rows with all NaNs
    historical_emissions = historical_emissions.dropna(how="all")

    # We don't need historical emissions before 1990
    # (we could probably start even later, but this is fine).
    historical_emissions = historical_emissions.loc[:, 1990:]

    # All variables not mentioned here use aneris' default decision tree
    aneris_overrides_ar6_df = pd.DataFrame(
        [
            # Not used
            # {
            #     # high historical variance (cov=16.2)
            #     "method": "reduce_ratio_2150_cov",
            #     "variable": "Emissions|PFC",
            # },
            {
                # high historical variance (cov=16.2)
                "method": "reduce_ratio_2150_cov",
                "variable": "Emissions|PFC|C2F6",
            },
            {
                # high historical variance (cov=15.4)
                "method": "reduce_ratio_2150_cov",
                "variable": "Emissions|PFC|C6F14",
            },
            {
                # high historical variance (cov=11.2)
                "method": "reduce_ratio_2150_cov",
                "variable": "Emissions|PFC|CF4",
            },
            {
                # high historical variance (cov=15.4)
                "method": "reduce_ratio_2150_cov",
                "variable": "Emissions|CO",
            },
            {
                # always ratio method by choice
                "method": "reduce_ratio_2080",
                "variable": "Emissions|CO2",
            },
            {
                # high historical variance,
                # but using offset method to prevent diff
                # from increasing when going negative rapidly (cov=23.2)
                "method": "reduce_offset_2150_cov",
                "variable": "Emissions|CO2|AFOLU",
            },
            {
                # always ratio method by choice
                "method": "reduce_ratio_2080",
                "variable": "Emissions|CO2|Energy and Industrial Processes",
            },
            # Not used
            # {
            #     # basket not used in infilling
            #     # (sum of f-gases with low model reporting confidence)
            #     "method": "constant_ratio",
            #     "variable": "Emissions|F-Gases",
            # },
            # Not used
            # {
            #     # basket not used in infilling
            #     # (sum of subset of f-gases with low model reporting confidence)
            #     "method": "constant_ratio",
            #     "variable": "Emissions|HFC",
            # },
            {
                # minor f-gas with low model reporting confidence
                "method": "constant_ratio",
                "variable": "Emissions|HFC|HFC125",
            },
            {
                # minor f-gas with low model reporting confidence
                "method": "constant_ratio",
                "variable": "Emissions|HFC|HFC134a",
            },
            {
                # minor f-gas with low model reporting confidence
                "method": "constant_ratio",
                "variable": "Emissions|HFC|HFC143a",
            },
            {
                # minor f-gas with low model reporting confidence
                "method": "constant_ratio",
                "variable": "Emissions|HFC|HFC227ea",
            },
            {
                # minor f-gas with low model reporting confidence
                "method": "constant_ratio",
                "variable": "Emissions|HFC|HFC23",
            },
            {
                # minor f-gas with low model reporting confidence
                "method": "constant_ratio",
                "variable": "Emissions|HFC|HFC32",
            },
            {
                # minor f-gas with low model reporting confidence
                "method": "constant_ratio",
                "variable": "Emissions|HFC|HFC43-10",
            },
            {
                # high historical variance (cov=18.5)
                "method": "reduce_ratio_2150_cov",
                "variable": "Emissions|OC",
            },
            {
                # minor f-gas with low model reporting confidence
                "method": "constant_ratio",
                "variable": "Emissions|SF6",
            },
            {
                # high historical variance (cov=12.0)
                "method": "reduce_ratio_2150_cov",
                "variable": "Emissions|VOC",
            },
        ]
    )
    aneris_overrides_ar6_df["variable"] = aneris_overrides_ar6_df["variable"].map(
        partial(
            convert_variable_name,
            from_convention=SupportedNamingConventions.IAMC,
            to_convention=SupportedNamingConventions.GCAGES,
        )
    )
    aneris_overrides_ar6 = aneris_overrides_ar6_df.set_index("variable")["method"]

    return cls(
        historical_emissions=historical_emissions,
        harmonisation_year=2015,
        calc_scaling_year=2010,
        aneris_overrides=aneris_overrides_ar6,
        run_checks=run_checks,
        n_processes=n_processes,
        progress=progress,
    )
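
The override construction above amounts to turning a list of method/variable records into a pandas Series indexed by variable name. The following is a minimal sketch of that pattern using plain pandas. The `to_gcages_name` helper and its `ASSUMED_RENAMES` table are hypothetical stand-ins for `convert_variable_name`; the real conversion rules are not reproduced here.

```python
import pandas as pd

# Illustrative renames only (an assumption); the real mapping is handled
# by gcages' `convert_variable_name`.
ASSUMED_RENAMES = {
    "Emissions|CO2|Energy and Industrial Processes": "Emissions|CO2|Fossil",
    "Emissions|VOC": "Emissions|NMVOC",
}


def to_gcages_name(iamc_name: str) -> str:
    """Hypothetical stand-in for the IAMC -> gcages name conversion."""
    return ASSUMED_RENAMES.get(iamc_name, iamc_name)


# One record per override, in the same shape as the AR6 config above
overrides_df = pd.DataFrame(
    [
        {
            "method": "reduce_ratio_2080",
            "variable": "Emissions|CO2|Energy and Industrial Processes",
        },
        {"method": "reduce_ratio_2150_cov", "variable": "Emissions|VOC"},
        {"method": "constant_ratio", "variable": "Emissions|SF6"},
    ]
)

# Convert the names, then index by variable so each lookup returns a method
overrides_df["variable"] = overrides_df["variable"].map(to_gcages_name)
overrides = overrides_df.set_index("variable")["method"]
```

The result is a Series whose index is the variable name and whose values are the harmonisation method names, which matches the shape passed as `aneris_overrides`.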

validate_aneris_overrides #

validate_aneris_overrides(
    attribute: Attribute[Any], value: DataFrame | None
) -> None

Validate the aneris overrides value

If self.run_checks is False, then this is a no-op

Source code in src/gcages/ar6/harmonisation.py
@aneris_overrides.validator
def validate_aneris_overrides(
    self, attribute: attr.Attribute[Any], value: pd.DataFrame | None
) -> None:
    """
    Validate the aneris overrides value

    If `self.run_checks` is `False`, then this is a no-op
    """
    if value is None:
        return

    if not self.run_checks:
        return

validate_historical_emissions #

validate_historical_emissions(
    attribute: Attribute[Any], value: DataFrame
) -> None

Validate the historical emissions value

If self.run_checks is False, then this is a no-op

Source code in src/gcages/ar6/harmonisation.py
@historical_emissions.validator
def validate_historical_emissions(
    self, attribute: attr.Attribute[Any], value: pd.DataFrame
) -> None:
    """
    Validate the historical emissions value

    If `self.run_checks` is `False`, then this is a no-op
    """
    if not self.run_checks:
        return

    assert_index_is_multiindex(value)
    assert_data_is_all_numeric(value)
    assert_has_index_levels(value, ["variable", "unit"])
    assert_has_data_for_times(
        value,
        name="historical_emissions",
        times=[self.harmonisation_year],
        allow_nan=False,
    )
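
To make the validator's requirements concrete, here is a sketch of a historical emissions frame that should satisfy them, together with hand-written equivalents of the four checks. The numbers are made up for illustration, and the checks below are plain-pandas approximations, not the gcages `assert_*` helpers themselves.

```python
import pandas as pd

# Made-up values; only the shape of the data matters here
index = pd.MultiIndex.from_tuples(
    [
        ("Emissions|CO2|Fossil", "Mt CO2/yr"),
        ("Emissions|CH4", "Mt CH4/yr"),
    ],
    names=["variable", "unit"],
)
history = pd.DataFrame(
    [[35000.0, 35500.0], [370.0, 375.0]],
    index=index,
    columns=[2014, 2015],
)

harmonisation_year = 2015

# Approximate equivalents of the four assertions in the validator
is_multiindex = isinstance(history.index, pd.MultiIndex)
all_numeric = all(pd.api.types.is_numeric_dtype(dtype) for dtype in history.dtypes)
has_levels = {"variable", "unit"}.issubset(history.index.names)
has_harmonisation_year = bool(
    harmonisation_year in history.columns
    and not history[harmonisation_year].isna().any()
)
```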

AR6Infiller #

Infiller that follows the same logic as was used in AR6

If you want exactly the same behaviour as in AR6, initialise using from_ar6_config

Methods:

| Name | Description |
| --- | --- |
| `__call__` | Infill |
| `from_ar6_config` | Initialise from the config used in AR6 |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `harmonisation_year` | `int \| None` | Year in which the data was harmonised |
| `historical_emissions` | `DataFrame \| None` | Historical emissions used for harmonisation |
| `infillers` | `Mapping[str, Callable[[DataFrame], DataFrame]]` | Functions to use for infilling each variable. |
| `n_processes` | `int \| None` | Number of processes to use for parallel processing. |
| `progress` | `bool` | Should progress bars be shown for each operation? |
| `run_checks` | `bool` | If True, run checks on both input and output data |

Source code in src/gcages/ar6/infilling.py
@define
class AR6Infiller:
    """
    Infiller that follows the same logic as was used in AR6

    If you want exactly the same behaviour as in AR6,
    initialise using [`from_ar6_config`][(c)]
    """

    infillers: Mapping[str, Callable[[pd.DataFrame], pd.DataFrame]]
    """
    Functions to use for infilling each variable.

    The keys define the variable that can be infilled.
    The values define the function which,
    given inputs with the expected lead variables,
    returns the infilled timeseries.
    """

    run_checks: bool = True
    """
    If `True`, run checks on both input and output data

    If you are sure about your workflow,
    you can disable the checks to speed things up
    (but we don't recommend this unless you really
    are confident about what you're doing).
    """

    historical_emissions: pd.DataFrame | None = None
    """
    Historical emissions used for harmonisation

    Only required if `run_checks` is `True` to check
    that the infilled data is also harmonised.
    """

    harmonisation_year: int | None = None
    """
    Year in which the data was harmonised

    Only required if `run_checks` is `True` to check
    that the infilled data is also harmonised.
    """

    progress: bool = True
    """
    Should progress bars be shown for each operation?
    """

    n_processes: int | None = None  # better off in serial with silicone
    """
    Number of processes to use for parallel processing.

    Set to `None` to process in serial.
    """

    def __call__(self, in_emissions: pd.DataFrame) -> pd.DataFrame:
        """
        Infill

        Parameters
        ----------
        in_emissions
            Emissions to infill

        Returns
        -------
        :
            Infilled emissions
        """
        if self.run_checks:
            assert_index_is_multiindex(in_emissions)
            assert_data_is_all_numeric(in_emissions)
            # Needed for parallelisation
            assert_has_index_levels(
                in_emissions, ["variable", "unit", "model", "scenario"]
            )

        # Strip off any prefixes that might be there
        to_infill = in_emissions.pix.assign(
            variable=in_emissions.index.get_level_values("variable").map(
                lambda x: x.replace("AR6 climate diagnostics|Harmonized|", "")
            )
        ).sort_index(axis="columns")

        # This is not the most efficient parallelisation.
        # It would be better to group by variable,
        # then have an `infill_variable` function,
        # but this would require a bit more thinking
        # to actually achieve.
        infilled = pd.concat(
            [
                v.reorder_levels(to_infill.index.names)
                for v in apply_op_parallel_progress(
                    func_to_call=infill_scenario,
                    iterable_input=(
                        gdf for _, gdf in to_infill.groupby(["model", "scenario"])
                    ),
                    parallel_op_config=ParallelOpConfig.from_user_facing(
                        progress=self.progress,
                        max_workers=self.n_processes,
                        progress_results_kwargs=dict(desc="Scenarios to infill"),
                    ),
                    infillers=self.infillers,
                )
            ]
        )

        if self.run_checks:
            pd.testing.assert_index_equal(infilled.columns, in_emissions.columns)

            if self.historical_emissions is None:
                msg = "`self.historical_emissions` must be set to check the infilling"
                raise AssertionError(msg)

            if self.harmonisation_year is None:
                msg = "`self.harmonisation_year` must be set to check the infilling"
                raise AssertionError(msg)

            assert_harmonised(
                infilled,
                history=self.historical_emissions,
                harmonisation_time=self.harmonisation_year,
                rounding=5,  # precision at which historical data is often stored
            )
            assert_all_groups_are_complete(
                # The combo of the input and infilled should be complete
                pd.concat(
                    [in_emissions, infilled.reorder_levels(in_emissions.index.names)]
                ),
                complete_index=self.historical_emissions.index.droplevel("unit"),
            )

        return infilled

    @classmethod
    def from_ar6_config(  # noqa: PLR0913
        cls,
        ar6_infilling_db_file: Path,
        ar6_infilling_db_cfcs_file: Path,
        variables_to_infill: Iterable[str] | None = None,
        run_checks: bool = True,
        historical_emissions: pd.DataFrame | None = None,
        harmonisation_year: int | None = None,
        progress: bool = True,
        n_processes: int | None = None,  # better off in serial with silicone
    ) -> AR6Infiller:
        """
        Initialise from the config used in AR6

        Parameters
        ----------
        ar6_infilling_db_file
            File containing the AR6 infilling database

            This is for all emissions except CFCs.

        ar6_infilling_db_cfcs_file
            File containing the AR6 infilling database for CFCs

        variables_to_infill
            Variables to infill.

            If not supplied, we use the default set from AR6.

        run_checks
            Should checks of the input and output data be performed?

            If this is turned off, things are faster,
            but error messages are much less clear if things go wrong.

        historical_emissions
            Historical emissions used for harmonisation

            Only required if `run_checks` is `True` to check
            that the infilled data is also harmonised.

        harmonisation_year
            Year in which the data was harmonised

            Only required if `run_checks` is `True` to check
            that the infilled data is also harmonised.

        progress
            Should a progress bar be shown for each operation?

        n_processes
            Number of processes to use for parallel processing.

            Set to `None` to process in serial.

        Returns
        -------
        :
            Initialised infiller
        """
        try:
            import silicone.database_crunchers  # type: ignore # silicone has no type hints
        except ImportError as exc:
            raise MissingOptionalDependencyError(
                "get_ar6_infiller", requirement="silicone"
            ) from exc

        VARS_DB_CRUNCHERS = {
            "Emissions|BC": (
                False,
                silicone.database_crunchers.QuantileRollingWindows,
            ),
            "Emissions|CH4": (
                False,
                silicone.database_crunchers.QuantileRollingWindows,
            ),
            "Emissions|CO2|Biosphere": (
                False,
                silicone.database_crunchers.QuantileRollingWindows,
            ),
            "Emissions|CO2|Fossil": (
                False,
                silicone.database_crunchers.QuantileRollingWindows,
            ),
            "Emissions|CO": (
                False,
                silicone.database_crunchers.QuantileRollingWindows,
            ),
            "Emissions|N2O": (
                False,
                silicone.database_crunchers.QuantileRollingWindows,
            ),
            "Emissions|NH3": (
                False,
                silicone.database_crunchers.QuantileRollingWindows,
            ),
            "Emissions|NOx": (
                False,
                silicone.database_crunchers.QuantileRollingWindows,
            ),
            "Emissions|OC": (
                False,
                silicone.database_crunchers.QuantileRollingWindows,
            ),
            "Emissions|SOx": (
                False,
                silicone.database_crunchers.QuantileRollingWindows,
            ),
            "Emissions|NMVOC": (
                False,
                silicone.database_crunchers.QuantileRollingWindows,
            ),
            "Emissions|HFC134a": (
                False,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|HFC143a": (
                False,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|HFC227ea": (
                False,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|HFC23": (
                False,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|HFC32": (
                False,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|HFC4310mee": (
                False,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|HFC125": (
                False,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|SF6": (
                False,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|CF4": (
                False,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|C2F6": (
                False,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|C6F14": (
                False,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|CCl4": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|CFC11": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|CFC113": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|CFC114": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|CFC115": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|CFC12": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|CH2Cl2": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|CH3Br": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|CH3CCl3": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|CH3Cl": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|CHCl3": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|HCFC141b": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|HCFC142b": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|HCFC22": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|HFC152a": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|HFC236fa": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|HFC365mfc": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|Halon1202": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|Halon1211": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|Halon1301": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|Halon2402": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|NF3": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|C3F8": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|C4F10": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|C5F12": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|C7F16": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|C8F18": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|cC4F8": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
            "Emissions|SO2F2": (
                True,
                silicone.database_crunchers.RMSClosest,
            ),
        }
        """
        Definition of our default set of variables to infill and how to infill them

        Each key is a variable we can infill.
        Each value is a tuple with the following:

        - `False` if we should use the 'full' infilling database,
          `True` if we should use the database
          that has information about CFCs and other species
          not typically modelled by IAMs
        - The database cruncher from silicone to use for infilling the variable
        """

        if variables_to_infill is None:
            # Technically, having to infill some of these
            # would have caused a scenario to fail vetting,
            # but we could at least in theory infill all of them.
            variables_to_infill = tuple(VARS_DB_CRUNCHERS.keys())

        lead_options = (
            ("Emissions|CO2",),
            ("Emissions|CO2|Fossil",),
        )

        infillers = {}

        for v_infill in variables_to_infill:
            cfcs, cruncher = VARS_DB_CRUNCHERS[v_infill]

            infillers[v_infill] = partial(
                do_ar6_like_infilling,
                follower=v_infill,
                lead_options=lead_options,
                db_file=ar6_infilling_db_file
                if not cfcs
                else ar6_infilling_db_cfcs_file,
                cruncher=cruncher,
                cfcs=cfcs,
            )

        # CO2 Energy and Industrial special case
        infillers["Emissions|CO2|Fossil"] = partial(
            do_ar6_like_infilling,
            follower="Emissions|CO2|Fossil",
            lead_options=(("Emissions|CO2",),),
            db_file=ar6_infilling_db_file,
            cruncher=VARS_DB_CRUNCHERS["Emissions|CO2|Fossil"][1],
            cfcs=False,
        )

        return cls(
            infillers=infillers,
            run_checks=run_checks,
            historical_emissions=historical_emissions,
            harmonisation_year=harmonisation_year,
            progress=progress,
            n_processes=n_processes,
        )
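
The way `from_ar6_config` builds its `infillers` mapping can be sketched without silicone: each variable carries a flag that selects which database file to use, and `functools.partial` freezes that configuration into a callable. In the sketch below, `fake_infill`, the two file names, and the cruncher names given as plain strings are all hypothetical placeholders for `do_ar6_like_infilling` and the real inputs.

```python
from functools import partial
from pathlib import Path


def fake_infill(emissions: dict, follower: str, db_file: Path, cfcs: bool) -> str:
    """Hypothetical stand-in for `do_ar6_like_infilling`; it only reports its config."""
    return f"{follower} <- {db_file.name} (cfcs={cfcs})"


# Per variable: (use the CFC database?, cruncher name).
# Cruncher names are plain strings here so the sketch does not need silicone.
VARS_DB = {
    "Emissions|CH4": (False, "QuantileRollingWindows"),
    "Emissions|CFC11": (True, "RMSClosest"),
}

main_db = Path("infilling_db.csv")  # hypothetical file name
cfcs_db = Path("infilling_db_cfcs.csv")  # hypothetical file name

infillers = {}
for variable, (cfcs, _cruncher) in VARS_DB.items():
    # Freeze the per-variable configuration; only the emissions remain to be supplied
    infillers[variable] = partial(
        fake_infill,
        follower=variable,
        db_file=cfcs_db if cfcs else main_db,
        cfcs=cfcs,
    )

result = infillers["Emissions|CFC11"]({})
```

Freezing the configuration this way leaves every entry with the same one-argument call shape, which is what allows `__call__` to treat all infillers uniformly.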

harmonisation_year class-attribute instance-attribute #

harmonisation_year: int | None = None

Year in which the data was harmonised

Only required if run_checks is True to check that the infilled data is also harmonised.

historical_emissions class-attribute instance-attribute #

historical_emissions: DataFrame | None = None

Historical emissions used for harmonisation

Only required if run_checks is True to check that the infilled data is also harmonised.

infillers instance-attribute #

Functions to use for infilling each variable.

The keys define the variable that can be infilled. The values define the function which, given inputs with the expected lead variables, returns the infilled timeseries.
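
As an illustration of this contract, the sketch below defines a toy infiller that derives a follower variable by scaling a lead variable by a fixed factor. This is not how silicone infills; `toy_infiller`, the factor, and the data are assumptions made purely to show the `DataFrame -> DataFrame` call shape.

```python
from functools import partial

import pandas as pd


def toy_infiller(
    in_emissions: pd.DataFrame,
    lead: str,
    follower: str,
    follower_unit: str,
    factor: float,
) -> pd.DataFrame:
    """Toy infiller (an assumption): follower = factor * lead."""
    is_lead = in_emissions.index.get_level_values("variable") == lead
    out = (in_emissions.loc[is_lead] * factor).reset_index()
    # Relabel the scaled rows as the follower variable
    out["variable"] = follower
    out["unit"] = follower_unit
    return out.set_index(list(in_emissions.index.names))


infillers = {
    "Emissions|CH4": partial(
        toy_infiller,
        lead="Emissions|CO2",
        follower="Emissions|CH4",
        follower_unit="Mt CH4/yr",
        factor=0.01,
    ),
}

# One scenario with only the lead variable reported (made-up values)
scenario = pd.DataFrame(
    [[40000.0, 30000.0]],
    index=pd.MultiIndex.from_tuples(
        [("model_a", "scen_1", "Emissions|CO2", "Mt CO2/yr")],
        names=["model", "scenario", "variable", "unit"],
    ),
    columns=[2015, 2020],
)

infilled = infillers["Emissions|CH4"](scenario)
```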

n_processes class-attribute instance-attribute #

n_processes: int | None = None

Number of processes to use for parallel processing.

Set to None to process in serial.
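
The serial-versus-parallel switch can be pictured with the standard library alone. The sketch below is not gcages' `apply_op_parallel_progress`; `apply_op` and `square` are hypothetical names used to show how a value of `None` can select a plain loop while an integer selects a process pool.

```python
from concurrent.futures import ProcessPoolExecutor


def square(x: int) -> int:
    """Toy operation applied to each input."""
    return x * x


def apply_op(func, inputs, n_processes=None):
    """Apply `func` to each input, serially if `n_processes` is None."""
    if n_processes is None:
        # Serial path: no worker processes are started
        return [func(x) for x in inputs]

    # Parallel path: `func` and the inputs must be picklable
    with ProcessPoolExecutor(max_workers=n_processes) as pool:
        return list(pool.map(func, inputs))


serial_results = apply_op(square, [1, 2, 3], n_processes=None)
```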

progress class-attribute instance-attribute #

progress: bool = True

Should progress bars be shown for each operation?

run_checks class-attribute instance-attribute #

run_checks: bool = True

If True, run checks on both input and output data

If you are sure about your workflow, you can disable the checks to speed things up (but we don't recommend this unless you really are confident about what you're doing).

__call__ #

__call__(in_emissions: DataFrame) -> DataFrame

Infill

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `in_emissions` | `DataFrame` | Emissions to infill | required |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | Infilled emissions |

Source code in src/gcages/ar6/infilling.py
def __call__(self, in_emissions: pd.DataFrame) -> pd.DataFrame:
    """
    Infill

    Parameters
    ----------
    in_emissions
        Emissions to infill

    Returns
    -------
    :
        Infilled emissions
    """
    if self.run_checks:
        assert_index_is_multiindex(in_emissions)
        assert_data_is_all_numeric(in_emissions)
        # Needed for parallelisation
        assert_has_index_levels(
            in_emissions, ["variable", "unit", "model", "scenario"]
        )

    # Strip off any prefixes that might be there
    to_infill = in_emissions.pix.assign(
        variable=in_emissions.index.get_level_values("variable").map(
            lambda x: x.replace("AR6 climate diagnostics|Harmonized|", "")
        )
    ).sort_index(axis="columns")

    # This is not the most efficient parallelisation.
    # It would be better to group by variable,
    # then have an `infill_variable` function,
    # but this would require a bit more thinking
    # to actually achieve.
    infilled = pd.concat(
        [
            v.reorder_levels(to_infill.index.names)
            for v in apply_op_parallel_progress(
                func_to_call=infill_scenario,
                iterable_input=(
                    gdf for _, gdf in to_infill.groupby(["model", "scenario"])
                ),
                parallel_op_config=ParallelOpConfig.from_user_facing(
                    progress=self.progress,
                    max_workers=self.n_processes,
                    progress_results_kwargs=dict(desc="Scenarios to infill"),
                ),
                infillers=self.infillers,
            )
        ]
    )

    if self.run_checks:
        pd.testing.assert_index_equal(infilled.columns, in_emissions.columns)

        if self.historical_emissions is None:
            msg = "`self.historical_emissions` must be set to check the infilling"
            raise AssertionError(msg)

        if self.harmonisation_year is None:
            msg = "`self.harmonisation_year` must be set to check the infilling"
            raise AssertionError(msg)

        assert_harmonised(
            infilled,
            history=self.historical_emissions,
            harmonisation_time=self.harmonisation_year,
            rounding=5,  # precision at which historical data is often stored
        )
        assert_all_groups_are_complete(
            # The combo of the input and infilled should be complete
            pd.concat(
                [in_emissions, infilled.reorder_levels(in_emissions.index.names)]
            ),
            complete_index=self.historical_emissions.index.droplevel("unit"),
        )

    return infilled
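
The first two steps of `__call__`, stripping the AR6 prefix from variable names and splitting the data by model and scenario, can be reproduced with plain pandas. The sketch below uses made-up data and replaces the `.pix.assign(variable=...)` call with what is assumed to be a roughly equivalent `reset_index`/`set_index` round trip.

```python
import pandas as pd

PREFIX = "AR6 climate diagnostics|Harmonized|"

# Made-up emissions; columns are deliberately out of order
index = pd.MultiIndex.from_tuples(
    [
        ("model_a", "scen_1", PREFIX + "Emissions|CO2", "Mt CO2/yr"),
        ("model_a", "scen_1", "Emissions|CH4", "Mt CH4/yr"),
        ("model_b", "scen_2", PREFIX + "Emissions|CO2", "Mt CO2/yr"),
    ],
    names=["model", "scenario", "variable", "unit"],
)
emissions = pd.DataFrame(
    [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], index=index, columns=[2020, 2015]
)

# Strip the prefix from the variable level, then restore the level order
stripped = emissions.reset_index("variable")
stripped["variable"] = stripped["variable"].str.replace(PREFIX, "", regex=False)
stripped = stripped.set_index("variable", append=True).reorder_levels(
    emissions.index.names
)
stripped = stripped.sort_index(axis="columns")

# One group per (model, scenario); each group is infilled independently
groups = {key: gdf for key, gdf in stripped.groupby(["model", "scenario"])}
```

Because each group holds one complete scenario, it can be handed to a worker on its own, which is why the index must contain the `model` and `scenario` levels.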

from_ar6_config classmethod #

from_ar6_config(
    ar6_infilling_db_file: Path,
    ar6_infilling_db_cfcs_file: Path,
    variables_to_infill: Iterable[str] | None = None,
    run_checks: bool = True,
    historical_emissions: DataFrame | None = None,
    harmonisation_year: int | None = None,
    progress: bool = True,
    n_processes: int | None = None,
) -> AR6Infiller

Initialise from the config used in AR6

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `ar6_infilling_db_file` | `Path` | File containing the AR6 infilling database. This is for all emissions except CFCs. | required |
| `ar6_infilling_db_cfcs_file` | `Path` | File containing the AR6 infilling database for CFCs | required |
| `variables_to_infill` | `Iterable[str] \| None` | Variables to infill. If not supplied, we use the default set from AR6. | `None` |
| `run_checks` | `bool` | Should checks of the input and output data be performed? If this is turned off, things are faster, but error messages are much less clear if things go wrong. | `True` |
| `historical_emissions` | `DataFrame \| None` | Historical emissions used for harmonisation. Only required if run_checks is True to check that the infilled data is also harmonised. | `None` |
| `harmonisation_year` | `int \| None` | Year in which the data was harmonised. Only required if run_checks is True to check that the infilled data is also harmonised. | `None` |
| `progress` | `bool` | Should a progress bar be shown for each operation? | `True` |
| `n_processes` | `int \| None` | Number of processes to use for parallel processing. Set to None to process in serial. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `AR6Infiller` | Initialised infiller |

Source code in src/gcages/ar6/infilling.py
@classmethod
def from_ar6_config(  # noqa: PLR0913
    cls,
    ar6_infilling_db_file: Path,
    ar6_infilling_db_cfcs_file: Path,
    variables_to_infill: Iterable[str] | None = None,
    run_checks: bool = True,
    historical_emissions: pd.DataFrame | None = None,
    harmonisation_year: int | None = None,
    progress: bool = True,
    n_processes: int | None = None,  # better off in serial with silicone
) -> AR6Infiller:
    """
    Initialise from the config used in AR6

    Parameters
    ----------
    ar6_infilling_db_file
        File containing the AR6 infilling database

        This is for all emissions except CFCs.

    ar6_infilling_db_cfcs_file
        File containing the AR6 infilling database for CFCs

    variables_to_infill
        Variables to infill.

        If not supplied, we use the default set from AR6.

    run_checks
        Should checks of the input and output data be performed?

        If this is turned off, things are faster,
        but error messages are much less clear if things go wrong.

    historical_emissions
        Historical emissions used for harmonisation

        Only required if `run_checks` is `True` to check
        that the infilled data is also harmonised.

    harmonisation_year
        Year in which the data was harmonised

        Only required if `run_checks` is `True` to check
        that the infilled data is also harmonised.

    progress
        Should a progress bar be shown for each operation?

    n_processes
        Number of processes to use for parallel processing.

        Set to `None` to process in serial.

    Returns
    -------
    :
        Initialised infiller
    """
    try:
        import silicone.database_crunchers  # type: ignore # silicone has no type hints
    except ImportError as exc:
        raise MissingOptionalDependencyError(
            "get_ar6_infiller", requirement="silicone"
        ) from exc

    VARS_DB_CRUNCHERS = {
        "Emissions|BC": (
            False,
            silicone.database_crunchers.QuantileRollingWindows,
        ),
        "Emissions|CH4": (
            False,
            silicone.database_crunchers.QuantileRollingWindows,
        ),
        "Emissions|CO2|Biosphere": (
            False,
            silicone.database_crunchers.QuantileRollingWindows,
        ),
        "Emissions|CO2|Fossil": (
            False,
            silicone.database_crunchers.QuantileRollingWindows,
        ),
        "Emissions|CO": (
            False,
            silicone.database_crunchers.QuantileRollingWindows,
        ),
        "Emissions|N2O": (
            False,
            silicone.database_crunchers.QuantileRollingWindows,
        ),
        "Emissions|NH3": (
            False,
            silicone.database_crunchers.QuantileRollingWindows,
        ),
        "Emissions|NOx": (
            False,
            silicone.database_crunchers.QuantileRollingWindows,
        ),
        "Emissions|OC": (
            False,
            silicone.database_crunchers.QuantileRollingWindows,
        ),
        "Emissions|SOx": (
            False,
            silicone.database_crunchers.QuantileRollingWindows,
        ),
        "Emissions|NMVOC": (
            False,
            silicone.database_crunchers.QuantileRollingWindows,
        ),
        "Emissions|HFC134a": (
            False,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|HFC143a": (
            False,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|HFC227ea": (
            False,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|HFC23": (
            False,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|HFC32": (
            False,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|HFC4310mee": (
            False,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|HFC125": (
            False,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|SF6": (
            False,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|CF4": (
            False,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|C2F6": (
            False,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|C6F14": (
            False,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|CCl4": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|CFC11": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|CFC113": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|CFC114": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|CFC115": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|CFC12": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|CH2Cl2": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|CH3Br": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|CH3CCl3": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|CH3Cl": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|CHCl3": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|HCFC141b": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|HCFC142b": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|HCFC22": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|HFC152a": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|HFC236fa": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|HFC365mfc": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|Halon1202": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|Halon1211": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|Halon1301": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|Halon2402": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|NF3": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|C3F8": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|C4F10": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|C5F12": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|C7F16": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|C8F18": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|cC4F8": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
        "Emissions|SO2F2": (
            True,
            silicone.database_crunchers.RMSClosest,
        ),
    }
    """
    Definition of our default set of variables to infill and how to infill them

    Each key is a variable we can infill.
    Each value is a tuple with the following:

    - `False` if we should use the 'full' infilling database,
      `True` if we should use the database
      that has information about CFCs and other species
      not typically modelled by IAMs
    - The database cruncher from silicone to use for infilling the variable
    """

    if variables_to_infill is None:
        # Technically, having to infill some of these
        # would have caused a scenario to fail vetting,
        # but we could at least in theory infill all of them.
        variables_to_infill = tuple(VARS_DB_CRUNCHERS.keys())

    lead_options = (
        ("Emissions|CO2",),
        ("Emissions|CO2|Fossil",),
    )

    infillers = {}

    for v_infill in variables_to_infill:
        cfcs, cruncher = VARS_DB_CRUNCHERS[v_infill]

        infillers[v_infill] = partial(
            do_ar6_like_infilling,
            follower=v_infill,
            lead_options=lead_options,
            db_file=ar6_infilling_db_file
            if not cfcs
            else ar6_infilling_db_cfcs_file,
            cruncher=cruncher,
            cfcs=cfcs,
        )

    # CO2 Energy and Industrial special case
    infillers["Emissions|CO2|Fossil"] = partial(
        do_ar6_like_infilling,
        follower="Emissions|CO2|Fossil",
        lead_options=(("Emissions|CO2",),),
        db_file=ar6_infilling_db_file,
        cruncher=VARS_DB_CRUNCHERS["Emissions|CO2|Fossil"][1],
        cfcs=False,
    )

    return cls(
        infillers=infillers,
        run_checks=run_checks,
        historical_emissions=historical_emissions,
        harmonisation_year=harmonisation_year,
        progress=progress,
        n_processes=n_processes,
    )
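The listing above pairs each variable with a boolean flag and a cruncher, then binds the matching database file with `functools.partial`. A minimal sketch of that selection pattern follows. Everything in it is a stand-in for illustration: `fake_infill`, the file names, and the use of strings instead of the silicone cruncher classes are assumptions, not part of gcages or silicone.

```python
from functools import partial

# Hypothetical stand-ins: strings replace the silicone cruncher classes.
VARS_DB_CRUNCHERS = {
    "Emissions|NMVOC": (False, "QuantileRollingWindows"),
    "Emissions|CFC11": (True, "RMSClosest"),
}


def fake_infill(follower, db_file, cruncher, cfcs):
    """Illustrative placeholder for the real infilling function."""
    return f"{follower} <- {cruncher} using {db_file} (cfcs={cfcs})"


full_db = "infilling_db.csv"  # assumed file name
cfcs_db = "infilling_db_cfcs.csv"  # assumed file name

infillers = {}
for variable, (cfcs, cruncher) in VARS_DB_CRUNCHERS.items():
    # The flag decides which database the infiller is bound to.
    infillers[variable] = partial(
        fake_infill,
        follower=variable,
        db_file=cfcs_db if cfcs else full_db,
        cruncher=cruncher,
        cfcs=cfcs,
    )

for infiller in infillers.values():
    print(infiller())
```

Binding the arguments up front means every entry in `infillers` can later be called in the same way, whichever database it uses.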

AR6PostProcessor #

Post-processor that follows the same logic as was used in AR6

If you want exactly the same behaviour as in AR6, initialise using from_ar6_config

Methods:

Name Description
__call__

Do the post-processing

from_ar6_config

Initialise from the config used in AR6

Attributes:

Name Type Description
assessed_gsat_variable str

Name of the output variable that will contain temperature output

exceedance_thresholds_of_interest tuple[float, ...]

Thresholds of interest for calculating exceedance probabilities

gsat_assessment_median float

Median of the GSAT assessment

gsat_assessment_pre_industrial_period tuple[int, ...]

Pre-industrial time period used for the GSAT assessment

gsat_assessment_time_period tuple[int, ...]

Time period over which the GSAT assessment applies

n_processes int | None

Number of processes to use for parallel processing.

progress bool

Should progress bars be shown for each operation where they make sense?

quantiles_of_interest tuple[float, ...]

Quantiles to include in output

raw_gsat_variable_in str

Name of the variable that contains raw temperature output in the input

run_checks bool

If True, run checks on both input and output data
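When `run_checks` is `True`, the output check compares the unique (model, scenario, climate_model) combinations in each result against those in the input. The sketch below reproduces that comparison with plain pandas. The index values and the helper `unique_combinations` are made up for illustration.

```python
import pandas as pd

comparison_levels = ["model", "scenario", "climate_model"]

# Input: one row per ensemble member (run_id).
in_index = pd.MultiIndex.from_tuples(
    [("m", "s1", "scm", 0), ("m", "s1", "scm", 1), ("m", "s2", "scm", 0)],
    names=["model", "scenario", "climate_model", "run_id"],
)
# Output: different extra level, different level order, different row order.
out_index = pd.MultiIndex.from_tuples(
    [("scm", "m", "s2", 0.5), ("scm", "m", "s1", 0.5)],
    names=["climate_model", "model", "scenario", "quantile"],
)


def unique_combinations(index):
    """Reduce an index to its unique combinations of the comparison levels."""
    extra_levels = [name for name in index.names if name not in comparison_levels]
    return (
        index.droplevel(extra_levels)
        .drop_duplicates()
        .reorder_levels(comparison_levels)
    )


# Raises AssertionError if the two sets of combinations differ.
pd.testing.assert_index_equal(
    unique_combinations(out_index),
    unique_combinations(in_index),
    check_order=False,
)
```

A scenario that silently disappears during post-processing would make this comparison fail, which is the kind of error the check is there to catch.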

Source code in src/gcages/ar6/post_processing.py
@define
class AR6PostProcessor:
    """
    Post-processor that follows the same logic as was used in AR6

    If you want exactly the same behaviour as in AR6,
    initialise using [`from_ar6_config`][(c)]
    """

    gsat_assessment_median: float
    """
    Median of the GSAT assessment
    """

    gsat_assessment_time_period: tuple[int, ...]
    """
    Time period over which the GSAT assessment applies
    """

    gsat_assessment_pre_industrial_period: tuple[int, ...]
    """
    Pre-industrial time period used for the GSAT assessment
    """

    quantiles_of_interest: tuple[float, ...]
    """
    Quantiles to include in output
    """

    exceedance_thresholds_of_interest: tuple[float, ...]
    """
    Thresholds of interest for calculating exceedance probabilities
    """

    raw_gsat_variable_in: str
    """
    Name of the variable that contains raw temperature output in the input

    The temperature output should be global-mean surface air temperature (GSAT).
    """

    assessed_gsat_variable: str
    """
    Name of the output variable that will contain temperature output

    This temperature output is in line with the (AR6) assessed historical warming.
    """

    run_checks: bool = True
    """
    If `True`, run checks on both input and output data

    If you are sure about your workflow,
    you can disable the checks to speed things up
    (but we don't recommend this unless you really
    are confident about what you're doing).
    """

    progress: bool = True
    """
    Should progress bars be shown for each operation where they make sense?
    """

    n_processes: int | None = multiprocessing.cpu_count()
    """
    Number of processes to use for parallel processing.

    Set to `None` to process in serial.
    """

    def __call__(self, in_df: pd.DataFrame) -> PostProcessingResult:
        """
        Do the post-processing

        Parameters
        ----------
        in_df
            Data to post-process

        Returns
        -------
        timeseries, metadata :
            Post-processed results

            These are both timeseries as well as scenario-level metadata.
        """
        if self.run_checks:
            assert_index_is_multiindex(in_df)
            assert_has_index_levels(
                in_df, ["variable", "unit", "model", "scenario", "climate_model"]
            )
            assert_data_is_all_numeric(in_df)
            assert_has_data_for_times(
                in_df, name="in_df", times=[2100], allow_nan=False
            )

            if self.raw_gsat_variable_in not in in_df.index.get_level_values(
                "variable"
            ):
                msg = (
                    f"{self.raw_gsat_variable_in} must be provided. "
                    f"Received: {in_df.index.get_level_values('variable')=}"
                )
                raise AssertionError(msg)

        temperatures_in_line_with_assessment = update_index_levels_func(
            get_temperatures_in_line_with_assessment(
                in_df.loc[
                    in_df.index.get_level_values("variable").isin(
                        [self.raw_gsat_variable_in]
                    )
                ],
                assessment_median=self.gsat_assessment_median,
                assessment_time_period=self.gsat_assessment_time_period,
                assessment_pre_industrial_period=self.gsat_assessment_pre_industrial_period,
                group_cols=["climate_model", "model", "scenario"],
            ),
            {"variable": lambda x: self.assessed_gsat_variable},
        )
        temperatures_in_line_with_assessment_quantiles = (
            fix_index_name_after_groupby_quantile(
                groupby_except(
                    temperatures_in_line_with_assessment,
                    "run_id",
                ).quantile(self.quantiles_of_interest),  # type: ignore # pandas-stubs confused
                new_name="quantile",
            )
        )
        exceedance_probabilities_over_time = get_exceedance_probabilities_over_time(
            temperatures_in_line_with_assessment,
            exceedance_thresholds_of_interest=self.exceedance_thresholds_of_interest,
            group_cols=["model", "scenario", "climate_model"],
            unit_col="unit",
            groupby_except_levels="run_id",
        )

        # TODO: move pandas-openscm.max to pandas-openscm
        peak_warming = set_index_levels_func(
            temperatures_in_line_with_assessment.max(axis="columns"), {"metric": "max"}
        )
        peak_warming_quantiles: pd.Series[float] = (
            fix_index_name_after_groupby_quantile(
                groupby_except(peak_warming, "run_id").quantile(
                    self.quantiles_of_interest  # type: ignore # pandas-stubs confused
                ),
                new_name="quantile",
            )
        )

        eoc_warming = set_index_levels_func(
            temperatures_in_line_with_assessment[2100], {"metric": 2100}
        )
        eoc_warming_quantiles: pd.Series[float] = fix_index_name_after_groupby_quantile(
            groupby_except(eoc_warming, "run_id").quantile(self.quantiles_of_interest),  # type: ignore # pandas-stubs confused
            new_name="quantile",
        )
        peak_warming_year = set_index_levels_func(
            update_index_levels_func(
                temperatures_in_line_with_assessment.idxmax(axis="columns"),
                {"unit": lambda x: "yr"},
            ),
            {"metric": "max_year"},
        )
        peak_warming_year_quantiles = fix_index_name_after_groupby_quantile(
            groupby_except(peak_warming_year, "run_id").quantile(
                self.quantiles_of_interest  # type: ignore # pandas-stubs out of date
            ),
            new_name="quantile",
        )

        exceedance_probabilities = get_exceedance_probabilities(
            temperatures_in_line_with_assessment,
            exceedance_thresholds_of_interest=self.exceedance_thresholds_of_interest,
            group_cols=["model", "scenario", "climate_model"],
            unit_col="unit",
            groupby_except_levels="run_id",
        )

        categories = categorise_scenarios(
            peak_warming_quantiles=peak_warming_quantiles,
            eoc_warming_quantiles=eoc_warming_quantiles,
            group_levels=["climate_model", "model", "scenario"],
            quantile_level="quantile",
        )

        timeseries_run_id = pd.concat([temperatures_in_line_with_assessment])
        timeseries_quantile = pd.concat(
            [temperatures_in_line_with_assessment_quantiles]
        )
        timeseries_exceedance_probabilities = pd.concat(
            [exceedance_probabilities_over_time]
        )

        metadata_run_id: pd.Series[float] = pd.concat(
            [peak_warming, eoc_warming, peak_warming_year]
        )
        metadata_quantile: pd.Series[float] = pd.concat(
            [
                peak_warming_quantiles,
                eoc_warming_quantiles,
                peak_warming_year_quantiles,
            ]
        )
        metadata_exceedance_probabilities = exceedance_probabilities
        metadata_categories = categories

        res = PostProcessingResult(
            timeseries_run_id=timeseries_run_id,
            timeseries_quantile=timeseries_quantile,
            timeseries_exceedance_probabilities=timeseries_exceedance_probabilities,
            metadata_run_id=metadata_run_id,
            metadata_quantile=metadata_quantile,
            metadata_exceedance_probabilities=metadata_exceedance_probabilities,
            metadata_categories=metadata_categories,
        )

        if self.run_checks:
            comparison_levels = ["model", "scenario", "climate_model"]
            for attr in [
                "timeseries_run_id",
                "timeseries_quantile",
                "timeseries_exceedance_probabilities",
                "metadata_run_id",
                "metadata_quantile",
                "metadata_exceedance_probabilities",
                "metadata_categories",
            ]:
                pd.testing.assert_index_equal(
                    getattr(res, attr)
                    .index.droplevel(
                        getattr(res, attr).index.names.difference(comparison_levels)
                    )
                    .drop_duplicates()
                    .reorder_levels(comparison_levels),
                    in_df.index.droplevel(
                        in_df.index.names.difference(comparison_levels)  # type: ignore # pandas-stubs out of date
                    )
                    .drop_duplicates()
                    .reorder_levels(comparison_levels),
                    check_order=False,
                )

        return res

    @classmethod
    def from_ar6_config(  # noqa: PLR0913
        cls,
        exceedance_thresholds_of_interest: tuple[float, ...] = tuple(
            np.arange(1.0, 4.01, 0.5)
        ),
        quantiles_of_interest: tuple[float, ...] = (
            0.05,
            0.10,
            1.0 / 6.0,
            0.33,
            0.50,
            0.67,
            5.0 / 6.0,
            0.90,
            0.95,
        ),
        raw_gsat_variable_in: str = "Surface Air Temperature Change",
        assessed_gsat_variable: str = "Surface Temperature (GSAT)",
        run_checks: bool = True,
        progress: bool = True,
        n_processes: int | None = multiprocessing.cpu_count(),
    ) -> AR6PostProcessor:
        """
        Initialise from the config used in AR6

        Parameters
        ----------
        exceedance_thresholds_of_interest
            The thresholds for which we are interested in exceedance probabilities

        quantiles_of_interest
            The quantiles we want to include in the results

        raw_gsat_variable_in
            Name of the variable that contains raw temperature output in the input

            The temperature output should be global-mean surface air temperature (GSAT).

        assessed_gsat_variable
            Name of the output variable that will contain temperature output

            This temperature output is in line with the
            (AR6) assessed historical warming.

        run_checks
            Should checks of the input and output data be performed?

            If this is turned off, things are faster,
            but error messages are much less clear if things go wrong.

        progress
            Should progress bars be shown for each operation?

        n_processes
            Number of processes to use for parallel processing.

            Set to `None` to process in serial.

        Returns
        -------
        :
            Initialised post-processor
        """
        if not all(q in quantiles_of_interest for q in [0.50, 0.33]):
            msg = (
                "quantiles_of_interest must contain 0.50 and 0.33 "
                "for the categorisation to work, "
                f"received {quantiles_of_interest=}"
            )
            raise AssertionError(msg)

        return cls(
            raw_gsat_variable_in=raw_gsat_variable_in,
            assessed_gsat_variable=assessed_gsat_variable,
            gsat_assessment_median=0.85,
            gsat_assessment_time_period=tuple(range(1995, 2014 + 1)),
            gsat_assessment_pre_industrial_period=tuple(range(1850, 1900 + 1)),
            quantiles_of_interest=quantiles_of_interest,
            exceedance_thresholds_of_interest=exceedance_thresholds_of_interest,
            run_checks=run_checks,
            progress=progress,
            n_processes=n_processes,
        )

assessed_gsat_variable instance-attribute #

assessed_gsat_variable: str

Name of the output variable that will contain temperature output

This temperature output is in line with the (AR6) assessed historical warming.

exceedance_thresholds_of_interest instance-attribute #

exceedance_thresholds_of_interest: tuple[float, ...]

Thresholds of interest for calculating exceedance probabilities
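One common way to define an exceedance probability is the share of ensemble members whose peak warming is above a threshold. The sketch below shows that idea with plain pandas and made-up numbers. It illustrates the concept only: it is not necessarily how `get_exceedance_probabilities` is implemented, and reporting the result as a fraction rather than a percentage is an assumption.

```python
import pandas as pd

index = pd.MultiIndex.from_product(
    [["model_a"], ["scen_a"], range(4)],
    names=["model", "scenario", "run_id"],
)
# Peak warming of each ensemble member (illustrative values).
peak_warming = pd.Series([1.4, 1.6, 2.1, 1.9], index=index)

thresholds = (1.5, 2.0)
group_levels = ["model", "scenario"]

# For each threshold, the mean of a boolean mask is the fraction of members above it.
exceedance_probabilities = pd.DataFrame(
    {
        threshold: (peak_warming > threshold).groupby(level=group_levels).mean()
        for threshold in thresholds
    }
)
print(exceedance_probabilities)
```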

gsat_assessment_median instance-attribute #

gsat_assessment_median: float

Median of the GSAT assessment

gsat_assessment_pre_industrial_period instance-attribute #

gsat_assessment_pre_industrial_period: tuple[int, ...]

Pre-industrial time period used for the GSAT assessment

gsat_assessment_time_period instance-attribute #

gsat_assessment_time_period: tuple[int, ...]

Time period over which the GSAT assessment applies

n_processes class-attribute instance-attribute #

n_processes: int | None = cpu_count()

Number of processes to use for parallel processing.

Set to None to process in serial.

progress class-attribute instance-attribute #

progress: bool = True

Should progress bars be shown for each operation where they make sense?

quantiles_of_interest instance-attribute #

quantiles_of_interest: tuple[float, ...]

Quantiles to include in output

raw_gsat_variable_in instance-attribute #

raw_gsat_variable_in: str

Name of the variable that contains raw temperature output in the input

The temperature output should be global-mean surface air temperature (GSAT).

run_checks class-attribute instance-attribute #

run_checks: bool = True

If True, run checks on both input and output data

If you are sure about your workflow, you can disable the checks to speed things up (but we don't recommend this unless you really are confident about what you're doing).

__call__ #

__call__(in_df: DataFrame) -> PostProcessingResult

Do the post-processing

Parameters:

Name Type Description Default
in_df DataFrame

Data to post-process

required

Returns:

Type Description
PostProcessingResult

Post-processed results

These are both timeseries as well as scenario-level metadata.
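The input checks in `__call__` require a `pandas.MultiIndex` with at least the levels `variable`, `unit`, `model`, `scenario` and `climate_model`, all-numeric data, and a non-NaN value for 2100. A minimal sketch of a frame with that shape follows. The model names and values are made up, the year columns are assumed to be integers, and the `run_id` level is included because the method later groups over it. A real input would also need to cover the assessment and pre-industrial periods.

```python
import pandas as pd

years = [2015, 2050, 2100]
index = pd.MultiIndex.from_tuples(
    [
        ("Surface Air Temperature Change", "K", "model_a", "scen_a", "scm_a", 0),
        ("Surface Air Temperature Change", "K", "model_a", "scen_a", "scm_a", 1),
    ],
    names=["variable", "unit", "model", "scenario", "climate_model", "run_id"],
)
in_df = pd.DataFrame([[1.1, 1.6, 1.9], [1.2, 1.8, 2.3]], index=index, columns=years)

# Plain-pandas versions of the conditions the checks enforce.
required_levels = ["variable", "unit", "model", "scenario", "climate_model"]
missing_levels = [lvl for lvl in required_levels if lvl not in in_df.index.names]
has_2100 = 2100 in in_df.columns and not in_df[2100].isna().any()
all_numeric = all(pd.api.types.is_numeric_dtype(dtype) for dtype in in_df.dtypes)

print(missing_levels, has_2100, all_numeric)
```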

Source code in src/gcages/ar6/post_processing.py
def __call__(self, in_df: pd.DataFrame) -> PostProcessingResult:
    """
    Do the post-processing

    Parameters
    ----------
    in_df
        Data to post-process

    Returns
    -------
    timeseries, metadata :
        Post-processed results

        These are both timeseries as well as scenario-level metadata.
    """
    if self.run_checks:
        assert_index_is_multiindex(in_df)
        assert_has_index_levels(
            in_df, ["variable", "unit", "model", "scenario", "climate_model"]
        )
        assert_data_is_all_numeric(in_df)
        assert_has_data_for_times(
            in_df, name="in_df", times=[2100], allow_nan=False
        )

        if self.raw_gsat_variable_in not in in_df.index.get_level_values(
            "variable"
        ):
            msg = (
                f"{self.raw_gsat_variable_in} must be provided. "
                f"Received: {in_df.index.get_level_values('variable')=}"
            )
            raise AssertionError(msg)

    temperatures_in_line_with_assessment = update_index_levels_func(
        get_temperatures_in_line_with_assessment(
            in_df.loc[
                in_df.index.get_level_values("variable").isin(
                    [self.raw_gsat_variable_in]
                )
            ],
            assessment_median=self.gsat_assessment_median,
            assessment_time_period=self.gsat_assessment_time_period,
            assessment_pre_industrial_period=self.gsat_assessment_pre_industrial_period,
            group_cols=["climate_model", "model", "scenario"],
        ),
        {"variable": lambda x: self.assessed_gsat_variable},
    )
    temperatures_in_line_with_assessment_quantiles = (
        fix_index_name_after_groupby_quantile(
            groupby_except(
                temperatures_in_line_with_assessment,
                "run_id",
            ).quantile(self.quantiles_of_interest),  # type: ignore # pandas-stubs confused
            new_name="quantile",
        )
    )
    exceedance_probabilities_over_time = get_exceedance_probabilities_over_time(
        temperatures_in_line_with_assessment,
        exceedance_thresholds_of_interest=self.exceedance_thresholds_of_interest,
        group_cols=["model", "scenario", "climate_model"],
        unit_col="unit",
        groupby_except_levels="run_id",
    )

    # TODO: move pandas-openscm.max to pandas-openscm
    peak_warming = set_index_levels_func(
        temperatures_in_line_with_assessment.max(axis="columns"), {"metric": "max"}
    )
    peak_warming_quantiles: pd.Series[float] = (
        fix_index_name_after_groupby_quantile(
            groupby_except(peak_warming, "run_id").quantile(
                self.quantiles_of_interest  # type: ignore # pandas-stubs confused
            ),
            new_name="quantile",
        )
    )

    eoc_warming = set_index_levels_func(
        temperatures_in_line_with_assessment[2100], {"metric": 2100}
    )
    eoc_warming_quantiles: pd.Series[float] = fix_index_name_after_groupby_quantile(
        groupby_except(eoc_warming, "run_id").quantile(self.quantiles_of_interest),  # type: ignore # pandas-stubs confused
        new_name="quantile",
    )
    peak_warming_year = set_index_levels_func(
        update_index_levels_func(
            temperatures_in_line_with_assessment.idxmax(axis="columns"),
            {"unit": lambda x: "yr"},
        ),
        {"metric": "max_year"},
    )
    peak_warming_year_quantiles = fix_index_name_after_groupby_quantile(
        groupby_except(peak_warming_year, "run_id").quantile(
            self.quantiles_of_interest  # type: ignore # pandas-stubs out of date
        ),
        new_name="quantile",
    )

    exceedance_probabilities = get_exceedance_probabilities(
        temperatures_in_line_with_assessment,
        exceedance_thresholds_of_interest=self.exceedance_thresholds_of_interest,
        group_cols=["model", "scenario", "climate_model"],
        unit_col="unit",
        groupby_except_levels="run_id",
    )

    categories = categorise_scenarios(
        peak_warming_quantiles=peak_warming_quantiles,
        eoc_warming_quantiles=eoc_warming_quantiles,
        group_levels=["climate_model", "model", "scenario"],
        quantile_level="quantile",
    )

    timeseries_run_id = pd.concat([temperatures_in_line_with_assessment])
    timeseries_quantile = pd.concat(
        [temperatures_in_line_with_assessment_quantiles]
    )
    timeseries_exceedance_probabilities = pd.concat(
        [exceedance_probabilities_over_time]
    )

    metadata_run_id: pd.Series[float] = pd.concat(
        [peak_warming, eoc_warming, peak_warming_year]
    )
    metadata_quantile: pd.Series[float] = pd.concat(
        [
            peak_warming_quantiles,
            eoc_warming_quantiles,
            peak_warming_year_quantiles,
        ]
    )
    metadata_exceedance_probabilities = exceedance_probabilities
    metadata_categories = categories

    res = PostProcessingResult(
        timeseries_run_id=timeseries_run_id,
        timeseries_quantile=timeseries_quantile,
        timeseries_exceedance_probabilities=timeseries_exceedance_probabilities,
        metadata_run_id=metadata_run_id,
        metadata_quantile=metadata_quantile,
        metadata_exceedance_probabilities=metadata_exceedance_probabilities,
        metadata_categories=metadata_categories,
    )

    if self.run_checks:
        comparison_levels = ["model", "scenario", "climate_model"]
        for attr in [
            "timeseries_run_id",
            "timeseries_quantile",
            "timeseries_exceedance_probabilities",
            "metadata_run_id",
            "metadata_quantile",
            "metadata_exceedance_probabilities",
            "metadata_categories",
        ]:
            pd.testing.assert_index_equal(
                getattr(res, attr)
                .index.droplevel(
                    getattr(res, attr).index.names.difference(comparison_levels)
                )
                .drop_duplicates()
                .reorder_levels(comparison_levels),
                in_df.index.droplevel(
                    in_df.index.names.difference(comparison_levels)  # type: ignore # pandas-stubs out of date
                )
                .drop_duplicates()
                .reorder_levels(comparison_levels),
                check_order=False,
            )

    return res
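The scenario-level metrics in the listing above come from simple row-wise reductions, followed by quantiles taken across ensemble members. A minimal sketch with plain pandas and made-up values is below. It groups by every level except `run_id` directly, which is assumed to be what `groupby_except` does for the real data.

```python
import pandas as pd

index = pd.MultiIndex.from_tuples(
    [("model_a", "scen_a", 0), ("model_a", "scen_a", 1), ("model_a", "scen_a", 2)],
    names=["model", "scenario", "run_id"],
)
temperatures = pd.DataFrame(
    [[1.2, 1.9, 1.7], [1.3, 1.6, 1.8], [1.1, 1.5, 1.4]],
    index=index,
    columns=[2030, 2060, 2100],
)

# One value per ensemble member.
peak_warming = temperatures.max(axis="columns")
peak_warming_year = temperatures.idxmax(axis="columns")
eoc_warming = temperatures[2100]  # end-of-century warming

# Quantiles across ensemble members: group by everything except run_id.
group_levels = [name for name in temperatures.index.names if name != "run_id"]
peak_quantiles = (
    peak_warming.groupby(level=group_levels)
    .quantile([0.33, 0.5, 0.67])
    # Name the level that holds the quantile values.
    .rename_axis([*group_levels, "quantile"])
)
print(peak_quantiles)
```

The explicit `rename_axis` call gives the quantile level a name, which is presumably the role `fix_index_name_after_groupby_quantile` plays in the real method.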

from_ar6_config classmethod #

from_ar6_config(
    exceedance_thresholds_of_interest: tuple[
        float, ...
    ] = tuple(arange(1.0, 4.01, 0.5)),
    quantiles_of_interest: tuple[float, ...] = (
        0.05,
        0.1,
        1.0 / 6.0,
        0.33,
        0.5,
        0.67,
        5.0 / 6.0,
        0.9,
        0.95,
    ),
    raw_gsat_variable_in: str = "Surface Air Temperature Change",
    assessed_gsat_variable: str = "Surface Temperature (GSAT)",
    run_checks: bool = True,
    progress: bool = True,
    n_processes: int | None = cpu_count(),
) -> AR6PostProcessor

Initialise from the config used in AR6

Parameters:

Name Type Description Default
exceedance_thresholds_of_interest tuple[float, ...]

The thresholds for which we are interested in exceedance probabilities

tuple(arange(1.0, 4.01, 0.5))
quantiles_of_interest tuple[float, ...]

The quantiles we want to include in the results

(0.05, 0.1, 1.0 / 6.0, 0.33, 0.5, 0.67, 5.0 / 6.0, 0.9, 0.95)
raw_gsat_variable_in str

Name of the variable that contains raw temperature output in the input

The temperature output should be global-mean surface air temperature (GSAT).

'Surface Air Temperature Change'
assessed_gsat_variable str

Name of the output variable that will contain temperature output

This temperature output is in line with the (AR6) assessed historical warming.

'Surface Temperature (GSAT)'
run_checks bool

Should checks of the input and output data be performed?

If this is turned off, things are faster, but error messages are much less clear if things go wrong.

True
progress bool

Should progress bars be shown for each operation?

True
n_processes int | None

Number of processes to use for parallel processing.

Set to None to process in serial.

cpu_count()

Returns:

Type Description
AR6PostProcessor

Initialised post-processor
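Two details of the defaults are easy to miss: what `np.arange(1.0, 4.01, 0.5)` expands to, and that the quantile validation is an exact membership test. The sketch below reproduces both outside of gcages. The helper name `has_required_quantiles` is made up for illustration.

```python
import numpy as np

# The default thresholds: 1.0 to 4.0 in steps of 0.5 (4.01 makes 4.0 inclusive).
exceedance_thresholds = tuple(np.arange(1.0, 4.01, 0.5))

quantiles = (0.05, 0.10, 1.0 / 6.0, 0.33, 0.50, 0.67, 5.0 / 6.0, 0.90, 0.95)


def has_required_quantiles(candidate):
    """Mirror of the validation: both 0.50 and 0.33 must be present exactly."""
    return all(q in candidate for q in (0.50, 0.33))


ok_default = has_required_quantiles(quantiles)
# 1/3 is not the same float as 0.33, so this candidate fails the check.
ok_thirds = has_required_quantiles((1.0 / 3.0, 0.50))

print(exceedance_thresholds, ok_default, ok_thirds)
```

If you pass custom quantiles, include the literal values `0.33` and `0.50` rather than computed approximations.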

Source code in src/gcages/ar6/post_processing.py
@classmethod
def from_ar6_config(  # noqa: PLR0913
    cls,
    exceedance_thresholds_of_interest: tuple[float, ...] = tuple(
        np.arange(1.0, 4.01, 0.5)
    ),
    quantiles_of_interest: tuple[float, ...] = (
        0.05,
        0.10,
        1.0 / 6.0,
        0.33,
        0.50,
        0.67,
        5.0 / 6.0,
        0.90,
        0.95,
    ),
    raw_gsat_variable_in: str = "Surface Air Temperature Change",
    assessed_gsat_variable: str = "Surface Temperature (GSAT)",
    run_checks: bool = True,
    progress: bool = True,
    n_processes: int | None = multiprocessing.cpu_count(),
) -> AR6PostProcessor:
    """
    Initialise from the config used in AR6

    Parameters
    ----------
    exceedance_thresholds_of_interest
        The thresholds for which we are interested in exceedance probabilities

    quantiles_of_interest
        The quantiles we want to include in the results

    raw_gsat_variable_in
        Name of the variable that contains raw temperature output in the input

        The temperature output should be global-mean surface air temperature (GSAT).

    assessed_gsat_variable
        Name of the output variable that will contain temperature output

        This temperature output is in line with the
        (AR6) assessed historical warming.

    run_checks
        Should checks of the input and output data be performed?

        If this is turned off, things are faster,
        but error messages are much less clear if things go wrong.

    progress
        Should progress bars be shown for each operation?

    n_processes
        Number of processes to use for parallel processing.

        Set to `None` to process in serial.

    Returns
    -------
    :
        Initialised post-processor
    """
    if not all(q in quantiles_of_interest for q in [0.50, 0.33]):
        msg = (
            "quantiles_of_interest must contain 0.50 and 0.33 "
            "for the categorisation to work, "
            f"received {quantiles_of_interest=}"
        )
        raise AssertionError(msg)

    return cls(
        raw_gsat_variable_in=raw_gsat_variable_in,
        assessed_gsat_variable=assessed_gsat_variable,
        gsat_assessment_median=0.85,
        gsat_assessment_time_period=tuple(range(1995, 2014 + 1)),
        gsat_assessment_pre_industrial_period=tuple(range(1850, 1900 + 1)),
        quantiles_of_interest=quantiles_of_interest,
        exceedance_thresholds_of_interest=exceedance_thresholds_of_interest,
        run_checks=run_checks,
        progress=progress,
        n_processes=n_processes,
    )

AR6PreProcessor #

Pre-processor that follows the same logic as was used in AR6

If you want exactly the same behaviour as in AR6, initialise using from_ar6_config

Methods:

Name Description
__call__

Pre-process

from_ar6_config

Initialise from config that was used in AR6

Attributes:

Name Type Description
conditional_removals tuple[tuple[str, tuple[str, ...]], ...] | None

Specification for variables that can be removed if other variables are present

conditional_sums tuple[tuple[str, tuple[str, ...]], ...] | None

Specification for variables that can be created from other variables

drop_if_identical tuple[tuple[str, str], ...] | None

Variables that can be dropped if they are identical to another variable

emissions_out tuple[str, ...]

Names of emissions that can be included in the result of pre-processing

n_processes int | None

Number of processes to use for parallel processing.

negative_value_not_small_threshold float

Threshold which defines when a negative value is not small

progress bool

Should progress bars be shown for each operation?

reclassifications Mapping[str, tuple[str, ...]] | None

Variables that should be reclassified as being part of another variable

run_checks bool

If True, run checks on both input and output data
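The `conditional_sums` attribute describes variables that can be built from components, but only when all of the components are present. A minimal sketch of that rule on a plain `dict` follows. The variable names and the helper `apply_conditional_sums` are hypothetical, and the real pre-processor works on pandas data and may differ in detail, for example in how it treats a target that already exists.

```python
def apply_conditional_sums(values, conditional_sums):
    """Create each target variable only if all of its components are present."""
    out = dict(values)
    for target, components in conditional_sums:
        if target in out:
            # Assumption: an existing variable is left untouched.
            continue
        if all(component in out for component in components):
            out[target] = sum(out[component] for component in components)
    return out


# Same form as the attribute: ((target, (component_1, component_2)), ...)
conditional_sums = (
    ("Emissions|Total A", ("Emissions|Part A1", "Emissions|Part A2")),
    ("Emissions|Total B", ("Emissions|Part B1", "Emissions|Part B2")),
)
values = {
    "Emissions|Part A1": 30.0,
    "Emissions|Part A2": 4.0,
    "Emissions|Part B1": 2.0,  # "Emissions|Part B2" is missing
}

result = apply_conditional_sums(values, conditional_sums)
print(result)
```

Here `Emissions|Total A` is created because both of its parts are available, while `Emissions|Total B` is skipped because one component is missing.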

Source code in src/gcages/ar6/pre_processing.py
@define
class AR6PreProcessor:
    """
    Pre-processor that follows the same logic as was used in AR6

    If you want exactly the same behaviour as in AR6,
    initialise using [`from_ar6_config`][(c)]
    """

    emissions_out: tuple[str, ...]
    """
    Names of emissions that can be included in the result of pre-processing

    Not all these emissions need to be there,
    but any names which are not in this list will be removed as part of pre-processing.
    """

    negative_value_not_small_threshold: float
    """
    Threshold which defines when a negative value is not small

    Negative non-CO2 emissions at or above this threshold are set to zero;
    those below it are left unchanged.
    """

    conditional_sums: tuple[tuple[str, tuple[str, ...]], ...] | None = None
    """
    Specification for variables that can be created from other variables

    Form:

    ```python
    (
        (variable_that_can_be_created, (component_1, component_2)),
        ...
    )
    ```

    The variable that can be created is only created
    if all the variables it depends on are present.
    """

    reclassifications: Mapping[str, tuple[str, ...]] | None = None
    """
    Variables that should be reclassified as being part of another variable

    Form:

    ```python
    {
        variable_to_add_to: (variable_to_rename_1, variable_to_rename_2),
        ...
    }
    ```

    For example
    ```python
    {
        "Emissions|CO2|Energy and Industrial Processes": (
            "Emissions|CO2|Other",
            "Emissions|CO2|Waste",
        )
    }
    ```
    """

    conditional_removals: tuple[tuple[str, tuple[str, ...]], ...] | None = None
    """
    Specification for variables that can be removed if other variables are present

    Form:

    ```python
    (
        (variable_that_can_be_removed, (component_1, component_2)),
        ...
    )
    ```

    The variable that can be removed is only removed
    if all the variables it depends on are present.
    """

    drop_if_identical: tuple[tuple[str, str], ...] | None = None
    """
    Variables that can be dropped if they are identical to another variable

    Form:

    ```python
    (
        (variable_that_can_be_removed, variable_to_compare_to),
        ...
    )
    ```

    The variable that can be removed is only removed
    if its values are identical to the variable it is compared to.
    """

    run_checks: bool = True
    """
    If `True`, run checks on both input and output data

    If you are sure about your workflow,
    you can disable the checks to speed things up
    (but we don't recommend this unless you really
    are confident about what you're doing).
    """

    progress: bool = True
    """
    Should progress bars be shown for each operation?
    """

    n_processes: int | None = multiprocessing.cpu_count()
    """
    Number of processes to use for parallel processing.

    Set to `None` to process in serial.
    """

    def __call__(self, in_emissions: pd.DataFrame) -> pd.DataFrame:
        """
        Pre-process

        Parameters
        ----------
        in_emissions
            Emissions to pre-process

        Returns
        -------
        :
            Pre-processed emissions
        """
        try:
            from pandas_indexing.selectors import isin, ismatch
        except ImportError as exc:
            raise MissingOptionalDependencyError(
                "AR6PreProcessor.__call__", requirement="pandas_indexing"
            ) from exc

        if self.run_checks:
            assert_index_is_multiindex(in_emissions)
            assert_data_is_all_numeric(in_emissions)
            assert_has_index_levels(in_emissions, ["variable", "unit"])

        # Remove any rows with only zero (custom AR6 thing)
        in_emissions = in_emissions[
            ~(((in_emissions == 0.0) | in_emissions.isnull()).all(axis="columns"))
        ]

        rp = partial(
            run_parallel_pre_processing,
            progress=self.progress,
            n_processes=self.n_processes,
        )
        if self.conditional_sums is not None:
            in_emissions = rp(  # type: ignore
                in_emissions,
                func_to_call=add_conditional_sums,
                progress_bar_desc=(
                    "For each model-scenario, calculating conditional sums"
                ),
                conditional_sums=self.conditional_sums,
            )

        if self.reclassifications is not None:
            in_emissions = rp(  # type: ignore
                in_emissions,
                func_to_call=reclassify_variables,
                progress_bar_desc="For each model-scenario, reclassifying variables",
                reclassifications=self.reclassifications,
            )

        if self.conditional_removals is not None:
            in_emissions = rp(  # type: ignore
                in_emissions,
                func_to_call=condtionally_remove_variables,
                progress_bar_desc=(
                    "For each model-scenario, conditionally removing variables"
                ),
                conditional_removals=self.conditional_removals,
            )

        if self.drop_if_identical is not None:
            in_emissions = rp(  # type: ignore
                in_emissions,
                func_to_call=drop_variables_if_identical,
                progress_bar_desc=(
                    "For each model-scenario, dropping variables if they are identical"
                ),
                drop_if_identical=self.drop_if_identical,
            )

        # Negative value handling
        co2_locator = ismatch(variable="**CO2**")
        in_emissions.loc[~co2_locator] = in_emissions.loc[~co2_locator].where(
            # Where these conditions are true, keep the original data.
            (in_emissions.loc[~co2_locator] > 0)
            | (in_emissions.loc[~co2_locator] < self.negative_value_not_small_threshold)
            | in_emissions.loc[~co2_locator].isnull(),
            # Otherwise, set to zero
            other=0.0,
        )

        res: pd.DataFrame = in_emissions.loc[isin(variable=self.emissions_out)]

        # Strip out any units that won't play nice with pint
        res = strip_pint_incompatible_characters_from_units(
            res, units_index_level="unit"
        )

        # Convert to gcages naming conventions
        res = update_index_levels_func(
            res,
            {
                "variable": partial(
                    convert_variable_name,
                    from_convention=SupportedNamingConventions.IAMC,
                    to_convention=SupportedNamingConventions.GCAGES,
                )
            },
        )

        if self.run_checks:
            # AR6 required emissions for these years after pre-processing,
            # for some reason
            required_years = list(range(2020, 2100 + 1, 10))
            assert_has_data_for_times(
                res, name="res", times=required_years, allow_nan=False
            )

        return res

    @classmethod
    def from_ar6_config(
        cls,
        run_checks: bool = True,
        progress: bool = True,
        n_processes: int | None = multiprocessing.cpu_count(),
    ) -> AR6PreProcessor:
        """
        Initialise from config that was used in AR6

        Parameters
        ----------
        run_checks
            Should checks of the input and output data be performed?

            If this is turned off, things are faster,
            but error messages are much less clear if things go wrong.

        progress
            Should a progress bar be shown for each operation?

        n_processes
            Number of processes to use for parallel processing.

            Set to `None` to process in serial.

        Returns
        -------
        :
            Initialised Pre-processor
        """
        ar6_emissions_for_harmonisation_iamc = tuple(
            v
            for v in (
                "Emissions|BC",
                "Emissions|PFC|C2F6",
                "Emissions|PFC|C6F14",
                "Emissions|PFC|CF4",
                "Emissions|CO",
                "Emissions|CO2",
                "Emissions|CO2|AFOLU",
                "Emissions|CO2|Energy and Industrial Processes",
                "Emissions|CH4",
                # "Emissions|F-Gases",  # Not used
                # "Emissions|HFC",  # Not used
                "Emissions|HFC|HFC125",
                "Emissions|HFC|HFC134a",
                "Emissions|HFC|HFC143a",
                "Emissions|HFC|HFC227ea",
                "Emissions|HFC|HFC23",
                # 'Emissions|HFC|HFC245ca',  # all nan in historical dataset (RCMIP)
                # "Emissions|HFC|HFC245fa",  # not in historical dataset (RCMIP)
                "Emissions|HFC|HFC32",
                "Emissions|HFC|HFC43-10",
                "Emissions|N2O",
                "Emissions|NH3",
                "Emissions|NOx",
                "Emissions|OC",
                # "Emissions|PFC",  # Not used
                "Emissions|SF6",
                "Emissions|Sulfur",
                "Emissions|VOC",
            )
        )
        conditional_sums = (
            (
                "Emissions|CO2|Energy and Industrial Processes",
                (
                    "Emissions|CO2|Industrial Processes",
                    "Emissions|CO2|Energy",
                ),
            ),
        )
        reclassifications = {
            "Emissions|CO2|Energy and Industrial Processes": (
                "Emissions|CO2|Other",
                "Emissions|CO2|Waste",
            )
        }
        conditional_removals = (
            (
                "Emissions|CO2",
                (
                    "Emissions|CO2|Energy and Industrial Processes",
                    "Emissions|CO2|AFOLU",
                ),
            ),
        )
        drop_if_identical = (
            ("Emissions|CO2", "Emissions|CO2|Energy and Industrial Processes"),
            ("Emissions|CO2", "Emissions|CO2|AFOLU"),
        )

        return cls(
            emissions_out=ar6_emissions_for_harmonisation_iamc,
            negative_value_not_small_threshold=-0.1,
            conditional_sums=conditional_sums,
            reclassifications=reclassifications,
            conditional_removals=conditional_removals,
            drop_if_identical=drop_if_identical,
            run_checks=run_checks,
            n_processes=n_processes,
            progress=progress,
        )

conditional_removals class-attribute instance-attribute #

conditional_removals: (
    tuple[tuple[str, tuple[str, ...]], ...] | None
) = None

Specification for variables that can be removed if other variables are present

Form:

(
    (variable_that_can_be_removed, (component_1, component_2)),
    ...
)

The variable that can be removed is only removed if all the variables it depends on are present.
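The rule can be illustrated with a minimal sketch. It uses a plain dict of variable name to values for a single model-scenario instead of the pandas DataFrame that gcages operates on, and `conditionally_remove_sketch` is a hypothetical helper written for this illustration, not the library's own function.

```python
def conditionally_remove_sketch(emissions, conditional_removals):
    """Return a copy of `emissions` with removable variables dropped."""
    out = dict(emissions)
    for removable, components in conditional_removals:
        # Only remove when every variable it depends on is present
        if removable in out and all(c in out for c in components):
            del out[removable]
    return out


conditional_removals = (
    (
        "Emissions|CO2",
        (
            "Emissions|CO2|Energy and Industrial Processes",
            "Emissions|CO2|AFOLU",
        ),
    ),
)

complete = {
    "Emissions|CO2": [40.0],
    "Emissions|CO2|Energy and Industrial Processes": [35.0],
    "Emissions|CO2|AFOLU": [5.0],
}
partial = {"Emissions|CO2": [40.0], "Emissions|CO2|AFOLU": [5.0]}

removed = conditionally_remove_sketch(complete, conditional_removals)
kept = conditionally_remove_sketch(partial, conditional_removals)
```

In `removed` the total is dropped because both components are available; in `kept` it stays because one component is missing.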

conditional_sums class-attribute instance-attribute #

conditional_sums: (
    tuple[tuple[str, tuple[str, ...]], ...] | None
) = None

Specification for variables that can be created from other variables

Form:

(
    (variable_that_can_be_created, (component_1, component_2)),
    ...
)

The variable that can be created is only created if all the variables it depends on are present.
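A minimal sketch of this rule follows, again using a plain dict for a single model-scenario rather than a DataFrame. `add_conditional_sums_sketch` is a hypothetical helper, and leaving an already-present target untouched is an assumption of this sketch rather than something stated above.

```python
def add_conditional_sums_sketch(emissions, conditional_sums):
    """Return a copy of `emissions` with any creatable sums added."""
    out = dict(emissions)
    for target, components in conditional_sums:
        if target in out:
            # Assumption: an existing target is left as it is
            continue
        if not all(c in out for c in components):
            # A component is missing, so the sum cannot be created
            continue
        out[target] = [sum(vals) for vals in zip(*(out[c] for c in components))]
    return out


conditional_sums = (
    (
        "Emissions|CO2|Energy and Industrial Processes",
        ("Emissions|CO2|Industrial Processes", "Emissions|CO2|Energy"),
    ),
)

both_present = add_conditional_sums_sketch(
    {
        "Emissions|CO2|Energy": [10.0, 9.0],
        "Emissions|CO2|Industrial Processes": [2.0, 1.5],
    },
    conditional_sums,
)
one_missing = add_conditional_sums_sketch(
    {"Emissions|CO2|Energy": [10.0, 9.0]}, conditional_sums
)
```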

drop_if_identical class-attribute instance-attribute #

drop_if_identical: tuple[tuple[str, str], ...] | None = None

Variables that can be dropped if they are identical to another variable

Form:

(
    (variable_that_can_be_removed, variable_to_compare_to),
    ...
)

The variable that can be removed is only removed if its values are identical to the variable it is compared to.
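As a rough illustration, the comparison can be sketched with plain lists. `drop_if_identical_sketch` is a hypothetical helper, and the use of exact equality (no tolerance) is an assumption of this sketch.

```python
def drop_if_identical_sketch(emissions, drop_if_identical):
    """Return a copy of `emissions` without variables that duplicate another."""
    out = dict(emissions)
    for removable, compare_to in drop_if_identical:
        if removable not in out or compare_to not in out:
            continue
        # Assumption: "identical" means exactly equal values
        if out[removable] == out[compare_to]:
            del out[removable]
    return out


drop_if_identical = (
    ("Emissions|CO2", "Emissions|CO2|Energy and Industrial Processes"),
    ("Emissions|CO2", "Emissions|CO2|AFOLU"),
)

duplicated = drop_if_identical_sketch(
    {
        "Emissions|CO2": [35.0, 30.0],
        "Emissions|CO2|Energy and Industrial Processes": [35.0, 30.0],
    },
    drop_if_identical,
)
distinct = drop_if_identical_sketch(
    {
        "Emissions|CO2": [40.0, 33.0],
        "Emissions|CO2|Energy and Industrial Processes": [35.0, 30.0],
    },
    drop_if_identical,
)
```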

emissions_out instance-attribute #

emissions_out: tuple[str, ...]

Names of emissions that can be included in the result of pre-processing

Not all these emissions need to be there, but any names which are not in this list will be removed as part of pre-processing.
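The filtering behaviour can be sketched as follows. `keep_allowed` is a hypothetical helper working on a plain dict, shown only to make the "allowed, but not required" semantics concrete.

```python
def keep_allowed(emissions, emissions_out):
    """Keep only variables whose names appear in `emissions_out`."""
    allowed = set(emissions_out)
    return {name: values for name, values in emissions.items() if name in allowed}


emissions_out = ("Emissions|CH4", "Emissions|N2O", "Emissions|SF6")
filtered = keep_allowed(
    {
        "Emissions|CH4": [300.0],
        "Emissions|Kyoto Gases": [50000.0],  # not in the list, so removed
    },
    emissions_out,
)
```

Here `filtered` holds only `Emissions|CH4`: the missing `Emissions|N2O` and `Emissions|SF6` are not an error, while the unlisted variable is dropped.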

n_processes class-attribute instance-attribute #

n_processes: int | None = cpu_count()

Number of processes to use for parallel processing.

Set to None to process in serial.

negative_value_not_small_threshold instance-attribute #

negative_value_not_small_threshold: float

Threshold which defines when a negative value is not small

Negative non-CO2 emissions at or above this threshold are set to zero; those below it are left unchanged.
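The condition used in `__call__` keeps a value if it is positive, below the threshold, or null, and sets everything else to zero. A minimal sketch of that condition on a plain list (the helper name `zero_small_negatives` is hypothetical, and the real code applies this only to rows whose variable name does not match `**CO2**`):

```python
import math


def zero_small_negatives(values, threshold):
    """Set small negative values to zero, leaving everything else untouched."""
    return [
        # Keep positives, large negatives (below the threshold) and NaN
        v if (v > 0 or v < threshold or math.isnan(v)) else 0.0
        for v in values
    ]


values = [5.0, -0.05, -0.5, 0.0, float("nan")]
cleaned = zero_small_negatives(values, threshold=-0.1)
```

With the AR6 threshold of `-0.1`, the value `-0.05` becomes `0.0` while `-0.5` is left as it is.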

progress class-attribute instance-attribute #

progress: bool = True

Should progress bars be shown for each operation?

reclassifications class-attribute instance-attribute #

reclassifications: Mapping[str, tuple[str, ...]] | None = (
    None
)

Variables that should be reclassified as being part of another variable

Form:

{
    variable_to_add_to: (variable_to_rename_1, variable_to_rename_2),
    ...
}

For example

{
    "Emissions|CO2|Energy and Industrial Processes": (
        "Emissions|CO2|Other",
        "Emissions|CO2|Waste",
    )
}
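One way to picture reclassification is sketched below on a plain dict. `reclassify_sketch` is a hypothetical helper; it assumes that the reclassified variables are added to the target and then removed, and that nothing happens if the target is absent. Both points are assumptions of this sketch, not confirmed library behaviour.

```python
def reclassify_sketch(emissions, reclassifications):
    """Fold reclassified variables into their target variable."""
    out = dict(emissions)
    for target, sources in reclassifications.items():
        if target not in out:
            # Assumption: with no target to add to, leave the data unchanged
            continue
        for source in sources:
            if source in out:
                moved = out.pop(source)
                out[target] = [t + m for t, m in zip(out[target], moved)]
    return out


reclassifications = {
    "Emissions|CO2|Energy and Industrial Processes": (
        "Emissions|CO2|Other",
        "Emissions|CO2|Waste",
    )
}
reclassified = reclassify_sketch(
    {
        "Emissions|CO2|Energy and Industrial Processes": [30.0, 28.0],
        "Emissions|CO2|Other": [1.0, 1.0],
        "Emissions|CO2|Waste": [0.5, 0.25],
    },
    reclassifications,
)
```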

run_checks class-attribute instance-attribute #

run_checks: bool = True

If True, run checks on both input and output data

If you are sure about your workflow, you can disable the checks to speed things up (but we don't recommend this unless you really are confident about what you're doing).

__call__ #

__call__(in_emissions: DataFrame) -> DataFrame

Pre-process

Parameters:

Name Type Description Default
in_emissions DataFrame

Emissions to pre-process

required

Returns:

Type Description
DataFrame

Pre-processed emissions
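When `run_checks` is `True`, the result must contain non-NaN data for every tenth year from 2020 to 2100. The sketch below reproduces that list of years and shows a hypothetical helper, `missing_required_years`, for spotting gaps before calling the pre-processor; the helper is not part of gcages.

```python
# Same expression as used in the output check
required_years = list(range(2020, 2100 + 1, 10))


def missing_required_years(available_years):
    """Return the required years that are absent from `available_years`."""
    available = set(available_years)
    return [year for year in required_years if year not in available]


five_yearly = missing_required_years(range(2015, 2100 + 1, 5))
too_short = missing_required_years([2020, 2030])
```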

Source code in src/gcages/ar6/pre_processing.py
def __call__(self, in_emissions: pd.DataFrame) -> pd.DataFrame:
    """
    Pre-process

    Parameters
    ----------
    in_emissions
        Emissions to pre-process

    Returns
    -------
    :
        Pre-processed emissions
    """
    try:
        from pandas_indexing.selectors import isin, ismatch
    except ImportError as exc:
        raise MissingOptionalDependencyError(
            "AR6PreProcessor.__call__", requirement="pandas_indexing"
        ) from exc

    if self.run_checks:
        assert_index_is_multiindex(in_emissions)
        assert_data_is_all_numeric(in_emissions)
        assert_has_index_levels(in_emissions, ["variable", "unit"])

    # Remove any rows with only zero (custom AR6 thing)
    in_emissions = in_emissions[
        ~(((in_emissions == 0.0) | in_emissions.isnull()).all(axis="columns"))
    ]

    rp = partial(
        run_parallel_pre_processing,
        progress=self.progress,
        n_processes=self.n_processes,
    )
    if self.conditional_sums is not None:
        in_emissions = rp(  # type: ignore
            in_emissions,
            func_to_call=add_conditional_sums,
            progress_bar_desc=(
                "For each model-scenario, calculating conditional sums"
            ),
            conditional_sums=self.conditional_sums,
        )

    if self.reclassifications is not None:
        in_emissions = rp(  # type: ignore
            in_emissions,
            func_to_call=reclassify_variables,
            progress_bar_desc="For each model-scenario, reclassifying variables",
            reclassifications=self.reclassifications,
        )

    if self.conditional_removals is not None:
        in_emissions = rp(  # type: ignore
            in_emissions,
            func_to_call=condtionally_remove_variables,
            progress_bar_desc=(
                "For each model-scenario, conditionally removing variables"
            ),
            conditional_removals=self.conditional_removals,
        )

    if self.drop_if_identical is not None:
        in_emissions = rp(  # type: ignore
            in_emissions,
            func_to_call=drop_variables_if_identical,
            progress_bar_desc=(
                "For each model-scenario, dropping variables if they are identical"
            ),
            drop_if_identical=self.drop_if_identical,
        )

    # Negative value handling
    co2_locator = ismatch(variable="**CO2**")
    in_emissions.loc[~co2_locator] = in_emissions.loc[~co2_locator].where(
        # Where these conditions are true, keep the original data.
        (in_emissions.loc[~co2_locator] > 0)
        | (in_emissions.loc[~co2_locator] < self.negative_value_not_small_threshold)
        | in_emissions.loc[~co2_locator].isnull(),
        # Otherwise, set to zero
        other=0.0,
    )

    res: pd.DataFrame = in_emissions.loc[isin(variable=self.emissions_out)]

    # Strip out any units that won't play nice with pint
    res = strip_pint_incompatible_characters_from_units(
        res, units_index_level="unit"
    )

    # Convert to gcages naming conventions
    res = update_index_levels_func(
        res,
        {
            "variable": partial(
                convert_variable_name,
                from_convention=SupportedNamingConventions.IAMC,
                to_convention=SupportedNamingConventions.GCAGES,
            )
        },
    )

    if self.run_checks:
        # AR6 required emissions for these years after pre-processing,
        # for some reason
        required_years = list(range(2020, 2100 + 1, 10))
        assert_has_data_for_times(
            res, name="res", times=required_years, allow_nan=False
        )

    return res

from_ar6_config classmethod #

from_ar6_config(
    run_checks: bool = True,
    progress: bool = True,
    n_processes: int | None = cpu_count(),
) -> AR6PreProcessor

Initialise from config that was used in AR6

Parameters:

Name Type Description Default
run_checks bool

Should checks of the input and output data be performed?

If this is turned off, things are faster, but error messages are much less clear if things go wrong.

True
progress bool

Should a progress bar be shown for each operation?

True
n_processes int | None

Number of processes to use for parallel processing.

Set to None to process in serial.

cpu_count()

Returns:

Type Description
AR6PreProcessor

Initialised Pre-processor

Source code in src/gcages/ar6/pre_processing.py
@classmethod
def from_ar6_config(
    cls,
    run_checks: bool = True,
    progress: bool = True,
    n_processes: int | None = multiprocessing.cpu_count(),
) -> AR6PreProcessor:
    """
    Initialise from config that was used in AR6

    Parameters
    ----------
    run_checks
        Should checks of the input and output data be performed?

        If this is turned off, things are faster,
        but error messages are much less clear if things go wrong.

    progress
        Should a progress bar be shown for each operation?

    n_processes
        Number of processes to use for parallel processing.

        Set to `None` to process in serial.

    Returns
    -------
    :
        Initialised Pre-processor
    """
    ar6_emissions_for_harmonisation_iamc = tuple(
        v
        for v in (
            "Emissions|BC",
            "Emissions|PFC|C2F6",
            "Emissions|PFC|C6F14",
            "Emissions|PFC|CF4",
            "Emissions|CO",
            "Emissions|CO2",
            "Emissions|CO2|AFOLU",
            "Emissions|CO2|Energy and Industrial Processes",
            "Emissions|CH4",
            # "Emissions|F-Gases",  # Not used
            # "Emissions|HFC",  # Not used
            "Emissions|HFC|HFC125",
            "Emissions|HFC|HFC134a",
            "Emissions|HFC|HFC143a",
            "Emissions|HFC|HFC227ea",
            "Emissions|HFC|HFC23",
            # 'Emissions|HFC|HFC245ca',  # all nan in historical dataset (RCMIP)
            # "Emissions|HFC|HFC245fa",  # not in historical dataset (RCMIP)
            "Emissions|HFC|HFC32",
            "Emissions|HFC|HFC43-10",
            "Emissions|N2O",
            "Emissions|NH3",
            "Emissions|NOx",
            "Emissions|OC",
            # "Emissions|PFC",  # Not used
            "Emissions|SF6",
            "Emissions|Sulfur",
            "Emissions|VOC",
        )
    )
    conditional_sums = (
        (
            "Emissions|CO2|Energy and Industrial Processes",
            (
                "Emissions|CO2|Industrial Processes",
                "Emissions|CO2|Energy",
            ),
        ),
    )
    reclassifications = {
        "Emissions|CO2|Energy and Industrial Processes": (
            "Emissions|CO2|Other",
            "Emissions|CO2|Waste",
        )
    }
    conditional_removals = (
        (
            "Emissions|CO2",
            (
                "Emissions|CO2|Energy and Industrial Processes",
                "Emissions|CO2|AFOLU",
            ),
        ),
    )
    drop_if_identical = (
        ("Emissions|CO2", "Emissions|CO2|Energy and Industrial Processes"),
        ("Emissions|CO2", "Emissions|CO2|AFOLU"),
    )

    return cls(
        emissions_out=ar6_emissions_for_harmonisation_iamc,
        negative_value_not_small_threshold=-0.1,
        conditional_sums=conditional_sums,
        reclassifications=reclassifications,
        conditional_removals=conditional_removals,
        drop_if_identical=drop_if_identical,
        run_checks=run_checks,
        n_processes=n_processes,
        progress=progress,
    )

AR6SCMRunner #

Simple climate model runner that follows the same logic as was used in AR6

If you want exactly the same behaviour as in AR6, initialise using from_ar6_config

Methods:

Name Description
__call__

Run the simple climate model

from_ar6_config

Initialise from the config used in AR6

Attributes:

Name Type Description
batch_size_scenarios int | None

The number of scenarios to run at a time

climate_models_cfgs dict[str, list[dict[str, Any]]]

Climate models to run and the configuration to use with them

db OpenSCMDB | None

Database in which to store the output of the runs

force_interpolate_to_yearly bool

Should scenarios be interpolated to yearly steps before running the SCMs?

harmonisation_year int | None

Year in which the data was harmonised

historical_emissions DataFrame | None

Historical emissions used for harmonisation

n_processes int | None

Number of processes to use for parallel processing.

output_variables tuple[str, ...]

Variables to include in the output

progress bool

Should progress bars be shown for each operation?

res_column_type type

Type to cast the result's column type to

run_checks bool

If True, run checks on both input and output data

verbose bool

Should verbose messages be printed?

Source code in src/gcages/ar6/scm_running.py
@define
class AR6SCMRunner:
    """
    Simple climate model runner that follows the same logic as was used in AR6

    If you want exactly the same behaviour as in AR6,
    initialise using [`from_ar6_config`][(c)]
    """

    climate_models_cfgs: dict[str, list[dict[str, Any]]] = field(
        repr=lambda x: ", ".join(
            (
                f"{climate_model}: {len(cfgs)} configurations"
                for climate_model, cfgs in x.items()
            )
        )
    )
    """
    Climate models to run and the configuration to use with them
    """

    output_variables: tuple[str, ...]
    """
    Variables to include in the output
    """

    force_interpolate_to_yearly: bool = True
    """
    Should scenarios be interpolated to yearly steps before running the SCMs?
    """

    batch_size_scenarios: int | None = None
    """
    The number of scenarios to run at a time

    Smaller batch sizes use less memory, but take longer overall
    (all else being equal).

    If not supplied, all scenarios are run simultaneously.
    """

    db: OpenSCMDB | None = None
    """
    Database in which to store the output of the runs

    If not supplied, output of the runs is not stored.
    """

    res_column_type: type = int
    """
    Type to cast the result's column type to
    """

    historical_emissions: pd.DataFrame | None = None
    """
    Historical emissions used for harmonisation

    Only required if `run_checks` is `True` to check
    that the data to run is harmonised.
    """

    harmonisation_year: int | None = None
    """
    Year in which the data was harmonised

    Only required if `run_checks` is `True` to check
    that the data to run is harmonised.
    """

    verbose: bool = True
    """
    Should verbose messages be printed?

    This is a temporary hack while we think about how to handle logging
    """

    run_checks: bool = True
    """
    If `True`, run checks on both input and output data

    If you are sure about your workflow,
    you can disable the checks to speed things up
    (but we don't recommend this unless you really
    are confident about what you're doing).
    """

    progress: bool = True
    """
    Should progress bars be shown for each operation?
    """

    n_processes: int | None = multiprocessing.cpu_count()
    """
    Number of processes to use for parallel processing.

    Set to `None` to process in serial.
    """

    def __call__(
        self, in_emissions: pd.DataFrame, force_rerun: bool = False
    ) -> pd.DataFrame:
        """
        Run the simple climate model

        Parameters
        ----------
        in_emissions
            Emissions to run

        force_rerun
            Force scenarios to re-run (i.e. disable caching).

        Returns
        -------
        :
            Raw results from the simple climate model
        """
        if self.run_checks:
            assert_index_is_multiindex(in_emissions)
            assert_has_index_levels(
                in_emissions, ["variable", "unit", "model", "scenario"]
            )
            assert_has_no_pint_incompatible_characters(
                in_emissions.index.get_level_values("unit").unique()
            )
            assert_data_is_all_numeric(in_emissions)

            if self.historical_emissions is None:
                msg = "`self.historical_emissions` must be set to check the infilling"
                raise AssertionError(msg)

            if self.harmonisation_year is None:
                msg = "`self.harmonisation_year` must be set to check the infilling"
                raise AssertionError(msg)

            assert_has_data_for_times(
                in_emissions,
                name="in_emissions",
                times=[self.harmonisation_year, 2100],
                allow_nan=False,
            )

            assert_harmonised(
                in_emissions,
                history=self.historical_emissions,
                harmonisation_time=self.harmonisation_year,
                rounding=5,  # level of data storage in historical data often
            )
            assert_all_groups_are_complete(
                # The combo of the input and infilled should be complete
                in_emissions,
                complete_index=self.historical_emissions.index.droplevel("unit"),
            )

        openscm_runner_emissions = update_index_levels_func(
            in_emissions,
            {
                "variable": partial(
                    convert_variable_name,
                    from_convention=SupportedNamingConventions.GCAGES,
                    to_convention=SupportedNamingConventions.OPENSCM_RUNNER,
                )
            },
        )
        if self.force_interpolate_to_yearly:
            # TODO: put interpolate to annual steps in pandas-openscm
            # Interpolate to ensure no nans.
            for y in range(
                openscm_runner_emissions.columns.min(),
                openscm_runner_emissions.columns.max() + 1,
            ):
                if y not in openscm_runner_emissions:
                    openscm_runner_emissions[y] = np.nan

            openscm_runner_emissions = (
                openscm_runner_emissions.sort_index(axis="columns")
                .T.interpolate("index")
                .T
            )

        scm_results_maybe = run_scms(
            openscm_runner_emissions,
            climate_models_cfgs=self.climate_models_cfgs,
            output_variables=self.output_variables,
            scenario_group_levels=["model", "scenario"],
            n_processes=self.n_processes if self.n_processes is not None else 1,
            db=self.db,
            verbose=self.verbose,
            batch_size_scenarios=self.batch_size_scenarios,
            force_rerun=force_rerun,
        )

        if self.db is not None:
            # Results aren't kept in memory during running, so have to load them now.
            # User can use `run_scms` directly if they want to process differently.
            out_maybe = self.db.load()
            if out_maybe is None:
                raise TypeError(out_maybe)

            out: pd.DataFrame = out_maybe

        else:
            if scm_results_maybe is None:
                raise TypeError(scm_results_maybe)

            out = scm_results_maybe

        out.columns = out.columns.astype(self.res_column_type)

        if self.run_checks:
            # All scenarios have output
            pd.testing.assert_index_equal(
                out.index.droplevel(
                    out.index.names.difference(["model", "scenario"])  # type: ignore # pandas-stubs out of date
                ).drop_duplicates(),
                in_emissions.index.droplevel(
                    in_emissions.index.names.difference(["model", "scenario"])  # type: ignore # pandas-stubs out of date
                ).drop_duplicates(),
                check_order=False,
            )
            # Expected output is provided
            assert_all_groups_are_complete(
                out,
                complete_index=pd.MultiIndex.from_arrays(
                    [list(self.output_variables)], names=["variable"]
                ),
            )

        return out

    @classmethod
    def from_ar6_config(  # noqa: PLR0913
        cls,
        magicc_exe_path: Path,
        magicc_prob_distribution_path: Path,
        output_variables: tuple[str, ...] = DEFAULT_OUTPUT_VARIABLES,
        batch_size_scenarios: int | None = None,
        db: OpenSCMDB | None = None,
        historical_emissions: pd.DataFrame | None = None,
        harmonisation_year: int | None = None,
        verbose: bool = True,
        run_checks: bool = True,
        progress: bool = True,
        n_processes: int | None = multiprocessing.cpu_count(),
    ) -> AR6SCMRunner:
        """
        Initialise from the config used in AR6

        Parameters
        ----------
        magicc_exe_path
            Path to the MAGICC executable to use.

            This should be a MAGICC v7.5.3 executable.

        magicc_prob_distribution_path
            Path to the MAGICC probabilistic distribution.

            This should be the AR6 probabilistic distribution.

        output_variables
            Variables to include in the output

        batch_size_scenarios
            The number of scenarios to run at a time

        db
            Database to use for storing results.

            If not supplied, raw outputs are not stored.

        historical_emissions
            Historical emissions used for harmonisation

            Only required if `run_checks` is `True` to check
            that the data is harmonised before running the SCMs.

        harmonisation_year
            Year in which the data was harmonised

            Only required if `run_checks` is `True` to check
            that the data is harmonised before running the SCMs.

        verbose
            Should verbose messages be printed?

            This is a temporary hack while we think about how to handle logging

        run_checks
            Should checks of the input and output data be performed?

            If this is turned off, things are faster,
            but error messages are much less clear if things go wrong.

        progress
            Should progress bars be shown for each operation?

        n_processes
            Number of processes to use for parallel processing.

            Set to `None` to process in serial.

        Returns
        -------
        :
            Initialised SCM runner
        """
        os.environ["MAGICC_EXECUTABLE_7"] = str(magicc_exe_path)
        check_ar6_magicc7_version()

        magicc_ar6_prob_cfg = load_ar6_magicc_probabilistic_config(
            magicc_prob_distribution_path
        )

        startyear = 1750
        common_cfg = {
            "startyear": startyear,
            "out_dynamic_vars": convert_openscm_runner_output_names_to_magicc_output_names(  # noqa: E501
                output_variables
            ),
            "out_ascii_binary": "BINARY",
            "out_binary_format": 2,
        }

        run_config = [{**common_cfg, **base_cfg} for base_cfg in magicc_ar6_prob_cfg]
        magicc_full_distribution_n_config = 600
        if len(run_config) != magicc_full_distribution_n_config:
            raise AssertionError(len(run_config))

        return cls(
            climate_models_cfgs={"MAGICC7": run_config},
            output_variables=output_variables,
            batch_size_scenarios=batch_size_scenarios,
            db=db,
            historical_emissions=historical_emissions,
            harmonisation_year=harmonisation_year,
            verbose=verbose,
            run_checks=run_checks,
            progress=progress,
            n_processes=n_processes,
            force_interpolate_to_yearly=True,  # MAGICC safer with annual input
            res_column_type=int,  # annual output by default
        )

batch_size_scenarios class-attribute instance-attribute #

batch_size_scenarios: int | None = None

The number of scenarios to run at a time

Smaller batch sizes use less memory, but take longer overall (all else being equal).

If not supplied, all scenarios are run simultaneously.
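
To make the memory and run-time trade-off concrete, here is a minimal sketch of splitting scenarios into batches. The helper `iter_batches` is hypothetical and is not part of gcages; how `run_scms` batches scenarios internally is not shown on this page, so treat this as an illustration of the idea only.

```python
from collections.abc import Iterator, Sequence
from typing import Optional


def iter_batches(
    scenarios: Sequence[tuple[str, str]], batch_size: Optional[int]
) -> Iterator[list[tuple[str, str]]]:
    """Yield (model, scenario) pairs in batches; `None` means one single batch."""
    if batch_size is None:
        # Mirrors the documented default: everything is run simultaneously
        yield list(scenarios)
        return
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(scenarios), batch_size):
        yield list(scenarios[start : start + batch_size])


# Hypothetical scenario identifiers, for illustration only
scenarios = [("model_a", f"scenario_{i}") for i in range(5)]
batches = list(iter_batches(scenarios, batch_size=2))
all_at_once = list(iter_batches(scenarios, batch_size=None))
```

With five scenarios and a batch size of two, only two scenarios need to be held in memory at once, at the cost of three separate runs.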

climate_models_cfgs class-attribute instance-attribute #

climate_models_cfgs: dict[str, list[dict[str, Any]]] = (
    field(
        repr=lambda x: ", ".join(
            f"{climate_model}: {len(cfgs)} configurations"
            for (climate_model, cfgs) in x.items()
        )
    )
)

Climate models to run and the configuration to use with them

db class-attribute instance-attribute #

db: OpenSCMDB | None = None

Database in which to store the output of the runs

If not supplied, output of the runs is not stored.

force_interpolate_to_yearly class-attribute instance-attribute #

force_interpolate_to_yearly: bool = True

Should the scenarios be interpolated to yearly steps before running the SCMs?
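
The source code of `__call__` does this interpolation with pandas. The plain-Python sketch below shows the same idea (linear interpolation in time onto every year in the range) for a single timeseries. The helper `interpolate_to_yearly` is hypothetical and only for illustration.

```python
def interpolate_to_yearly(series: dict[int, float]) -> dict[int, float]:
    """Linearly interpolate a year -> value mapping onto every year in its range."""
    years = sorted(series)
    out: dict[int, float] = {}
    # Fill each gap between two consecutive known years
    for left, right in zip(years[:-1], years[1:]):
        span = right - left
        for year in range(left, right):
            weight = (year - left) / span
            out[year] = series[left] * (1 - weight) + series[right] * weight
    # The final known year is not covered by the loop above
    out[years[-1]] = series[years[-1]]
    return out


# Hypothetical decadal data, for illustration only
decadal = {2020: 10.0, 2030: 20.0}
yearly = interpolate_to_yearly(decadal)
```

Here `yearly` has one value per year from 2020 to 2030, with the original end points unchanged.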

harmonisation_year class-attribute instance-attribute #

harmonisation_year: int | None = None

Year in which the data was harmonised

Only required if run_checks is True, in which case it is used to check that the data to run is harmonised.

historical_emissions class-attribute instance-attribute #

historical_emissions: DataFrame | None = None

Historical emissions used for harmonisation

Only required if run_checks is True, in which case they are used to check that the data to run is harmonised.

n_processes class-attribute instance-attribute #

n_processes: int | None = cpu_count()

Number of processes to use for parallel processing.

Set to None to process in serial.

output_variables instance-attribute #

output_variables: tuple[str, ...]

Variables to include in the output

progress class-attribute instance-attribute #

progress: bool = True

Should progress bars be shown for each operation?

res_column_type class-attribute instance-attribute #

res_column_type: type = int

Type to cast the result's column type to

run_checks class-attribute instance-attribute #

run_checks: bool = True

If True, run checks on both input and output data

If you are sure about your workflow, you can disable the checks to speed things up (but we don't recommend this unless you really are confident about what you're doing).

verbose class-attribute instance-attribute #

verbose: bool = True

Should verbose messages be printed?

This is a temporary hack while we think about how to handle logging

__call__ #

__call__(
    in_emissions: DataFrame, force_rerun: bool = False
) -> DataFrame

Run the simple climate model

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| in_emissions | DataFrame | Emissions to run | required |
| force_rerun | bool | Force scenarios to re-run (i.e. disable caching). | False |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | Raw results from the simple climate model |

Source code in src/gcages/ar6/scm_running.py
def __call__(
    self, in_emissions: pd.DataFrame, force_rerun: bool = False
) -> pd.DataFrame:
    """
    Run the simple climate model

    Parameters
    ----------
    in_emissions
        Emissions to run

    force_rerun
        Force scenarios to re-run (i.e. disable caching).

    Returns
    -------
    :
        Raw results from the simple climate model
    """
    if self.run_checks:
        assert_index_is_multiindex(in_emissions)
        assert_has_index_levels(
            in_emissions, ["variable", "unit", "model", "scenario"]
        )
        assert_has_no_pint_incompatible_characters(
            in_emissions.index.get_level_values("unit").unique()
        )
        assert_data_is_all_numeric(in_emissions)

        if self.historical_emissions is None:
            msg = (
                "`self.historical_emissions` must be set to check the harmonisation"
            )
            raise AssertionError(msg)

        if self.harmonisation_year is None:
            msg = "`self.harmonisation_year` must be set to check the harmonisation"
            raise AssertionError(msg)

        assert_has_data_for_times(
            in_emissions,
            name="in_emissions",
            times=[self.harmonisation_year, 2100],
            allow_nan=False,
        )

        assert_harmonised(
            in_emissions,
            history=self.historical_emissions,
            harmonisation_time=self.harmonisation_year,
            rounding=5,  # historical data is often only stored to this precision
        )
        assert_all_groups_are_complete(
            # The emissions to run should cover everything in the history
            in_emissions,
            complete_index=self.historical_emissions.index.droplevel("unit"),
        )

    openscm_runner_emissions = update_index_levels_func(
        in_emissions,
        {
            "variable": partial(
                convert_variable_name,
                from_convention=SupportedNamingConventions.GCAGES,
                to_convention=SupportedNamingConventions.OPENSCM_RUNNER,
            )
        },
    )
    if self.force_interpolate_to_yearly:
        # TODO: put interpolate to annual steps in pandas-openscm
        # Interpolate to ensure no nans.
        for y in range(
            openscm_runner_emissions.columns.min(),
            openscm_runner_emissions.columns.max() + 1,
        ):
            if y not in openscm_runner_emissions:
                openscm_runner_emissions[y] = np.nan

        openscm_runner_emissions = (
            openscm_runner_emissions.sort_index(axis="columns")
            .T.interpolate("index")
            .T
        )

    scm_results_maybe = run_scms(
        openscm_runner_emissions,
        climate_models_cfgs=self.climate_models_cfgs,
        output_variables=self.output_variables,
        scenario_group_levels=["model", "scenario"],
        n_processes=self.n_processes if self.n_processes is not None else 1,
        db=self.db,
        verbose=self.verbose,
        batch_size_scenarios=self.batch_size_scenarios,
        force_rerun=force_rerun,
    )

    if self.db is not None:
        # Results aren't kept in memory during running, so have to load them now.
        # User can use `run_scms` directly if they want to process differently.
        out_maybe = self.db.load()
        if out_maybe is None:
            raise TypeError(out_maybe)

        out: pd.DataFrame = out_maybe

    else:
        if scm_results_maybe is None:
            raise TypeError(scm_results_maybe)

        out = scm_results_maybe

    out.columns = out.columns.astype(self.res_column_type)

    if self.run_checks:
        # All scenarios have output
        pd.testing.assert_index_equal(
            out.index.droplevel(
                out.index.names.difference(["model", "scenario"])  # type: ignore # pandas-stubs out of date
            ).drop_duplicates(),
            in_emissions.index.droplevel(
                in_emissions.index.names.difference(["model", "scenario"])  # type: ignore # pandas-stubs out of date
            ).drop_duplicates(),
            check_order=False,
        )
        # Expected output is provided
        assert_all_groups_are_complete(
            out,
            complete_index=pd.MultiIndex.from_arrays(
                [list(self.output_variables)], names=["variable"]
            ),
        )

    return out
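
The "All scenarios have output" check above compares the unique model and scenario combinations of the input and the output. A plain-Python sketch of that comparison, without pandas, could look like the following. The helper `compare_scenarios` and the example rows are hypothetical.

```python
def compare_scenarios(
    input_rows: list[dict[str, str]], output_rows: list[dict[str, str]]
) -> tuple[set[tuple[str, str]], set[tuple[str, str]]]:
    """Return (missing, unexpected) sets of (model, scenario) pairs."""

    def pairs(rows: list[dict[str, str]]) -> set[tuple[str, str]]:
        # Drop every level except model and scenario, then de-duplicate
        return {(row["model"], row["scenario"]) for row in rows}

    in_pairs, out_pairs = pairs(input_rows), pairs(output_rows)
    return in_pairs - out_pairs, out_pairs - in_pairs


# Hypothetical index rows, for illustration only
input_rows = [
    {"model": "model_a", "scenario": "low", "variable": "Emissions|CO2"},
    {"model": "model_a", "scenario": "high", "variable": "Emissions|CO2"},
]
output_rows = [
    {"model": "model_a", "scenario": "low", "variable": "Surface Temperature"},
]
missing, unexpected = compare_scenarios(input_rows, output_rows)
```

In this example the "high" scenario has no output, which is the situation the check in `__call__` is designed to catch.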

from_ar6_config classmethod #

from_ar6_config(
    magicc_exe_path: Path,
    magicc_prob_distribution_path: Path,
    output_variables: tuple[
        str, ...
    ] = DEFAULT_OUTPUT_VARIABLES,
    batch_size_scenarios: int | None = None,
    db: OpenSCMDB | None = None,
    historical_emissions: DataFrame | None = None,
    harmonisation_year: int | None = None,
    verbose: bool = True,
    run_checks: bool = True,
    progress: bool = True,
    n_processes: int | None = cpu_count(),
) -> AR6SCMRunner

Initialise from the config used in AR6

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| magicc_exe_path | Path | Path to the MAGICC executable to use. This should be a MAGICC v7.5.3 executable. | required |
| magicc_prob_distribution_path | Path | Path to the MAGICC probabilistic distribution. This should be the AR6 probabilistic distribution. | required |
| output_variables | tuple[str, ...] | Variables to include in the output | DEFAULT_OUTPUT_VARIABLES |
| batch_size_scenarios | int \| None | The number of scenarios to run at a time | None |
| db | OpenSCMDB \| None | Database to use for storing results. If not supplied, raw outputs are not stored. | None |
| historical_emissions | DataFrame \| None | Historical emissions used for harmonisation. Only required if run_checks is True to check that the data is harmonised before running the SCMs. | None |
| harmonisation_year | int \| None | Year in which the data was harmonised. Only required if run_checks is True to check that the data is harmonised before running the SCMs. | None |
| verbose | bool | Should verbose messages be printed? This is a temporary hack while we think about how to handle logging. | True |
| run_checks | bool | Should checks of the input and output data be performed? If this is turned off, things are faster, but error messages are much less clear if things go wrong. | True |
| progress | bool | Should progress bars be shown for each operation? | True |
| n_processes | int \| None | Number of processes to use for parallel processing. Set to None to process in serial. | cpu_count() |

Returns:

| Type | Description |
| --- | --- |
| AR6SCMRunner | Initialised SCM runner |
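
As a usage sketch based only on the signature documented above, a runner could be built as follows. The wrapper `build_ar6_runner` is hypothetical, and the paths and historical emissions must be supplied by you. gcages is imported inside the function so that the sketch can be defined without it installed.

```python
from pathlib import Path
from typing import Any


def build_ar6_runner(
    magicc_exe_path: Path,
    magicc_prob_distribution_path: Path,
    historical_emissions: Any,
    harmonisation_year: int,
) -> Any:
    """Build an AR6-like SCM runner with input and output checks enabled."""
    # Lazy import: requires gcages to be installed when the function is called
    from gcages.ar6 import AR6SCMRunner

    return AR6SCMRunner.from_ar6_config(
        magicc_exe_path=magicc_exe_path,
        magicc_prob_distribution_path=magicc_prob_distribution_path,
        # Needed because run_checks is left at its default of True
        historical_emissions=historical_emissions,
        harmonisation_year=harmonisation_year,
        n_processes=None,  # process in serial
    )
```

Calling the returned runner with a DataFrame of emissions, for example `runner(in_emissions)`, returns the raw results from the simple climate model.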

Source code in src/gcages/ar6/scm_running.py
@classmethod
def from_ar6_config(  # noqa: PLR0913
    cls,
    magicc_exe_path: Path,
    magicc_prob_distribution_path: Path,
    output_variables: tuple[str, ...] = DEFAULT_OUTPUT_VARIABLES,
    batch_size_scenarios: int | None = None,
    db: OpenSCMDB | None = None,
    historical_emissions: pd.DataFrame | None = None,
    harmonisation_year: int | None = None,
    verbose: bool = True,
    run_checks: bool = True,
    progress: bool = True,
    n_processes: int | None = multiprocessing.cpu_count(),
) -> AR6SCMRunner:
    """
    Initialise from the config used in AR6

    Parameters
    ----------
    magicc_exe_path
        Path to the MAGICC executable to use.

        This should be a MAGICC v7.5.3 executable.

    magicc_prob_distribution_path
        Path to the MAGICC probabilistic distribution.

        This should be the AR6 probabilistic distribution.

    output_variables
        Variables to include in the output

    batch_size_scenarios
        The number of scenarios to run at a time

    db
        Database to use for storing results.

        If not supplied, raw outputs are not stored.

    historical_emissions
        Historical emissions used for harmonisation

        Only required if `run_checks` is `True` to check
        that the data is harmonised before running the SCMs.

    harmonisation_year
        Year in which the data was harmonised

        Only required if `run_checks` is `True` to check
        that the data is harmonised before running the SCMs.

    verbose
        Should verbose messages be printed?

        This is a temporary hack while we think about how to handle logging

    run_checks
        Should checks of the input and output data be performed?

        If this is turned off, things are faster,
        but error messages are much less clear if things go wrong.

    progress
        Should progress bars be shown for each operation?

    n_processes
        Number of processes to use for parallel processing.

        Set to `None` to process in serial.

    Returns
    -------
    :
        Initialised SCM runner
    """
    os.environ["MAGICC_EXECUTABLE_7"] = str(magicc_exe_path)
    check_ar6_magicc7_version()

    magicc_ar6_prob_cfg = load_ar6_magicc_probabilistic_config(
        magicc_prob_distribution_path
    )

    startyear = 1750
    common_cfg = {
        "startyear": startyear,
        "out_dynamic_vars": convert_openscm_runner_output_names_to_magicc_output_names(  # noqa: E501
            output_variables
        ),
        "out_ascii_binary": "BINARY",
        "out_binary_format": 2,
    }

    run_config = [{**common_cfg, **base_cfg} for base_cfg in magicc_ar6_prob_cfg]
    magicc_full_distribution_n_config = 600
    if len(run_config) != magicc_full_distribution_n_config:
        raise AssertionError(len(run_config))

    return cls(
        climate_models_cfgs={"MAGICC7": run_config},
        output_variables=output_variables,
        batch_size_scenarios=batch_size_scenarios,
        db=db,
        historical_emissions=historical_emissions,
        harmonisation_year=harmonisation_year,
        verbose=verbose,
        run_checks=run_checks,
        progress=progress,
        n_processes=n_processes,
        force_interpolate_to_yearly=True,  # MAGICC safer with annual input
        res_column_type=int,  # annual output by default
    )
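
The `run_config` line in the listing above merges the shared settings into every member of the probabilistic distribution. Because `base_cfg` is unpacked last, its keys take precedence over those in `common_cfg`. A small sketch of that merge, using hypothetical stand-in values rather than real MAGICC parameters:

```python
# Settings shared by every run (keys taken from the listing above)
common_cfg = {
    "startyear": 1750,
    "out_ascii_binary": "BINARY",
    "out_binary_format": 2,
}

# Hypothetical stand-ins for entries of the probabilistic distribution
base_cfgs = [
    {"hypothetical_param": 0.1},
    {"hypothetical_param": 0.2, "startyear": 1850},
]

# Later unpacking wins, so a key in base_cfg overrides the same key in common_cfg
run_config = [{**common_cfg, **base_cfg} for base_cfg in base_cfgs]
```

The first merged configuration keeps the shared `startyear`, while the second uses its own value.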

get_ar6_full_historical_emissions cached #

get_ar6_full_historical_emissions(
    filepath: Path,
) -> DataFrame

Get the full AR6 historical emissions

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| filepath | Path | Filepath from which to load the emissions | required |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | Historical emissions as used in AR6 |

Raises:

| Type | Description |
| --- | --- |
| AssertionError | filepath points to a file that does not have the expected hash |
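
The hash check can be sketched with the standard library alone. The helpers `sha256_of_file` and `assert_file_hash` below are hypothetical stand-ins: the function itself uses `get_file_hash`, whose implementation is not shown on this page.

```python
import hashlib
import tempfile
from pathlib import Path


def sha256_of_file(filepath: Path, chunk_size: int = 65536) -> str:
    """Return the sha256 hex digest of a file, reading it in chunks."""
    hasher = hashlib.sha256()
    with open(filepath, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            hasher.update(chunk)
    return hasher.hexdigest()


def assert_file_hash(filepath: Path, expected: str) -> None:
    """Raise AssertionError if the file's hash differs from `expected`."""
    actual = sha256_of_file(filepath)
    if actual != expected:
        msg = f"The sha256 hash of {filepath} is {actual}, expected {expected}."
        raise AssertionError(msg)


with tempfile.TemporaryDirectory() as tmp_dir:
    demo_file = Path(tmp_dir) / "demo.csv"
    demo_file.write_bytes(b"year,value\n2015,1.0\n")
    demo_hash = sha256_of_file(demo_file)
    assert_file_hash(demo_file, demo_hash)  # matching hash: no error
    try:
        assert_file_hash(demo_file, "0" * 64)  # deliberately wrong hash
        mismatch_detected = False
    except AssertionError:
        mismatch_detected = True
```

A check like this guards against silently loading a modified or corrupted copy of the data file.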

Source code in src/gcages/ar6/infilling.py
@functools.cache
def get_ar6_full_historical_emissions(filepath: Path) -> pd.DataFrame:
    """
    Get the full AR6 historical emissions

    Parameters
    ----------
    filepath
        Filepath from which to load the emissions

    Returns
    -------
    :
        Historical emissions as used in AR6

    Raises
    ------
    AssertionError
        `filepath` points to a file that does not have the expected hash
    """
    try:
        from pandas_indexing.core import assignlevel, projectlevel
        from pandas_indexing.selectors import isin, ismatch
    except ImportError as exc:
        raise MissingOptionalDependencyError(
            "get_ar6_full_historical_emissions", requirement="pandas_indexing"
        ) from exc

    fp_hash = get_file_hash(filepath, algorithm="sha256")
    if platform.system() == "Windows":
        fp_hash_exp = "39ca03de170dab52a1845a25ebedeeb69704b8c86d16be14a08cc06fceba5369"
    else:
        fp_hash_exp = "9ad0550c671701622ec2b2e7ba2b6c38d58f83507938ab0f5aa8b1a35d26c015"

    if fp_hash != fp_hash_exp:
        msg = (
            f"The sha256 hash of {filepath} is {fp_hash}. "
            f"This does not match what we expect ({fp_hash_exp=})."
        )
        raise AssertionError(msg)

    raw = load_timeseries_csv(
        filepath,
        lower_column_names=True,
        index_columns=["model", "scenario", "variable", "region", "unit"],
        out_columns_type=int,
    )

    history = raw.loc[
        isin(scenario="ssp245") & ~ismatch(variable="**CO2"), :2015
    ].reset_index(["model", "scenario"], drop=True)

    history = assignlevel(
        history,
        variable=projectlevel(history.index, "variable").map(
            lambda x: convert_variable_name(
                x,
                from_convention=SupportedNamingConventions.AR6_CFC_INFILLING_DB,
                to_convention=SupportedNamingConventions.GCAGES,
            )
        ),
        # Not strictly necessary, but makes life easier
        unit=projectlevel(history.index, "unit").map(
            lambda x: x.replace("HFC4310mee/yr", "HFC4310/yr").replace(
                "NO2 / yr", "NO2/yr"
            )
        ),
    )
    # Zero out the HFC245fa history.
    # Not sure why this happened, but here we are
    history.loc[ismatch(variable="**HFC245fa"), :] *= 0.0  # type: ignore # pix typing not playing nice with pandas-stubs

    return history
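
Because the function is decorated with `functools.cache`, repeated calls with the same `filepath` return the same object rather than reloading the file. The sketch below illustrates this with a hypothetical stand-in loader, `load_expensive`, that returns a plain dictionary. With a DataFrame, taking a `.copy()` before modifying the result plays the same role as `dict(...)` does here.

```python
import functools
from pathlib import Path

load_calls: list[Path] = []


@functools.cache
def load_expensive(filepath: Path) -> dict[int, float]:
    """Hypothetical stand-in for a slow loader; records each real load."""
    load_calls.append(filepath)
    return {2015: 1.0}


first = load_expensive(Path("history.csv"))
second = load_expensive(Path("history.csv"))  # served from the cache

# Copy before mutating, otherwise every later caller sees the change
safe_copy = dict(second)
safe_copy[2015] = 0.0
```

Only one real load happens, and `first` and `second` are the very same object, so in-place edits to either would leak into later calls.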