
Job Specific Configurations

Corridor allows users to run their jobs with a custom spark session by providing configurations in the Job Configs section of the job details page.

These configurations can be used to create a spark session that can, for example:

  • Use specific versions of some libraries, if the model needs them.
  • Run on a specific queue.
  • Run the job with a defined number of cores, executor memory, driver memory, etc. (a sketch of such a session follows this list).
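
For illustration, a session configured this way might be built along the following lines. This is a minimal sketch and not Corridor-specific; the queue name, core counts, and memory values are placeholders, and the exact config keys depend on the cluster manager.

from pyspark.sql import SparkSession

# Placeholder values; adjust the queue name and resource sizes for the cluster
spark = (
    SparkSession.builder
    .appName("corridor-job")                  # hypothetical application name
    .config("spark.yarn.queue", "analytics")  # run the job on a specific YARN queue
    .config("spark.executor.cores", "4")      # cores per executor
    .config("spark.executor.memory", "8g")    # executor memory
    .config("spark.driver.memory", "4g")      # driver memory
    .getOrCreate()
)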

Handler class

The user needs to define a CustomSparkSessionHandler class that inherits from Corridor's base spark session handler class: corridor_api.config.handlers.SparkSessionlHandler

The logic for spark session creation should be implemented in the following method of CustomSparkSessionHandler:

  • create_spark_session(self, existing_session=None, job=None, report=None), where:
    - existing_session: The existing session that is active. If no session is active, it will be `None`.
    - job: The `corridor` Job object that is currently running. This is optional, and can be None (for example in "Table Registry > Fetch columns").
    - report: The report class that is currently running.
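
Before the full example below, here is a minimal sketch of this contract (the class name MinimalSparkSessionHandler is illustrative): the method must always return a usable spark session, reusing the active one when there is nothing job-specific to set up.

import findspark

from corridor_api.config.handlers import SparkSessionlHandler


class MinimalSparkSessionHandler(SparkSessionlHandler):

    def create_spark_session(self, existing_session=None, job=None, report=None):
        # Reuse the active session if there is one
        if existing_session is not None:
            return existing_session

        # Otherwise fall back to a default spark session
        findspark.init()
        import pyspark
        return pyspark.sql.SparkSession.builder.getOrCreate()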

Example

This example focuses on running a job with a specific version of scikit-learn that was used for the user's model but is not present on the spark cluster. The user provides the library versions as the job configs (assumed here to be in yaml format); an illustrative value is shown below.
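
For illustration, the value entered in Job Configs might look like the following (shown here as a Python string; the environment name and package versions are placeholders). The handler below writes this text verbatim to an environment.yml file.

# Hypothetical contents of the "Job Configs" box for this example
example_job_configs = """
name: job-specific-env
dependencies:
  - python=3.8
  - scikit-learn=0.23.2
  - pip
"""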

This handler does the following:

  • Stop the existing spark session
  • Create a conda environment using the job configs
  • Bundle the conda environment
  • Provide the bundled conda environment during spark session creation (conda-pack)
  • Return the new spark session

import hashlib
import os
import subprocess
import time

import fs.tempfs

from corridor_api.config.handlers import SparkSessionlHandler


class CustomSparkSessionHandler(SparkSessionlHandler):

    def stop_spark_session(self, spark):
        # logic to stop the existing spark session
        pass

    def create_new_spark_session(self, env_bundle_path):
        # logic to create the new spark session using the bundled conda environment
        pass

    def create_spark_session(self, existing_session=None, job=None, report=None):
        """
        This function should return the spark session that should be used for a job/report.
        NOTE: There are internal tasks that the system may create which are not associated with a job or report.
              Those should also be handled here.

        :param existing_session: The existing session that is active.
                                 If no session is active, it will be `None`
        :param job:              The `corridor` Job object that is currently running.
                                 This is optional, and can be None (For example in "Table Registry > Fetch columns")
        :param report:           The report class that is currently running.
        """

        if job is None or job.configs in (None, ''):
            if existing_session is not None:
                return existing_session

            # create default spark session
            import findspark
            findspark.init()
            import pyspark

            spark = pyspark.sql.SparkSession.builder.getOrCreate()
            return spark

        with fs.tempfs.TempFS() as tmp_fs:
            # ***************************************************
            #       CREATE CONDA PACK USING THE CONFIGS
            # ***************************************************

            temp_dir_path = tmp_fs.getospath("/").decode("utf-8")
            job_configs = job.configs

            # write the job configs (an environment.yml) to a temporary file
            env_file = os.path.join(temp_dir_path, 'environment.yml')
            with open(env_file, "w") as f:
                f.write(job_configs)

            # hash the configs so that identical environments are bundled only once
            temp_env_path = os.path.join(temp_dir_path, 'conda_env')
            env_bundle_name = hashlib.md5(job_configs.strip().encode()).hexdigest()
            env_bundle_full_path = os.path.join('/opt/corridor/sim_envs/', f'{env_bundle_name}.tar.gz')

            if not os.path.exists(env_bundle_full_path):
                command = f'/opt/anaconda3/bin/conda env create -p {temp_env_path} -f {env_file}'
                proc = subprocess.run(
                    command,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    universal_newlines=True,
                )
                time.sleep(1)

                # ***************************************************
                #    BUNDLE THE CONDA ENV AND SAVE TO NAS STORAGE
                # ***************************************************

                command = f'/opt/anaconda3/bin/conda pack -p {temp_env_path} -o {env_bundle_full_path}'
                proc = subprocess.run(
                    command,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    universal_newlines=True,
                )

            import findspark
            findspark.init()
            import pyspark
            from pyspark.sql import SparkSession, functions as F, types as T

            # stop any existing running spark session
            if existing_session is not None:
                try:
                    self.stop_spark_session(existing_session)
                except Exception as e:
                    print(
                        f'Could not stop existing session due to error: {e}. '
                        'The new spark session might not get created properly.'
                    )

            # create and return a new spark session that uses the bundled environment
            return self.create_new_spark_session(env_bundle_full_path)
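
The two placeholder methods are left for the user to fill in. One possible way to complete them is sketched below, assuming a YARN cluster and the conda-pack archive built above; spark.yarn.dist.archives and PYSPARK_PYTHON are standard Spark / conda-pack settings, but the unpack alias and paths are illustrative.

    def stop_spark_session(self, spark):
        # Stop the active session so the new one can pick up the packed environment
        spark.stop()

    def create_new_spark_session(self, env_bundle_path):
        # `os` is already imported at the top of the handler module
        import findspark
        findspark.init()
        from pyspark.sql import SparkSession

        # Ship the conda-pack archive to the cluster and point the worker Python
        # interpreter at the unpacked environment ("environment" is the unpack alias)
        os.environ['PYSPARK_PYTHON'] = './environment/bin/python'
        return (
            SparkSession.builder
            .config('spark.yarn.dist.archives', f'{env_bundle_path}#environment')
            .getOrCreate()
        )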

Configurations

The spark session handler configuration needs to be set in api_config.py along with the other configurations (assuming the CustomSparkSessionHandler class is defined in the file custom_spark_session_handler.py):

SPARK_SESSION_HANDLER = 'custom_spark_session_handler.CustomSparkSessionHandler'