Corridor-Runtime

An artifact generated by the platform is independent of the platform and can run in an isolated runtime environment or a production environment. The information stored in the artifact is useful when we want to:

  • check metadata for the objects
  • check the input tables and columns used
  • see the lineage and relationship between the objects
  • run the entire artifact or some components of the artifact to get complete or intermediate results

corridor-runtime is a utility package created by Corridor that makes these tasks straightforward, without having to extract the artifact bundle (bundle.tar.gz) or work with the files inside it.

Note

The corridor-runtime package works with a specific artifact format (recorded in the metadata.json inside the artifact bundle).
A given version of corridor-runtime may not be compatible with older or newer artifacts, because it assumes the artifact has a particular structure.
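
When a compatibility problem is suspected, it can help to look at metadata.json directly. The following is a minimal sketch using only the Python standard library, not part of corridor-runtime. It builds a toy bundle in memory so that it runs on its own; the key `artifact_format` and the helper names `read_bundle_metadata` and `make_toy_bundle` are assumptions made for illustration, since the real layout of metadata.json is not described here.

```python
import io
import json
import tarfile


def read_bundle_metadata(bundle, member_name="metadata.json"):
    """Return the parsed metadata.json from a .tar.gz bundle without unpacking it to disk.

    `bundle` is a binary file object. A member matches if the last component
    of its path equals `member_name`.
    """
    with tarfile.open(fileobj=bundle, mode="r:gz") as tar:
        for member in tar.getmembers():
            if member.isfile() and member.name.split("/")[-1] == member_name:
                return json.load(tar.extractfile(member))
    raise FileNotFoundError(f"{member_name} not found in bundle")


def make_toy_bundle(metadata):
    """Build an in-memory .tar.gz holding only a metadata.json (illustration only)."""
    payload = json.dumps(metadata).encode("utf-8")
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
        info = tarfile.TarInfo(name="metadata.json")
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))
    buffer.seek(0)
    return buffer


# "artifact_format" is a made-up key; the real metadata.json layout is not shown here.
toy_bundle = make_toy_bundle({"artifact_format": "hypothetical-1.0"})
metadata = read_bundle_metadata(toy_bundle)
print(metadata["artifact_format"])
```

For a real bundle, the same function could be called with `open('bundle.tar.gz', 'rb')` in place of the toy buffer.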

Available options

>>> help(Artifact)

Help on class Artifact in module corridor_runtime.artifact:

class Artifact(builtins.object)
 |  Artifact(artifact_path)
 |  
 |  Methods defined here:
 |  
 |  __del__(self)
 |  
 |  __init__(self, artifact_path)
 |      Initialize self.  See help(type(self)) for accurate signature.
 |  
 |  code(self, lang_spec='pyspark-dataframe', exclude=(), include=())
 |  
 |  run(self, data, runtime_params=(), lang_spec='pyspark-dataframe', exclude=(), include=())
 |      :param exclude: The list of nodes to exclude from the unified artifact.
 |                      Each node is denoted by a unique `ref`
 |      :param include: The list of nodes to include in the artifact.
 |                      The artifact will be create with only the nodes in `include`
 |                      and wrapped inside a `main` function.
 |  
 |  ----------------------------------------------------------------------
 |  Readonly properties defined here:
 |  
 |  execution_graph
 |      :returns: a networkx `dag` with all the nodes from PEExecService for now.
 |                Each node is a networkx graph node with the attributes:
 |                `object`, one of:
 |                - Feature
 |                    attributes: ['id', 'name', 'alias', 'type', 'platform_entity', 'category', 'group', 'version']
 |                - Model
 |                    attributes: ['id', 'name', 'output_alias', 'type', 'platform_entity', 'group', 'version']
 |                `is_model`, True if the node is a Model object, False otherwise
 |  
 |  inputs
 |  
 |  versions
 |  
 |  ----------------------------------------------------------------------
 |  Data descriptors defined here:
 |  
 |  __dict__
 |      dictionary for instance variables (if defined)
 |  
 |  __weakref__
 |      list of weak references to the object (if defined)

Example

The example focuses on a Feature with the following details:

  • name: Debt Capacity
  • alias: debt_capacity
  • inputs: application_loan_amount, annual_income
  • definition: return application_loan_amount/(max(0.1, annual_income))
  • artifact_bundle_name: feature_15.64.tar.gz
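
The definition can be checked by hand in plain Python. This sketch only restates the formula from the list above as a standalone function; the input pairs are taken from the sample output shown later in this example, and the zero-income call is an added illustration of the 0.1 floor.

```python
def debt_capacity(application_loan_amount, annual_income):
    """Loan amount divided by income, with income floored at 0.1 to avoid dividing by zero."""
    return application_loan_amount / max(0.1, annual_income)


# Input pairs taken from rows of the sample output in this example.
rows = [(14650.0, 75000.0), (11000.0, 125000.0), (40000.0, 130000.0)]
for loan_amount, income in rows:
    print(loan_amount, income, debt_capacity(loan_amount, income))

# With a zero income the 0.1 floor applies, so the amount is divided by 0.1.
print(debt_capacity(500.0, 0.0))
```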

  • Read the artifact using the Artifact class

>>> from corridor_runtime.artifact import Artifact
>>> artifact = Artifact('feature_15.64.tar.gz')
  • Check the input tables and columns used
>>> artifact.inputs
[DataTable(id=5, alias='application_table', name='Application Table', location='AppData.parquet', columns=[DataColumn(id=200, alias='annual_inc', type='double'), DataColumn(id=266, alias='corridor_requested_loan_amount', type='double'), DataColumn(id=283, alias='corridor_application_id', type='string')])]
  • Get the execution graph
>>> execution_graph = artifact.execution_graph
>>> execution_graph.nodes
NodeView(('feature_15', 'feature_2', 'feature_5', 'output_table_final_data_1'))
  • Get the metadata for feature_15
>>> execution_graph.nodes['feature_15']['metadata']
{'ref': 'feature_15', 'id': 15, 'created_by': 'xin', 'created_date': '2020-04-23T17:56:52', 'category': 'UserDefined', 'definition_alias': 'debt_capacity', 'data_type': 'Numerical', 'input_features': [{'ref': 'feature_2', 'id': 2}, {'ref': 'feature_5', 'id': 5}], 'name': 'Debt Capacity', 'parent_id': 16, 'platform_entity': 'Application', 'unique_alias': 'debt_capacity__v1__id15', 'version': 1, 'status': 'Approved', 'group': 'Loan Specific', 'colname': 'debt_capacity__v1__id15'}
  • Get the object for feature_15 from execution graph
>>> feature = execution_graph.nodes['feature_15']['object']
>>> feature
Feature(id=15, name='Debt Capacity', alias='debt_capacity', version=1, type='Numerical', platform_entity='Application', group='Loan Specific', definition='return application_loan_amount/(max(0.1, annual_income))', colname='debt_capacity__v1__id15')
>>> feature.name
'Debt Capacity'
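
The `input_features` entries in the metadata shown above are enough to reconstruct lineage. The sketch below works on plain dictionaries with the standard library only, so it runs without an artifact. The entry for feature_15 mirrors the metadata above, while the entries for feature_2 and feature_5 are assumed to have no upstream features; the helper names `upstream_refs` and `evaluation_order` are hypothetical.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Metadata keyed by `ref`. Only `ref` and `input_features` are used here.
# The feature_2 and feature_5 entries are assumptions made for illustration.
metadata_by_ref = {
    "feature_15": {
        "ref": "feature_15",
        "input_features": [{"ref": "feature_2", "id": 2}, {"ref": "feature_5", "id": 5}],
    },
    "feature_2": {"ref": "feature_2", "input_features": []},
    "feature_5": {"ref": "feature_5", "input_features": []},
}


def upstream_refs(ref, metadata_by_ref):
    """Return every ref that `ref` depends on, directly or transitively."""
    seen = set()
    stack = [ref]
    while stack:
        current = stack.pop()
        for parent in metadata_by_ref[current]["input_features"]:
            if parent["ref"] not in seen:
                seen.add(parent["ref"])
                stack.append(parent["ref"])
    return seen


def evaluation_order(metadata_by_ref):
    """Order refs so that each one appears after all of its inputs."""
    dependencies = {
        ref: {parent["ref"] for parent in meta["input_features"]}
        for ref, meta in metadata_by_ref.items()
    }
    return list(TopologicalSorter(dependencies).static_order())


print(sorted(upstream_refs("feature_15", metadata_by_ref)))
print(evaluation_order(metadata_by_ref))
```

Since execution_graph is documented as a networkx DAG, `networkx.ancestors` and `networkx.topological_sort` should give the same information directly on the graph, assuming its edges point from an input to the object that uses it.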
  • To run the artifact with all the objects (assuming a SparkSession is available as spark)
>>> input_data_dictionary = {'data_table_5': spark.read.parquet('AppTable.parquet')}
>>> runtime_params = {}
>>> output_dictionary, errors = artifact.run(input_data_dictionary, runtime_params, lang_spec='pyspark-dataframe')
>>> output = output_dictionary['final_data']
>>> output.show(5)
+--------------------------------+----------------------+-----------------------------------------+-----------------------+
|application_loan_amount__v1__id2|annual_income__v1__id5|application_id__reserved_xgpajewcuspxexsl|debt_capacity__v1__id15|
+--------------------------------+----------------------+-----------------------------------------+-----------------------+
|                         14650.0|               75000.0|                              20000018440|    0.19533333333333333|
|                         11000.0|              125000.0|                             225770004638|                  0.088|
|                         37000.0|              113536.0|                              20000077834|     0.3258878241262683|
|                         20000.0|              140000.0|                              20000267750|    0.14285714285714285|
|                         40000.0|              130000.0|                             225769878100|     0.3076923076923077|
+--------------------------------+----------------------+-----------------------------------------+-----------------------+
  • To run the artifact excluding feature_15, i.e., Debt Capacity
>>> input_data_dictionary = {'data_table_5': spark.read.parquet('AppTable.parquet')}
>>> runtime_params = {}
>>> output_dictionary, errors = artifact.run(input_data_dictionary, runtime_params, lang_spec='pyspark-dataframe', exclude=('feature_15', ))
>>> output = output_dictionary['final_data']
>>> output.show(5)
+--------------------------------+----------------------+-----------------------------------------+
|application_loan_amount__v1__id2|annual_income__v1__id5|application_id__reserved_xgpajewcuspxexsl|
+--------------------------------+----------------------+-----------------------------------------+
|                         14650.0|               75000.0|                              20000018440|
|                         11000.0|              125000.0|                             225770004638|
|                         37000.0|              113536.0|                              20000077834|
|                         20000.0|              140000.0|                              20000267750|
|                         40000.0|              130000.0|                             225769878100|
+--------------------------------+----------------------+-----------------------------------------+

The exclude option is useful when we want to compute some objects once, save them to our data lake, and then run only the final feature or model on a daily/weekly basis, without recomputing the upstream objects each time.

  • To run the artifact using the data already computed above, now including feature_15
>>> output_dictionary, errors = artifact.run(output_dictionary, runtime_params, lang_spec='pyspark-dataframe', include=('feature_15', ))
>>> output = output_dictionary['final_data']
>>> output.show(5)
+--------------------------------+----------------------+-----------------------------------------+-----------------------+
|application_loan_amount__v1__id2|annual_income__v1__id5|application_id__reserved_xgpajewcuspxexsl|debt_capacity__v1__id15|
+--------------------------------+----------------------+-----------------------------------------+-----------------------+
|                         14650.0|               75000.0|                              20000018440|    0.19533333333333333|
|                         11000.0|              125000.0|                             225770004638|                  0.088|
|                         37000.0|              113536.0|                              20000077834|     0.3258878241262683|
|                         20000.0|              140000.0|                              20000267750|    0.14285714285714285|
|                         40000.0|              130000.0|                             225769878100|     0.3076923076923077|
+--------------------------------+----------------------+-----------------------------------------+-----------------------+

The include option is meant to follow a run that used exclude. It assumes that all the required dependencies (objects) have already been computed and stored in a dataframe called final_data, which is passed in the input data dictionary along with any other required inputs.
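
The two-stage pattern can be illustrated with a toy stand-in that works on a single row held in a dict. This is not how corridor-runtime is implemented; it is a sketch of the exclude/include semantics described above. The node refs mirror the example, the mapping from input columns to feature_2 and feature_5 is an assumption, and `NODES`, `ORDER` and `run` are hypothetical names.

```python
# Toy stand-in for an artifact: each node names its inputs and a row-level function.
# The column-to-feature mapping below is assumed for illustration.
NODES = {
    "feature_2": (("corridor_requested_loan_amount",), lambda amount: amount),
    "feature_5": (("annual_inc",), lambda income: income),
    "feature_15": (
        ("feature_2", "feature_5"),
        lambda amount, income: amount / max(0.1, income),
    ),
}
ORDER = ("feature_2", "feature_5", "feature_15")  # inputs before the nodes that use them


def run(row, exclude=(), include=()):
    """Compute nodes for one row (a dict), respecting exclude/include.

    With `include`, only those nodes run and their inputs must already be in `row`.
    With `exclude`, every node except those listed runs.
    """
    if include:
        selected = [ref for ref in ORDER if ref in include]
    else:
        selected = [ref for ref in ORDER if ref not in exclude]
    result = dict(row)
    for ref in selected:
        input_names, func = NODES[ref]
        missing = [name for name in input_names if name not in result]
        if missing:
            raise KeyError(f"{ref} needs {missing}, which are not computed yet")
        result[ref] = func(*(result[name] for name in input_names))
    return result


raw = {"corridor_requested_loan_amount": 11000.0, "annual_inc": 125000.0}

# Stage 1: compute and store everything except feature_15.
stage_one = run(raw, exclude=("feature_15",))

# Stage 2: reuse the stored values and compute only feature_15.
stage_two = run(stage_one, include=("feature_15",))
print(stage_two["feature_15"])
```

Calling `run(raw, include=("feature_15",))` on the raw row raises a `KeyError` in this toy, which mirrors the requirement that the dependencies of an included object are already computed.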