diff --git a/.env.example b/.env.example index a4a87bca..b7d0d5b5 100644 --- a/.env.example +++ b/.env.example @@ -1,32 +1,43 @@ # Azure Subscription Variables -WORKSPACE_NAME = '' -RESOURCE_GROUP = '' SUBSCRIPTION_ID = '' LOCATION = '' TENANT_ID = '' +BASE_NAME = '' +SP_APP_ID = '' +SP_APP_SECRET = '' + +# Mock build/release ID for local testing - update ReleaseID each "release" +BUILD_BUILDID = '001' +RELEASE_RELEASEID = '001' # Azure ML Workspace Variables EXPERIMENT_NAME = '' SCRIPT_FOLDER = './' -BLOB_STORE_NAME = '' -# Remote VM Config -REMOTE_VM_NAME = '' -REMOTE_VM_USERNAME = '' -REMOTE_VM_PASSWORD = '' -REMOTE_VM_IP = '' + # AML Compute Cluster Config -AML_CLUSTER_NAME = '' -AML_CLUSTER_VM_SIZE = '' +AML_COMPUTE_CLUSTER_NAME = '' +AML_COMPUTE_CLUSTER_CPU_SKU = '' AML_CLUSTER_MAX_NODES = '' AML_CLUSTER_MIN_NODES = '' AML_CLUSTER_PRIORITY = 'lowpriority' # Training Config -MODEL_NAME = '' -MODEL_VERSION = '' +MODEL_NAME = 'sklearn_regression_model.pkl' +MODEL_VERSION = '1' +TRAIN_SCRIPT_PATH = 'training/train.py' # AML Pipeline Config TRAINING_PIPELINE_NAME = '' PIPELINE_CONDA_PATH = 'aml_config/conda_dependencies.yml' MODEL_PATH = '' +EVALUATE_SCRIPT_PATH = 'evaluate/evaluate_model.py' +REGISTER_SCRIPT_PATH = 'register/register_model.py' +SOURCES_DIR_TRAIN = 'code' + +# These are not mandatory for the core workflow +# Remote VM Config +REMOTE_VM_NAME = '' +REMOTE_VM_USERNAME = '' +REMOTE_VM_PASSWORD = '' +REMOTE_VM_IP = '' # Image config IMAGE_NAME = '' IMAGE_DESCRIPTION = '' diff --git a/.pipelines/azdo-ci-build-train.yml b/.pipelines/azdo-ci-build-train.yml index 09f90909..1b34b892 100644 --- a/.pipelines/azdo-ci-build-train.yml +++ b/.pipelines/azdo-ci-build-train.yml @@ -23,7 +23,7 @@ steps: failOnStderr: 'false' env: SP_APP_SECRET: '$(SP_APP_SECRET)' - displayName: 'Train model using AML with Remote Compute' + displayName: 'Publish Azure Machine Learning Pipeline' enabled: 'true' - task: CopyFiles@2 @@ -32,7 +32,7 @@ steps: SourceFolder: '$(Build.SourcesDirectory)' TargetFolder: '$(Build.ArtifactStagingDirectory)' Contents: | - ml_service/pipelines/?(run_train_pipeline.py|*.json) + ml_service/pipelines/?(run_train_pipeline.py|*.json) code/scoring/** diff --git a/code/evaluate/evaluate_model.py b/code/evaluate/evaluate_model.py index 02e048b6..ec5dc5e0 100644 --- a/code/evaluate/evaluate_model.py +++ b/code/evaluate/evaluate_model.py @@ -24,14 +24,11 @@ POSSIBILITY OF SUCH DAMAGE. 
""" import os -import json -from azureml.core.model import Model -from azureml.core import Run +from azureml.core import Model, Run import argparse # Get workspace -# ws = Workspace.from_config() run = Run.get_context() exp = run.experiment ws = run.experiment.workspace @@ -39,38 +36,33 @@ parser = argparse.ArgumentParser("evaluate") parser.add_argument( - "--config_suffix", type=str, help="Datetime suffix for json config files" + "--release_id", + type=str, + help="The ID of the release triggering this pipeline run", ) parser.add_argument( - "--json_config", + "--model_name", type=str, - help="Directory to write all the intermediate json configs", + help="Name of the Model", + default="sklearn_regression_model.pkl", ) args = parser.parse_args() -print("Argument 1: %s" % args.config_suffix) -print("Argument 2: %s" % args.json_config) +print("Argument 1: %s" % args.release_id) +print("Argument 2: %s" % args.model_name) +model_name = args.model_name +release_id = args.release_id -if not (args.json_config is None): - os.makedirs(args.json_config, exist_ok=True) - print("%s created" % args.json_config) # Paramaterize the matrics on which the models should be compared # Add golden data set on which all the model performance can be evaluated -# Get the latest run_id -# with open("aml_config/run_id.json") as f: -# config = json.load(f) - -train_run_id_json = "run_id_{}.json".format(args.config_suffix) -train_output_path = os.path.join(args.json_config, train_run_id_json) -with open(train_output_path) as f: - config = json.load(f) - - -new_model_run_id = config["run_id"] # args.train_run_id -experiment_name = config["experiment_name"] -# exp = Experiment(workspace=ws, name=experiment_name) - +all_runs = exp.get_runs( + properties={"release_id": release_id, "run_type": "train"}, + include_children=True + ) +new_model_run = next(all_runs) +new_model_run_id = new_model_run.id +print(f'New Run found with Run ID of: {new_model_run_id}') try: # Get most recently registered model, we assume that @@ -110,16 +102,12 @@ print("This is the first model to be trained, \ thus nothing to evaluate for now") -run_id = {} -run_id["run_id"] = "" + # Writing the run id to /aml_config/run_id.json if promote_new_model: - run_id["run_id"] = new_model_run_id - # register new model - # new_model_run.register_model(model_name='',model_path='outputs/sklearn_regression_model.pkl') - -run_id["experiment_name"] = experiment_name -filename = "run_id_{}.json".format(args.config_suffix) -output_path = os.path.join(args.json_config, filename) -with open(output_path, "w") as outfile: - json.dump(run_id, outfile) + model_path = os.path.join('outputs', model_name) + new_model_run.register_model( + model_name=model_name, + model_path=model_path, + properties={"release_id": release_id}) + print("Registered new model!") diff --git a/code/training/train.py b/code/training/train.py index 2b541615..d703964f 100644 --- a/code/training/train.py +++ b/code/training/train.py @@ -32,17 +32,13 @@ from sklearn.model_selection import train_test_split from sklearn.externals import joblib import numpy as np -import json parser = argparse.ArgumentParser("train") parser.add_argument( - "--config_suffix", type=str, help="Datetime suffix for json config files" -) -parser.add_argument( - "--json_config", + "--release_id", type=str, - help="Directory to write all the intermediate json configs", + help="The ID of the release triggering this pipeline run", ) parser.add_argument( "--model_name", @@ -53,14 +49,11 @@ args = parser.parse_args() -print("Argument 
1: %s" % args.config_suffix) -print("Argument 2: %s" % args.json_config) +print("Argument 1: %s" % args.release_id) +print("Argument 2: %s" % args.model_name) model_name = args.model_name - -if not (args.json_config is None): - os.makedirs(args.json_config, exist_ok=True) - print("%s created" % args.json_config) +release_id = args.release_id run = Run.get_context() exp = run.experiment @@ -102,12 +95,8 @@ print("Following files are uploaded ") print(run.get_file_names()) -run_id = {} -run_id["run_id"] = run.id -run_id["experiment_name"] = run.experiment.name -filename = "run_id_{}.json".format(args.config_suffix) -output_path = os.path.join(args.json_config, filename) -with open(output_path, "w") as outfile: - json.dump(run_id, outfile) +# Add properties to identify this specific training run +run.add_properties({"release_id": release_id, "run_type": "train"}) +print(f"added properties: {run.properties}") run.complete() diff --git a/docs/code_description.md b/docs/code_description.md index 8ae6774b..652fa93d 100644 --- a/docs/code_description.md +++ b/docs/code_description.md @@ -27,8 +27,8 @@ ### Code - `code/training/train.py` : a training step of an ML training pipeline. -- `code/evaluate/evaluate_model.py` : an evaluating step of an ML training pipeline. -- `code/evaluate/register_model.py` : registers a new trained model if evaluation shows the new model is more performant than the previous one. +- `code/evaluate/evaluate_model.py` : an evaluating step of an ML training pipeline which registers a new trained model if evaluation shows the new model is more performant than the previous one. +- `code/evaluate/register_model.py` : (LEGACY) registers a new trained model if evaluation shows the new model is more performant than the previous one. ### Scoring - code/scoring/score.py : a scoring script which is about to be packed into a Docker Image along with a model while being deployed to QA/Prod environment. diff --git a/docs/getting_started.md b/docs/getting_started.md index 519fa5ff..0c1b6b22 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -49,6 +49,7 @@ The variable group should contain the following variables: | SUBSCRIPTION_ID | | | TENANT_ID | | | TRAIN_SCRIPT_PATH | training/train.py | +| TRAINING_PIPELINE_NAME | training-pipeline | Mark **SP_APP_SECRET** variable as a secret one. @@ -88,6 +89,7 @@ Check out created resources in the [Azure Portal](portal.azure.com): Alternatively, you can also use a [cleaning pipeline](../environment_setup/iac-remove-environment.yml) that removes resources created for this project or you can just delete a resource group in the [Azure Portal](portal.azure.com). +Once this resource group is created, be sure that the Service Principal you have created has access to this resource group. ### 6. 
Set up Build Pipeline @@ -127,9 +129,11 @@ Rename the default "Stage 1" to **Invoke Training Pipeline** and make sure that Add a **Command Line Script** step, rename it to **Run Training Pipeline** with the following script: ```bash -docker run -v $(System.DefaultWorkingDirectory)/_ci-build/mlops-pipelines/ml_service/pipelines:/pipelines \ --w=/pipelines -e MODEL_NAME=$MODEL_NAME -e EXPERIMENT_NAME=$EXPERIMENT_NAME \ --e TENANT_ID=$TENANT_ID -e SP_APP_ID=$SP_APP_ID -e SP_APP_SECRET=$(SP_APP_SECRET) \ +docker run -v $(System.DefaultWorkingDirectory)/_ci-build/mlops-pipelines/ml_service/pipelines:/pipelines \ + -w=/pipelines -e MODEL_NAME=$MODEL_NAME -e EXPERIMENT_NAME=$EXPERIMENT_NAME \ + -e TENANT_ID=$TENANT_ID -e SP_APP_ID=$SP_APP_ID -e SP_APP_SECRET=$(SP_APP_SECRET) \ + -e SUBSCRIPTION_ID=$SUBSCRIPTION_ID -e RELEASE_RELEASEID=$RELEASE_RELEASEID \ + -e BUILD_BUILDID=$BUILD_BUILDID -e BASE_NAME=$BASE_NAME \ mcr.microsoft.com/mlops/python:latest python run_train_pipeline.py ``` diff --git a/environment_setup/arm-templates/cloud-environment.json b/environment_setup/arm-templates/cloud-environment.json index e01471d7..590a4aed 100644 --- a/environment_setup/arm-templates/cloud-environment.json +++ b/environment_setup/arm-templates/cloud-environment.json @@ -20,7 +20,8 @@ "southeastasia", "westcentralus", "westeurope", - "westus2" + "westus2", + "centralus" ], "metadata": { "description": "Specifies the location for all resources." diff --git a/environment_setup/iac-create-environment.yml b/environment_setup/iac-create-environment.yml index 6c2495cb..4a930c50 100644 --- a/environment_setup/iac-create-environment.yml +++ b/environment_setup/iac-create-environment.yml @@ -22,7 +22,7 @@ steps: location: $(LOCATION) templateLocation: 'Linked artifact' csmFile: '$(Build.SourcesDirectory)/environment_setup/arm-templates/cloud-environment.json' - overrideParameters: '-baseName $(BASE_NAME)' + overrideParameters: '-baseName $(BASE_NAME) -location $(LOCATION)' deploymentMode: 'Incremental' displayName: 'Deploy MLOps resources to Azure' diff --git a/environment_setup/requirements.txt b/environment_setup/requirements.txt index 8a086c4d..23880c0c 100644 --- a/environment_setup/requirements.txt +++ b/environment_setup/requirements.txt @@ -1,6 +1,5 @@ pytest==4.3.0 requests>=2.22 -azureml>=0.2 azureml-sdk>=1.0 python-dotenv>=0.10.3 flake8 diff --git a/ml_service/__init__.py b/ml_service/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ml_service/pipelines/build_train_pipeline.py b/ml_service/pipelines/build_train_pipeline.py index df5dcdcc..3214a57c 100644 --- a/ml_service/pipelines/build_train_pipeline.py +++ b/ml_service/pipelines/build_train_pipeline.py @@ -1,16 +1,14 @@ from azureml.pipeline.core.graph import PipelineParameter from azureml.pipeline.steps import PythonScriptStep -from azureml.pipeline.core import Pipeline, PipelineData +from azureml.pipeline.core import Pipeline # , PipelineData from azureml.core.runconfig import RunConfiguration, CondaDependencies -from azureml.core import Datastore -import datetime +# from azureml.core import Datastore import os import sys from dotenv import load_dotenv sys.path.append(os.path.abspath("./ml_service/util")) # NOQA: E402 from workspace import get_workspace from attach_compute import get_compute -import json def main(): @@ -24,10 +22,14 @@ def main(): sources_directory_train = os.environ.get("SOURCES_DIR_TRAIN") train_script_path = os.environ.get("TRAIN_SCRIPT_PATH") evaluate_script_path = os.environ.get("EVALUATE_SCRIPT_PATH") - 
register_script_path = os.environ.get("REGISTER_SCRIPT_PATH") - vm_size_cpu = os.environ.get("AML_COMPUTE_CLUSTER_CPU_SKU") - compute_name_cpu = os.environ.get("AML_COMPUTE_CLUSTER_NAME") + # register_script_path = os.environ.get("REGISTER_SCRIPT_PATH") + vm_size = os.environ.get("AML_COMPUTE_CLUSTER_CPU_SKU") + compute_name = os.environ.get("AML_COMPUTE_CLUSTER_NAME") model_name = os.environ.get("MODEL_NAME") + build_id = os.environ.get("BUILD_BUILDID") + pipeline_name = os.environ.get("TRAINING_PIPELINE_NAME") + + print(app_secret) # Get Azure machine learning workspace aml_workspace = get_workspace( @@ -40,12 +42,12 @@ def main(): print(aml_workspace) # Get Azure machine learning cluster - aml_compute_cpu = get_compute( + aml_compute = get_compute( aml_workspace, - compute_name_cpu, - vm_size_cpu) - if aml_compute_cpu is not None: - print(aml_compute_cpu) + compute_name, + vm_size) + if aml_compute is not None: + print(aml_compute) run_config = RunConfiguration(conda_dependencies=CondaDependencies.create( conda_packages=['numpy', 'pandas', @@ -58,23 +60,20 @@ def main(): model_name = PipelineParameter( name="model_name", default_value=model_name) - def_blob_store = Datastore(aml_workspace, "workspaceblobstore") - jsonconfigs = PipelineData("jsonconfigs", datastore=def_blob_store) - config_suffix = datetime.datetime.now().strftime("%Y%m%d%H") + release_id = PipelineParameter( + name="release_id", default_value="0" + ) train_step = PythonScriptStep( name="Train Model", script_name=train_script_path, - compute_target=aml_compute_cpu, + compute_target=aml_compute, source_directory=sources_directory_train, arguments=[ - "--config_suffix", config_suffix, - "--json_config", jsonconfigs, + "--release_id", release_id, "--model_name", model_name, ], runconfig=run_config, - # inputs=[jsonconfigs], - outputs=[jsonconfigs], allow_reuse=False, ) print("Step Train created") @@ -82,52 +81,49 @@ def main(): evaluate_step = PythonScriptStep( name="Evaluate Model ", script_name=evaluate_script_path, - compute_target=aml_compute_cpu, + compute_target=aml_compute, source_directory=sources_directory_train, arguments=[ - "--config_suffix", config_suffix, - "--json_config", jsonconfigs, + "--release_id", release_id, + "--model_name", model_name, ], runconfig=run_config, - inputs=[jsonconfigs], - # outputs=[jsonconfigs], allow_reuse=False, ) print("Step Evaluate created") - register_model_step = PythonScriptStep( - name="Register New Trained Model", - script_name=register_script_path, - compute_target=aml_compute_cpu, - source_directory=sources_directory_train, - arguments=[ - "--config_suffix", config_suffix, - "--json_config", jsonconfigs, - "--model_name", model_name, - ], - runconfig=run_config, - inputs=[jsonconfigs], - # outputs=[jsonconfigs], - allow_reuse=False, - ) - print("Step register model created") + # Currently, the Evaluate step will automatically register + # the model if it performs better. This step is based on a + # previous version of the repo which utilized JSON files to + # track evaluation results. 
+ + # register_model_step = PythonScriptStep( + # name="Register New Trained Model", + # script_name=register_script_path, + # compute_target=aml_compute, + # source_directory=sources_directory_train, + # arguments=[ + # "--release_id", release_id, + # "--model_name", model_name, + # ], + # runconfig=run_config, + # allow_reuse=False, + # ) + # print("Step register model created") evaluate_step.run_after(train_step) - register_model_step.run_after(evaluate_step) - steps = [register_model_step] + # register_model_step.run_after(evaluate_step) + steps = [evaluate_step] train_pipeline = Pipeline(workspace=aml_workspace, steps=steps) train_pipeline.validate() published_pipeline = train_pipeline.publish( - name="training-pipeline", - description="Model training/retraining pipeline" + name=pipeline_name, + description="Model training/retraining pipeline", + version=build_id ) - - train_pipeline_json = {} - train_pipeline_json["rest_endpoint"] = published_pipeline.endpoint - json_file_path = "ml_service/pipelines/train_pipeline.json" - with open(json_file_path, "w") as outfile: - json.dump(train_pipeline_json, outfile) + print(f'Published pipeline: {published_pipeline.name}') + print(f'for build {published_pipeline.version}') if __name__ == '__main__': diff --git a/ml_service/pipelines/run_train_pipeline.py b/ml_service/pipelines/run_train_pipeline.py index a2d1e68f..11252a88 100644 --- a/ml_service/pipelines/run_train_pipeline.py +++ b/ml_service/pipelines/run_train_pipeline.py @@ -1,41 +1,62 @@ -import sys import os -import json -import requests -from azure.common.credentials import ServicePrincipalCredentials - - -tenant_id = os.environ.get("TENANT_ID") -app_id = os.environ.get("SP_APP_ID") -app_secret = os.environ.get("SP_APP_SECRET") - -try: - with open("train_pipeline.json") as f: - train_pipeline_json = json.load(f) -except Exception: - print("No pipeline json found") - sys.exit(0) - -experiment_name = os.environ.get("EXPERIMENT_NAME") -model_name = os.environ.get("MODEL_NAME") - -credentials = ServicePrincipalCredentials( - client_id=app_id, - secret=app_secret, - tenant=tenant_id -) - -token = credentials.token['access_token'] -print("token", token) -auth_header = {"Authorization": "Bearer " + token} - -rest_endpoint = train_pipeline_json["rest_endpoint"] - -response = requests.post( - rest_endpoint, headers=auth_header, - json={"ExperimentName": experiment_name, - "ParameterAssignments": {"model_name": model_name}} -) - -run_id = response.json()["Id"] -print("Pipeline run initiated ", run_id) +from azureml.pipeline.core import PublishedPipeline +from azureml.core import Workspace +from azureml.core.authentication import ServicePrincipalAuthentication +from dotenv import load_dotenv + + +def main(): + load_dotenv() + workspace_name = os.environ.get("BASE_NAME")+"-AML-WS" + resource_group = os.environ.get("BASE_NAME")+"-AML-RG" + subscription_id = os.environ.get("SUBSCRIPTION_ID") + tenant_id = os.environ.get("TENANT_ID") + experiment_name = os.environ.get("EXPERIMENT_NAME") + model_name = os.environ.get("MODEL_NAME") + app_id = os.environ.get('SP_APP_ID') + app_secret = os.environ.get('SP_APP_SECRET') + release_id = os.environ.get('RELEASE_RELEASEID') + build_id = os.environ.get('BUILD_BUILDID') + + service_principal = ServicePrincipalAuthentication( + tenant_id=tenant_id, + service_principal_id=app_id, + service_principal_password=app_secret) + + aml_workspace = Workspace.get( + name=workspace_name, + subscription_id=subscription_id, + resource_group=resource_group, + 
auth=service_principal + ) + + # Find the pipeline that was published by the specified build ID + pipelines = PublishedPipeline.list(aml_workspace) + matched_pipes = [] + + for p in pipelines: + if p.version == build_id: + matched_pipes.append(p) + + if(len(matched_pipes) > 1): + published_pipeline = None + raise Exception(f"Multiple active pipelines are published for build {build_id}.") # NOQA: E501 + elif(len(matched_pipes) == 0): + published_pipeline = None + raise KeyError(f"Unable to find a published pipeline for this build {build_id}") # NOQA: E501 + else: + published_pipeline = matched_pipes[0] + + pipeline_parameters = {"model_name": model_name, "release_id": release_id} + + response = published_pipeline.submit( + aml_workspace, + experiment_name, + pipeline_parameters) + + run_id = response.id + print("Pipeline run initiated ", run_id) + + +if __name__ == "__main__": + main() diff --git a/ml_service/util/attach_compute.py b/ml_service/util/attach_compute.py index ff9d0ebd..7a34cd38 100644 --- a/ml_service/util/attach_compute.py +++ b/ml_service/util/attach_compute.py @@ -41,6 +41,7 @@ def get_compute( min_node_count=None, timeout_in_minutes=10) return compute_target - except ComputeTargetException: + except ComputeTargetException as e: + print(e) print('An error occurred trying to provision compute.') exit()
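Reviewer note (sketch, not part of the patch): the core change in this PR is replacing the JSON-file hand-off between the train and evaluate steps with Azure ML run properties keyed by `release_id`. Below is a minimal sketch of that round trip, using only the calls that appear in `train.py` and `evaluate_model.py` above, and assuming it executes inside a submitted AML pipeline step (so `Run.get_context()` returns a real run rather than an offline one); the literal values mirror the defaults in `.env.example`.

```python
import os
from azureml.core import Run

run = Run.get_context()
exp = run.experiment

release_id = "001"                           # supplied via --release_id
model_name = "sklearn_regression_model.pkl"  # supplied via --model_name

# Train step: tag the run so downstream steps can locate it without JSON files.
run.add_properties({"release_id": release_id, "run_type": "train"})

# Evaluate step: find the training run for the same release by its properties
# instead of reading run_id_<suffix>.json from a PipelineData folder.
train_runs = exp.get_runs(
    properties={"release_id": release_id, "run_type": "train"},
    include_children=True)
new_model_run = next(train_runs)

# Register the model straight from the training run's outputs, stamping the
# release_id onto the registered model for traceability.
new_model_run.register_model(
    model_name=model_name,
    model_path=os.path.join("outputs", model_name),
    properties={"release_id": release_id})
```

`get_runs` yields runs most-recent-first, so `next(train_runs)` picks the latest training run tagged with this release; the `release_id` PipelineParameter defined in `build_train_pipeline.py` is what feeds the same value into both steps.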