Corridor-Runtime
The artifact generated by the platform is self-contained and can run in an isolated runtime or production environment, independent of the platform itself. The information stored in the artifact is useful in many situations where we might want to:
- check metadata for the objects
- check the input tables and columns used
- see the lineage and relationship between the objects
- run the entire artifact or some components of the artifact to get complete or intermediate results
corridor-runtime is a utility package created by Corridor that makes the above tasks straightforward, without having to extract the artifact bundle (bundle.tar.gz) or the files inside it.
Note
The corridor-runtime package works on a specific artifact format (recorded in the metadata.json inside the artifact bundle).
A given version of corridor-runtime may not be compatible with older/newer artifacts, as it assumes the artifact is structured
in a particular way.
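If needed, the metadata.json can be inspected directly without unpacking the bundle to disk; a minimal sketch, assuming metadata.json sits at the top level of the bundle (the exact keys it contains are not documented here):
>>> import tarfile, json
>>> with tarfile.open('bundle.tar.gz') as bundle:
...     with bundle.extractfile('metadata.json') as fp:   # top-level location of metadata.json is an assumption
...         metadata = json.load(fp)
>>> sorted(metadata)   # inspect which keys (e.g. the artifact format) are recorded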
Available options
>>> help(Artifact)
Help on class Artifact in module corridor_runtime.artifact:
class Artifact(builtins.object)
| Artifact(artifact_path)
|
| Methods defined here:
|
| __del__(self)
|
| __init__(self, artifact_path)
| Initialize self. See help(type(self)) for accurate signature.
|
| code(self, lang_spec='pyspark-dataframe', exclude=(), include=())
|
| run(self, data, runtime_params=(), lang_spec='pyspark-dataframe', exclude=(), include=())
| :param exclude: The list of nodes to exclude from the unified artifact.
| Each node is denoted by a unique `ref`
| :param include: The list of nodes to include in the artifact.
 |                 The artifact will be created with only the nodes in `include`
| and wrapped inside a `main` function.
|
| ----------------------------------------------------------------------
| Readonly properties defined here:
|
| execution_graph
| :returns: a networkx `dag` with all the nodes from PEExecService for now.
| Each node is a networkx graph node with the attributes:
| `object`, one of:
| - Feature
| attributes: ['id', 'name', 'alias', 'type', 'platform_entity', 'category', 'group', 'version']
| - Model
| attributes: ['id', 'name', 'output_alias', 'type', 'platform_entity', 'group', 'version']
| `is_model`, True if the node is a Model object, False otherwise
|
| inputs
|
| versions
|
| ----------------------------------------------------------------------
| Data descriptors defined here:
|
| __dict__
| dictionary for instance variables (if defined)
|
| __weakref__
| list of weak references to the object (if defined)
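Besides run(), the code() method listed above accepts the same lang_spec, exclude and include options but returns the generated code instead of executing it; a minimal sketch, assuming the generated source comes back in a printable form (its exact return type is not shown in the help above):
>>> from corridor_runtime.artifact import Artifact
>>> artifact = Artifact('bundle.tar.gz')
>>> generated = artifact.code(lang_spec='pyspark-dataframe')  # same include/exclude semantics as run()
>>> print(generated)  # assumes a printable representation of the generated code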
Example
The example focuses on a Feature with the following details:
- name: Debt Capacity
- alias: debt_capacity
- inputs: application_loan_amount, annual_income
- definition: return application_loan_amount/(max(0.1, annual_income))
- artifact_bundle_name: feature_15.64.tar.gz
- Read the artifact using the Artifact class
>>> from corridor_runtime.artifact import Artifact
>>> artifact = Artifact('feature_15.64.tar.gz')
- Check the input tables and columns used
>>> artifact.inputs
[DataTable(id=5, alias='application_table', name='Application Table', location='AppData.parquet', columns=[DataColumn(id=200, alias='annual_inc', type='double'), DataColumn(id=266, alias='corridor_requested_loan_amount', type='double'), DataColumn(id=283, alias='corridor_application_id', type='string')])]
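Since inputs returns DataTable objects with DataColumn children, the required tables and columns can also be listed programmatically; a small sketch based on the attributes shown in the output above:
>>> for table in artifact.inputs:
...     print(table.alias, table.location, [column.alias for column in table.columns])
application_table AppData.parquet ['annual_inc', 'corridor_requested_loan_amount', 'corridor_application_id']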
- Get the execution graph
>>> execution_graph = artifact.execution_graph
>>> execution_graph.nodes
NodeView(('feature_15', 'feature_2', 'feature_5', 'output_table_final_data_1'))
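Because execution_graph is a regular networkx DAG, the usual networkx API can be used to inspect the lineage and relationships between the objects; a small sketch, assuming edges run from inputs to outputs:
>>> import networkx as nx
>>> list(execution_graph.edges)                          # dependency edges between the objects
>>> sorted(nx.ancestors(execution_graph, 'feature_15'))  # everything feature_15 depends on
>>> list(nx.topological_sort(execution_graph))           # one valid execution order for the nodes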
- Get the metadata for feature_15
>>> execution_graph.nodes['feature_15']['metadata']
{'ref': 'feature_15', 'id': 15, 'created_by': 'xin', 'created_date': '2020-04-23T17:56:52', 'category': 'UserDefined', 'definition_alias': 'debt_capacity', 'data_type': 'Numerical', 'input_features': [{'ref': 'feature_2', 'id': 2}, {'ref': 'feature_5', 'id': 5}], 'name': 'Debt Capacity', 'parent_id': 16, 'platform_entity': 'Application', 'unique_alias': 'debt_capacity__v1__id15', 'version': 1, 'status': 'Approved', 'group': 'Loan Specific', 'colname': 'debt_capacity__v1__id15'}
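The metadata also encodes lineage directly: input_features lists the refs of the features this one depends on.
>>> [dep['ref'] for dep in execution_graph.nodes['feature_15']['metadata']['input_features']]
['feature_2', 'feature_5']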
- Get the object for feature_15 from the execution graph
>>> feature = execution_graph.nodes['feature_15']['object']
>>> feature
Feature(id=15, name='Debt Capacity', alias='debt_capacity', version=1, type='Numerical', platform_entity='Application', group='Loan Specific', definition='return application_loan_amount/(max(0.1, annual_income))', colname='debt_capacity__v1__id15')
>>> feature.name
'Debt Capacity'
- To run the artifact with all the objects (assuming a SparkSession is available as spark); the data_table_5 key below corresponds to the DataTable with id=5 listed by artifact.inputs
>>> input_data_dictionary = {'data_table_5': spark.read.parquet('AppTable.parquet')}
>>> runtime_params = {}
>>> output_dictionary, errors = artifact.run(input_data_dictionary, runtime_params, lang_spec='pyspark-dataframe')
>>> output = output_dictionary['final_data']
>>> output.show(5)
+--------------------------------+----------------------+-----------------------------------------+-----------------------+
|application_loan_amount__v1__id2|annual_income__v1__id5|application_id__reserved_xgpajewcuspxexsl|debt_capacity__v1__id15|
+--------------------------------+----------------------+-----------------------------------------+-----------------------+
| 14650.0| 75000.0| 20000018440| 0.19533333333333333|
| 11000.0| 125000.0| 225770004638| 0.088|
| 37000.0| 113536.0| 20000077834| 0.3258878241262683|
| 20000.0| 140000.0| 20000267750| 0.14285714285714285|
| 40000.0| 130000.0| 225769878100| 0.3076923076923077|
+--------------------------------+----------------------+-----------------------------------------+-----------------------+
- To run the artifact after excluding feature_15, i.e., Debt Capacity
>>> input_data_dictionary = {'data_table_5': spark.read.parquet('AppTable.parquet')}
>>> runtime_params = {}
>>> output_dictionary, errors = artifact.run(input_data_dictionary, runtime_params, lang_spec='pyspark-dataframe', exclude=('feature_15', ))
>>> output = output_dictionary['final_data']
>>> output.show(5)
+--------------------------------+----------------------+-----------------------------------------+
|application_loan_amount__v1__id2|annual_income__v1__id5|application_id__reserved_xgpajewcuspxexsl|
+--------------------------------+----------------------+-----------------------------------------+
| 14650.0| 75000.0| 20000018440|
| 11000.0| 125000.0| 225770004638|
| 37000.0| 113536.0| 20000077834|
| 20000.0| 140000.0| 20000267750|
| 40000.0| 130000.0| 225769878100|
+--------------------------------+----------------------+-----------------------------------------+
The exclude option is useful when we want to run and save some objects into our data lake and then use them to run the
final feature or model on a daily/weekly basis, without recomputing the earlier objects each time.
- To run the artifact using the data already computed above, now including feature_15
>>> output_dictionary, errors = artifact.run(output_dictionary, runtime_params, lang_spec='pyspark-dataframe', include=('feature_15', ))
>>> output = output_dictionary['final_data']
>>> output.show(5)
+--------------------------------+----------------------+-----------------------------------------+-----------------------+
|application_loan_amount__v1__id2|annual_income__v1__id5|application_id__reserved_xgpajewcuspxexsl|debt_capacity__v1__id15|
+--------------------------------+----------------------+-----------------------------------------+-----------------------+
| 14650.0| 75000.0| 20000018440| 0.19533333333333333|
| 11000.0| 125000.0| 225770004638| 0.088|
| 37000.0| 113536.0| 20000077834| 0.3258878241262683|
| 20000.0| 140000.0| 20000267750| 0.14285714285714285|
| 40000.0| 130000.0| 225769878100| 0.3076923076923077|
+--------------------------------+----------------------+-----------------------------------------+-----------------------+
The include option follows an exclude: it assumes that all the required dependencies (objects) have already been computed
and are stored in a dataframe called final_data, passed along with the other required inputs in the input data dictionary.
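For the daily/weekly workflow described above, the precomputed final_data would typically be written to the data lake after the exclude run and read back before the include run; a minimal sketch with hypothetical storage paths:
>>> # persist the intermediate results computed by the exclude run (path is hypothetical)
>>> output_dictionary['final_data'].write.mode('overwrite').parquet('lake/intermediate_final_data.parquet')
>>> # later, in the scheduled job: rebuild the input dictionary and run only the remaining node
>>> precomputed = {'final_data': spark.read.parquet('lake/intermediate_final_data.parquet'),
...                'data_table_5': spark.read.parquet('AppTable.parquet')}
>>> output_dictionary, errors = artifact.run(precomputed, runtime_params, lang_spec='pyspark-dataframe', include=('feature_15', ))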