Job Specific Configurations
Corridor gives users the option to run their jobs with a custom Spark session by providing
the configurations in the Job Configs section of the job details page.
These configurations can be used to create a Spark session that, for example (see the sketch after this list):
- Uses specific versions of some libraries, if the model needs them.
- Submits the job to a specific queue.
- Runs the job with a defined number of cores, executor memory, driver memory, etc.
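As a rough illustration, such a session could be built with standard Spark configuration keys. The application
name, queue name, and resource values below are placeholders, and a YARN cluster is assumed:

from pyspark.sql import SparkSession

# Hypothetical values; in practice these would come from the job configs.
spark = (
    SparkSession.builder
    .appName("corridor-job")
    .config("spark.yarn.queue", "analytics")   # submit to a specific YARN queue
    .config("spark.executor.cores", "4")       # cores per executor
    .config("spark.executor.memory", "8g")     # executor memory
    .config("spark.driver.memory", "4g")       # driver memory
    .getOrCreate()
)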
Handler class
The user needs to define a CustomSparkSessionHandler class which inherits from Corridor's base Spark session handler class:
corridor_api.config.handlers.SparkSessionlHandler
The Spark session creation logic goes into the following method of CustomSparkSessionHandler
(a minimal skeleton follows the parameter list):
create_spark_session(self, existing_session=None, job=None, report=None), where:
- existing_session: The existing session that is active. If no session is active, it will be `None`.
- job: The `corridor` Job object that is currently running. This is optional and can be `None` (for example in "Table Registry > Fetch columns").
- report: The report class that is currently running.
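A minimal skeleton of such a handler is shown below. It simply reuses an active session or falls back to a
default one; it is an illustrative sketch only, and the only parts taken from Corridor's API are the base class
and the method name above. The complete example in the next section expands on this.

from pyspark.sql import SparkSession
from corridor_api.config.handlers import SparkSessionlHandler

class CustomSparkSessionHandler(SparkSessionlHandler):

    def create_spark_session(self, existing_session=None, job=None, report=None):
        # Reuse the active session if one exists.
        if existing_session is not None:
            return existing_session
        # Otherwise fall back to a default Spark session.
        return SparkSession.builder.getOrCreate()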
Example
This example runs a job with a specific version of scikit-learn that the user's model depends on but that is
not installed on the Spark cluster.
The user provides the library versions in the Job Configs, assumed here to be in YAML (conda environment.yml)
format, for example:
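The package list and versions here are purely illustrative:

name: job-env
channels:
  - defaults
dependencies:
  - python=3.8
  - scikit-learn=0.24.2
  - pip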
This handler does the following:
- Stops the existing Spark session.
- Creates a conda environment from the job configs.
- Bundles the conda environment using conda-pack.
- Provides the bundled conda environment during Spark session creation.
- Returns the new Spark session.
import hashlib
import os
import subprocess
import time

import fs.tempfs

from corridor_api.config.handlers import SparkSessionlHandler


class CustomSparkSessionHandler(SparkSessionlHandler):

    def stop_spark_session(self, spark):
        # Logic to stop the existing spark session (for example, spark.stop()).
        pass

    def create_new_spark_session(self, env_bundle_path):
        # Logic to create the new spark session using the bundled conda environment.
        pass

    def create_spark_session(self, existing_session=None, job=None, report=None):
        """
        This function should return the spark session that should be used for a job/report.

        NOTE: There are internal tasks that the system may create which are not
        associated with a job or report. Those should also be handled here.

        :param existing_session: The existing session that is active.
                                 If no session is active, it will be `None`
        :param job: The `corridor` Job object that is currently running.
                    This is optional, and can be None (for example in "Table Registry > Fetch columns")
        :param report: The report class that is currently running.
        """
        import findspark
        findspark.init()
        from pyspark.sql import SparkSession

        if job is None or job.configs in (None, ''):
            # No job-specific configs: reuse the active session if there is one,
            # otherwise create a default spark session.
            if existing_session is not None:
                return existing_session
            return SparkSession.builder.getOrCreate()

        with fs.tempfs.TempFS() as tmp_fs:
            # ***************************************************
            # CREATE THE CONDA ENV USING THE CONFIGS
            # ***************************************************
            temp_dir_path = tmp_fs.getospath("/").decode("utf-8")

            # Write the job configs (a conda environment definition) to a temporary file.
            env_file = os.path.join(temp_dir_path, 'environment.yml')
            with open(env_file, "w") as f:
                f.write(job.configs)

            temp_env_path = os.path.join(temp_dir_path, 'conda_env')

            # Name the bundle after a hash of the configs so identical configs reuse the same bundle.
            env_bundle_name = hashlib.md5(job.configs.strip().encode()).hexdigest()
            env_bundle_full_path = os.path.join('/opt/corridor/sim_envs/', f'{env_bundle_name}.tar.gz')

            if not os.path.exists(env_bundle_full_path):
                command = f'/opt/anaconda3/bin/conda env create -p {temp_env_path} -f {env_file}'
                subprocess.run(
                    command,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    universal_newlines=True,
                )
                time.sleep(1)

                # ***************************************************
                # BUNDLE THE CONDA ENV AND SAVE TO NAS STORAGE
                # ***************************************************
                command = f'/opt/anaconda3/bin/conda pack -p {temp_env_path} -o {env_bundle_full_path}'
                subprocess.run(
                    command,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    universal_newlines=True,
                )

        # Stop any existing running spark session before creating the new one.
        if existing_session is not None:
            try:
                self.stop_spark_session(existing_session)
            except Exception as e:
                print(
                    f'Could not stop existing session due to error: {e}. '
                    'The new spark session might not get created properly.'
                )

        return self.create_new_spark_session(env_bundle_full_path)
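The create_new_spark_session helper is left as a placeholder above. One possible way to complete it, assuming
the job runs on YARN in client mode and following the usual conda-pack pattern of shipping the archive and
pointing the executors at the Python interpreter inside the unpacked environment, is sketched below; everything
other than the standard Spark configuration keys is an assumption:

from pyspark.sql import SparkSession

def create_new_spark_session(self, env_bundle_path):
    # Ship the packed environment; YARN unpacks it on each executor as "./environment".
    return (
        SparkSession.builder
        .config('spark.yarn.dist.archives', f'{env_bundle_path}#environment')
        # Point the executors at the Python interpreter inside the unpacked environment.
        .config('spark.executorEnv.PYSPARK_PYTHON', './environment/bin/python')
        .getOrCreate()
    )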
Configurations
The Spark session handler configuration needs to be set in api_config.py along with the other configurations
(assuming the CustomSparkSessionHandler class is defined in the file custom_spark_session_handler.py):
SPARK_SESSION_HANDLER = 'custom_spark_session_handler.CustomSparkSessionHandler'