Hooks

kedro_aim.framework.hooks.aim_hook

kedro_aim.framework.hooks.aim_hook.AimHook

The hook that is used to integrate Aim with Kedro.

The hook is responsible for:

  • Creating the Aim run before the pipeline is run.
  • Adding the Aim run to the catalog.
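
Kedro plugins typically register their hooks automatically via the `kedro.hooks` entry point; if manual registration is ever needed, a minimal sketch of a project's `settings.py` would look like this (shown for illustration only):

# settings.py of a Kedro project (illustrative; with the plugin installed,
# the hook is normally registered automatically via its entry point)
from kedro_aim.framework.hooks.aim_hook import AimHook

HOOKS = (AimHook(),)
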
Source code in kedro_aim/framework/hooks/aim_hook.py
class AimHook:
    """The hook that is used to integrate Aim with Kedro.

    The hook is responsible for:

    - Creating the Aim run before the pipeline is run.
    - Adding the Aim run to the catalog.
    """

    run: Optional[Run] = None
    aim_config: KedroAimConfig

    @hook_impl
    def after_context_created(
        self,
        context: KedroContext,
    ) -> None:
        """Hooks to be invoked after a `KedroContext` is created.

        This hook reads the Aim configuration from the `aim.yml` file in the `conf`
        folder of the Kedro project and stores it in the `aim_config` attribute of
        the hook.

        Args:
            context: The newly created context.
        """
        # Find the AimConfig in the context
        try:
            conf_aim_yml = context.config_loader.get("aim*", "aim*/**")
        except MissingConfigException:
            LOGGER.warning("No 'aim.yml' config file found in environment")
            conf_aim_yml = {}
        aim_config = KedroAimConfig.parse_obj(conf_aim_yml)

        # store in context for interactive use
        context.__setattr__("aim", aim_config)

        # store for further reuse
        self.aim_config = aim_config

    @hook_impl
    def after_catalog_created(
        self,
        catalog: DataCatalog,
        conf_catalog: Dict[str, Any],
        conf_creds: Dict[str, Any],
        feed_dict: Dict[str, Any],
        save_version: str,
        load_versions: Dict[str, str],
    ) -> None:
        """Hooks to be invoked after a data catalog is created.

        In this hook, we go through all the datasets in the catalog and replace
        those of type `AimArtifactDataSet` with a special Aim dataset.

        Args:
            catalog: The catalog that was created.
            conf_catalog: The config from which the catalog was created.
            conf_creds: The credentials conf from which the catalog was created.
            feed_dict: The feed_dict that was added to the catalog after creation.
            save_version: The save_version used in `save` operations
                for all datasets in the catalog.
            load_versions: The load_versions used in `load` operations
                for each dataset in the catalog.
        """
        # HACK: Replace all AimArtifactDataSet with an AimArtifactDataSetChild dataset.
        # This is needed to pass a reference of the run to the dataset.
        for name, dataset in catalog._data_sets.items():
            if isinstance(dataset, AimArtifactDataSet):
                catalog._data_sets[name] = make_run_dataset(self, dataset)

    @hook_impl
    def before_pipeline_run(
        self, run_params: Dict[str, Any], pipeline: Pipeline, catalog: DataCatalog
    ) -> None:
        """Hook to be invoked before a pipeline runs.

        Before the pipeline runs, we create the Aim run and add it to the catalog
        under the name `run`. This allows us to access the run in the pipeline.

        Args:
            run_params: The params used to run the pipeline.
                Should have the following schema

                ```json
                   {
                     "session_id": str,
                     "project_path": str,
                     "env": str,
                     "kedro_version": str,
                     "tags": Optional[List[str]],
                     "from_nodes": Optional[List[str]],
                     "to_nodes": Optional[List[str]],
                     "node_names": Optional[List[str]],
                     "from_inputs": Optional[List[str]],
                     "to_outputs": Optional[List[str]],
                     "load_versions": Optional[List[str]],
                     "pipeline_name": str,
                     "extra_params": Optional[Dict[str, Any]]
                   }
                ```

            pipeline: The `Pipeline` that will be run.
            catalog: The `DataCatalog` to be used during the run.
        """
        if check_aim_enabled(run_params["pipeline_name"], self.aim_config):
            # Create the Aim Run
            self.run = Run(
                run_hash=self.aim_config.run.run_hash,
                repo=load_repository(self.aim_config.repository),
                experiment=self.aim_config.run.experiment,
                system_tracking_interval=self.aim_config.run.system_tracking_interval,
                log_system_params=self.aim_config.run.log_system_params,
                capture_terminal_logs=self.aim_config.run.capture_terminal_logs,
            )

            # log run parameters
            self.run["kedro"] = run_params

            # add tags
            for tag in self.aim_config.run.tags:
                self.run.add_tag(tag)

            # save run in catalog
            assert not catalog.exists("run"), "catalog already contains a 'run' dataset"
            catalog.add("run", MemoryDataSet(copy_mode="assign"))
            catalog.save("run", self.run)

    @hook_impl
    def before_node_run(
        self,
        node: Node,
        catalog: DataCatalog,
        inputs: Dict[str, Any],
        is_async: bool,
        session_id: str,
    ) -> None:
        """Hook to be invoked before a node runs.

        All `parameters` that are passed to the node are logged to the run.

        Args:
            node: The `Node` to run.
            catalog: A `DataCatalog` containing the node's inputs and outputs.
            inputs: The dictionary of node inputs.
                The keys are dataset names and the values are the actual loaded
                input data, not the dataset instances.
            is_async: Whether the node was run in `async` mode.
            session_id: The id of the session.
        """
        if self.run is not None:
            # Only parameters are logged.
            for k, v in inputs.items():
                if k.startswith("params:"):
                    # Strip the "params:" prefix (7 characters) before logging.
                    self.run[k[7:]] = v
                elif k == "parameters":
                    self.run[k] = v

    @hook_impl
    def after_pipeline_run(
        self,
        run_params: Dict[str, Any],
        pipeline: Pipeline,
        catalog: DataCatalog,
    ) -> None:
        """Hook to be invoked after a pipeline runs.

        After the pipeline runs, we add the `StatusTag.SUCCESS` tag and close the
        Aim run.

        Args:
            run_params: The params used to run the pipeline.
                Should have the following schema

                ```json
                   {
                     "session_id": str,
                     "project_path": str,
                     "env": str,
                     "kedro_version": str,
                     "tags": Optional[List[str]],
                     "from_nodes": Optional[List[str]],
                     "to_nodes": Optional[List[str]],
                     "node_names": Optional[List[str]],
                     "from_inputs": Optional[List[str]],
                     "to_outputs": Optional[List[str]],
                     "load_versions": Optional[List[str]],
                     "pipeline_name": str,
                     "extra_params": Optional[Dict[str, Any]]
                   }
                ```

            pipeline: The `Pipeline` that was run.
            catalog: The `DataCatalog` used during the run.
        """
        if self.run is not None:
            self.run.add_tag(StatusTag.SUCCESS)
            self.run.finalize()
            self.run.close()

    @hook_impl
    def on_pipeline_error(
        self,
        error: Exception,
        run_params: Dict[str, Any],
        pipeline: Pipeline,
        catalog: DataCatalog,
    ) -> None:
        """Hook to be invoked if a pipeline run throws an uncaught Exception.

        In this case, we add a `StatusTag.FAILURE` tag and close the run.

        Args:
            error: The uncaught exception thrown during the pipeline run.
            run_params: The params used to run the pipeline.
                Should have the following schema

                ```json
                   {
                     "session_id": str,
                     "project_path": str,
                     "env": str,
                     "kedro_version": str,
                     "tags": Optional[List[str]],
                     "from_nodes": Optional[List[str]],
                     "to_nodes": Optional[List[str]],
                     "node_names": Optional[List[str]],
                     "from_inputs": Optional[List[str]],
                     "to_outputs": Optional[List[str]],
                     "load_versions": Optional[List[str]],
                     "pipeline_name": str,
                     "extra_params": Optional[Dict[str, Any]]
                   }
                ```
            pipeline: The `Pipeline` that was run.
            catalog: The `DataCatalog` used during the run.
        """
        if self.run is not None:
            self.run.add_tag(StatusTag.FAILURE)
            self.run.finalize()
            self.run.close()

kedro_aim.framework.hooks.aim_hook.AimHook.after_catalog_created(catalog, conf_catalog, conf_creds, feed_dict, save_version, load_versions)

Hook to be invoked after a data catalog is created.

In this hook, we go through all the datasets in the catalog and replace those of type AimArtifactDataSet with a special Aim dataset.

Parameters:

    catalog (DataCatalog): The catalog that was created. Required.
    conf_catalog (Dict[str, Any]): The config from which the catalog was created. Required.
    conf_creds (Dict[str, Any]): The credentials config from which the catalog was created. Required.
    feed_dict (Dict[str, Any]): The feed_dict that was added to the catalog after creation. Required.
    save_version (str): The save_version used in save operations for all datasets in the catalog. Required.
    load_versions (Dict[str, str]): The load_versions used in load operations for each dataset in the catalog. Required.

Source code in kedro_aim/framework/hooks/aim_hook.py
@hook_impl
def after_catalog_created(
    self,
    catalog: DataCatalog,
    conf_catalog: Dict[str, Any],
    conf_creds: Dict[str, Any],
    feed_dict: Dict[str, Any],
    save_version: str,
    load_versions: Dict[str, str],
) -> None:
    """Hooks to be invoked after a data catalog is created.

    In this hook, we go through all the datasets in the catalog and replace
    those of type `AimArtifactDataSet` with a special Aim dataset.

    Args:
        catalog: The catalog that was created.
        conf_catalog: The config from which the catalog was created.
        conf_creds: The credentials conf from which the catalog was created.
        feed_dict: The feed_dict that was added to the catalog after creation.
        save_version: The save_version used in `save` operations
            for all datasets in the catalog.
        load_versions: The load_versions used in `load` operations
            for each dataset in the catalog.
    """
    # HACK: Replace all AimArtifactDataSet with an AimArtifactDataSetChild dataset.
    # This is needed to pass a reference of the run to the dataset.
    for name, dataset in catalog._data_sets.items():
        if isinstance(dataset, AimArtifactDataSet):
            catalog._data_sets[name] = make_run_dataset(self, dataset)

kedro_aim.framework.hooks.aim_hook.AimHook.after_context_created(context)

Hook to be invoked after a KedroContext is created.

This hook reads the Aim configuration from the aim.yml file in the conf folder of the Kedro project and stores it in the aim_config attribute of the hook.

Parameters:

    context (KedroContext): The newly created context. Required.

Source code in kedro_aim/framework/hooks/aim_hook.py
@hook_impl
def after_context_created(
    self,
    context: KedroContext,
) -> None:
    """Hooks to be invoked after a `KedroContext` is created.

    This hook reads the Aim configuration from the `aim.yml` file in the `conf`
    folder of the Kedro project and stores it in the `aim_config` attribute of
    the hook.

    Args:
        context: The newly created context.
    """
    # Find the AimConfig in the context
    try:
        conf_aim_yml = context.config_loader.get("aim*", "aim*/**")
    except MissingConfigException:
        LOGGER.warning("No 'aim.yml' config file found in environment")
        conf_aim_yml = {}
    aim_config = KedroAimConfig.parse_obj(conf_aim_yml)

    # store in context for interactive use
    context.__setattr__("aim", aim_config)

    # store for further reuse
    self.aim_config = aim_config
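
To illustrate the parsing step above, here is a minimal sketch that builds the same config object from a hand-written dict. The field names (`run`, `disable`) mirror those used elsewhere on this page; the import path and the example values are assumptions, not the plugin's documented defaults.

# A hand-built dict standing in for the contents of conf/base/aim.yml
# (hypothetical values; see KedroAimConfig for the authoritative schema).
from kedro_aim.config import KedroAimConfig  # import path is an assumption

conf_aim_yml = {
    "run": {"experiment": "my-experiment", "tags": ["baseline"]},
    "disable": {"pipelines": ["reporting"]},
}
aim_config = KedroAimConfig.parse_obj(conf_aim_yml)  # pydantic v1 style, as in the hook
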

kedro_aim.framework.hooks.aim_hook.AimHook.after_pipeline_run(run_params, pipeline, catalog)

Hook to be invoked after a pipeline runs.

After the pipeline runs, we add the StatusTag.SUCCESS tag and close the Aim run.

Parameters:

    run_params (Dict[str, Any]): The params used to run the pipeline. Required.
        Should have the following schema:

        {
          "session_id": str,
          "project_path": str,
          "env": str,
          "kedro_version": str,
          "tags": Optional[List[str]],
          "from_nodes": Optional[List[str]],
          "to_nodes": Optional[List[str]],
          "node_names": Optional[List[str]],
          "from_inputs": Optional[List[str]],
          "to_outputs": Optional[List[str]],
          "load_versions": Optional[List[str]],
          "pipeline_name": str,
          "extra_params": Optional[Dict[str, Any]]
        }

    pipeline (Pipeline): The Pipeline that was run. Required.
    catalog (DataCatalog): The DataCatalog used during the run. Required.

Source code in kedro_aim/framework/hooks/aim_hook.py
@hook_impl
def after_pipeline_run(
    self,
    run_params: Dict[str, Any],
    pipeline: Pipeline,
    catalog: DataCatalog,
) -> None:
    """Hook to be invoked after a pipeline runs.

    After the pipeline runs, we add the `StatusTag.SUCCESS` tag and close the
    Aim run.

    Args:
        run_params: The params used to run the pipeline.
            Should have the following schema

            ```json
               {
                 "session_id": str,
                 "project_path": str,
                 "env": str,
                 "kedro_version": str,
                 "tags": Optional[List[str]],
                 "from_nodes": Optional[List[str]],
                 "to_nodes": Optional[List[str]],
                 "node_names": Optional[List[str]],
                 "from_inputs": Optional[List[str]],
                 "to_outputs": Optional[List[str]],
                 "load_versions": Optional[List[str]],
                 "pipeline_name": str,
                 "extra_params": Optional[Dict[str, Any]]
               }
            ```

        pipeline: The `Pipeline` that was run.
        catalog: The `DataCatalog` used during the run.
    """
    if self.run is not None:
        self.run.add_tag(StatusTag.SUCCESS)
        self.run.finalize()
        self.run.close()

kedro_aim.framework.hooks.aim_hook.AimHook.before_node_run(node, catalog, inputs, is_async, session_id)

Hook to be invoked before a node runs.

All parameters that are passed to the node are logged to the run.

Parameters:

    node (Node): The Node to run. Required.
    catalog (DataCatalog): A DataCatalog containing the node's inputs and outputs. Required.
    inputs (Dict[str, Any]): The dictionary of node inputs. The keys are dataset names and the values are the actual loaded input data, not the dataset instances. Required.
    is_async (bool): Whether the node was run in async mode. Required.
    session_id (str): The id of the session. Required.

Source code in kedro_aim/framework/hooks/aim_hook.py
@hook_impl
def before_node_run(
    self,
    node: Node,
    catalog: DataCatalog,
    inputs: Dict[str, Any],
    is_async: bool,
    session_id: str,
) -> None:
    """Hook to be invoked before a node runs.

    All `parameters` that are passed to the node are logged to the run.

    Args:
        node: The `Node` to run.
        catalog: A `DataCatalog` containing the node's inputs and outputs.
        inputs: The dictionary of node inputs.
            The keys are dataset names and the values are the actual loaded
            input data, not the dataset instances.
        is_async: Whether the node was run in `async` mode.
        session_id: The id of the session.
    """
    if self.run is not None:
        # Only parameters are logged.
        for k, v in inputs.items():
            if k.startswith("params:"):
                # Strip the "params:" prefix (7 characters) before logging.
                self.run[k[7:]] = v
            elif k == "parameters":
                self.run[k] = v
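
As a standalone illustration of the filtering above (not part of the plugin): given the node inputs below, the hook logs `learning_rate` (with the `params:` prefix stripped) and `parameters`, and skips `raw_data` because it is not a parameter.

inputs = {
    "params:learning_rate": 0.01,                        # logged as run["learning_rate"]
    "parameters": {"learning_rate": 0.01, "epochs": 5},  # logged as run["parameters"]
    "raw_data": "not a parameter",                       # ignored by the hook
}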

kedro_aim.framework.hooks.aim_hook.AimHook.before_pipeline_run(run_params, pipeline, catalog)

Hook to be invoked before a pipeline runs.

Before the pipeline runs, we create the Aim run and add it to the catalog under the name run. This allows us to access the run in the pipeline.

Parameters:

    run_params (Dict[str, Any]): The params used to run the pipeline. Required.
        Should have the following schema:

        {
          "session_id": str,
          "project_path": str,
          "env": str,
          "kedro_version": str,
          "tags": Optional[List[str]],
          "from_nodes": Optional[List[str]],
          "to_nodes": Optional[List[str]],
          "node_names": Optional[List[str]],
          "from_inputs": Optional[List[str]],
          "to_outputs": Optional[List[str]],
          "load_versions": Optional[List[str]],
          "pipeline_name": str,
          "extra_params": Optional[Dict[str, Any]]
        }

    pipeline (Pipeline): The Pipeline that will be run. Required.
    catalog (DataCatalog): The DataCatalog to be used during the run. Required.

Source code in kedro_aim/framework/hooks/aim_hook.py
@hook_impl
def before_pipeline_run(
    self, run_params: Dict[str, Any], pipeline: Pipeline, catalog: DataCatalog
) -> None:
    """Hook to be invoked before a pipeline runs.

    Before the pipeline runs, we create the Aim run and add it to the catalog
    under the name `run`. This allows us to access the run in the pipeline.

    Args:
        run_params: The params used to run the pipeline.
            Should have the following schema

            ```json
               {
                 "session_id": str,
                 "project_path": str,
                 "env": str,
                 "kedro_version": str,
                 "tags": Optional[List[str]],
                 "from_nodes": Optional[List[str]],
                 "to_nodes": Optional[List[str]],
                 "node_names": Optional[List[str]],
                 "from_inputs": Optional[List[str]],
                 "to_outputs": Optional[List[str]],
                 "load_versions": Optional[List[str]],
                 "pipeline_name": str,
                 "extra_params": Optional[Dict[str, Any]]
               }
            ```

        pipeline: The `Pipeline` that will be run.
        catalog: The `DataCatalog` to be used during the run.
    """
    if check_aim_enabled(run_params["pipeline_name"], self.aim_config):
        # Create the Aim Run
        self.run = Run(
            run_hash=self.aim_config.run.run_hash,
            repo=load_repository(self.aim_config.repository),
            experiment=self.aim_config.run.experiment,
            system_tracking_interval=self.aim_config.run.system_tracking_interval,
            log_system_params=self.aim_config.run.log_system_params,
            capture_terminal_logs=self.aim_config.run.capture_terminal_logs,
        )

        # log run parameters
        self.run["kedro"] = run_params

        # add tags
        for tag in self.aim_config.run.tags:
            self.run.add_tag(tag)

        # save run in catalog
        assert not catalog.exists("run"), "catalog already contains a 'run' dataset"
        catalog.add("run", MemoryDataSet(copy_mode="assign"))
        catalog.save("run", self.run)
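
Because the run is stored in the catalog under the name `run`, a node can declare it as an input and log to it directly. A minimal sketch follows; the node, dataset, and metric names are hypothetical, while `Run.track` is Aim's standard metric-logging call:

from aim import Run
from kedro.pipeline import node

def train_model(data, run: Run) -> float:
    """Hypothetical node that receives the Aim run as a regular input."""
    score = 0.95  # placeholder metric computed from `data`
    run.track(score, name="accuracy")  # log the metric to the Aim run
    return score

train_node = node(train_model, inputs=["data", "run"], outputs="score")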

kedro_aim.framework.hooks.aim_hook.AimHook.on_pipeline_error(error, run_params, pipeline, catalog)

Hook to be invoked if a pipeline run throws an uncaught Exception.

In this case, we add a StatusTag.FAILURE tag and close the run.

Parameters:

    error (Exception): The uncaught exception thrown during the pipeline run. Required.
    run_params (Dict[str, Any]): The params used to run the pipeline. Required.
        Should have the following schema:

        {
          "session_id": str,
          "project_path": str,
          "env": str,
          "kedro_version": str,
          "tags": Optional[List[str]],
          "from_nodes": Optional[List[str]],
          "to_nodes": Optional[List[str]],
          "node_names": Optional[List[str]],
          "from_inputs": Optional[List[str]],
          "to_outputs": Optional[List[str]],
          "load_versions": Optional[List[str]],
          "pipeline_name": str,
          "extra_params": Optional[Dict[str, Any]]
        }

    pipeline (Pipeline): The Pipeline that was run. Required.
    catalog (DataCatalog): The DataCatalog used during the run. Required.

Source code in kedro_aim/framework/hooks/aim_hook.py
@hook_impl
def on_pipeline_error(
    self,
    error: Exception,
    run_params: Dict[str, Any],
    pipeline: Pipeline,
    catalog: DataCatalog,
) -> None:
    """Hook to be invoked if a pipeline run throws an uncaught Exception.

    In this case, we add a `StatusTag.FAILURE` tag and close the run.

    Args:
        error: The uncaught exception thrown during the pipeline run.
        run_params: The params used to run the pipeline.
            Should have the following schema

            ```json
               {
                 "session_id": str,
                 "project_path": str,
                 "env": str,
                 "kedro_version": str,
                 "tags": Optional[List[str]],
                 "from_nodes": Optional[List[str]],
                 "to_nodes": Optional[List[str]],
                 "node_names": Optional[List[str]],
                 "from_inputs": Optional[List[str]],
                 "to_outputs": Optional[List[str]],
                 "load_versions": Optional[List[str]],
                 "pipeline_name": str,
                 "extra_params": Optional[Dict[str, Any]]
               }
            ```
        pipeline: The `Pipeline` that was run.
        catalog: The `DataCatalog` used during the run.
    """
    if self.run is not None:
        self.run.add_tag(StatusTag.FAILURE)
        self.run.finalize()
        self.run.close()

kedro_aim.framework.hooks.aim_hook.StatusTag

Bases: str, Enum

Enumeration of the status tags that can be used to tag metrics.

Source code in kedro_aim/framework/hooks/aim_hook.py
class StatusTag(str, Enum):
    """Enumeration of the status tags that can be used to tag metrics."""

    FAILURE = "failure"
    SUCCESS = "success"

kedro_aim.framework.hooks.utils

kedro_aim.framework.hooks.utils.check_aim_enabled(pipeline_name, aim_config)

Check if Aim is enabled for the given pipeline.

Parameters:

    pipeline_name (str): Name of the pipeline. Required.
    aim_config (KedroAimConfig): Kedro-Aim configuration. Required.

Returns:

    bool: A boolean indicating whether Aim is enabled for the given pipeline.

Source code in kedro_aim/framework/hooks/utils.py
def check_aim_enabled(pipeline_name: str, aim_config: KedroAimConfig) -> bool:
    """Check if Aim is enabled for the given pipeline.

    Args:
        pipeline_name: Name of the pipeline.
        aim_config: Kedro-Aim configuration.

    Returns:
        A boolean indicating whether Aim is enabled for the given pipeline.
    """
    return pipeline_name not in aim_config.disable.pipelines
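
A minimal usage sketch, assuming `KedroAimConfig` exposes the `disable.pipelines` list that the return statement above relies on (the import paths are assumptions):

from kedro_aim.config import KedroAimConfig  # import path is an assumption
from kedro_aim.framework.hooks.utils import check_aim_enabled

cfg = KedroAimConfig.parse_obj({"disable": {"pipelines": ["reporting"]}})
assert check_aim_enabled("training", cfg) is True    # not disabled: Aim run is created
assert check_aim_enabled("reporting", cfg) is False  # disabled: the hook skips tracking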