Skip to content

Commit

Permalink
WIP add basics of RaySGD and MLflow
Browse files Browse the repository at this point in the history
  • Loading branch information
adbreind committed Feb 24, 2021
1 parent 8836483 commit ce81c8d
Show file tree
Hide file tree
Showing 4 changed files with 357 additions and 5 deletions.
2 changes: 1 addition & 1 deletion 04-Lab-Modeling.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.0"
"version": "3.7.0"
}
},
"nbformat": 4,
Expand Down
6 changes: 2 additions & 4 deletions 07-Scoring-Orchestration.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@
"\n",
"Naturally, a project's own pitch is not a neutral argument to use that tool -- and I'm not suggesting you take it as such.\n",
"\n",
"It is, however, well worth consideration in your system design.\n",
"\n",
"## Q&A"
"It is, however, well worth consideration in your system design."
]
},
{
Expand All @@ -113,7 +111,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.0"
"version": "3.7.0"
}
},
"nbformat": 4,
Expand Down
352 changes: 352 additions & 0 deletions 08-RaySGD-MLflow.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,352 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "dominican-cradle",
"metadata": {},
"outputs": [],
"source": [
"import tensorflow as tf\n",
"from tensorflow.keras.models import Sequential\n",
"from tensorflow.keras.layers import Dense\n",
"import numpy as np\n",
"\n",
"import ray\n",
"from ray import tune\n",
"from ray.util.sgd.tf.tf_trainer import TFTrainer, TFTrainable\n",
"\n",
"NUM_TRAIN_SAMPLES = 1000\n",
"NUM_TEST_SAMPLES = 400\n",
"\n",
"def create_config(batch_size):\n",
" return {\n",
" # todo: batch size needs to scale with # of workers\n",
" \"batch_size\": batch_size,\n",
" \"fit_config\": {\n",
" \"steps_per_epoch\": NUM_TRAIN_SAMPLES // batch_size\n",
" },\n",
" \"evaluate_config\": {\n",
" \"steps\": NUM_TEST_SAMPLES // batch_size,\n",
" }\n",
" }\n",
"\n",
"\n",
"def linear_dataset(a=2, size=1000):\n",
" x = np.random.rand(size)\n",
" y = x / 2\n",
"\n",
" x = x.reshape((-1, 1))\n",
" y = y.reshape((-1, 1))\n",
"\n",
" return x, y\n",
"\n",
"def simple_dataset(config):\n",
" batch_size = config[\"batch_size\"]\n",
" x_train, y_train = linear_dataset(size=NUM_TRAIN_SAMPLES)\n",
" x_test, y_test = linear_dataset(size=NUM_TEST_SAMPLES)\n",
"\n",
" train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))\n",
" test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))\n",
" train_dataset = train_dataset.shuffle(NUM_TRAIN_SAMPLES).repeat().batch(\n",
" batch_size)\n",
" test_dataset = test_dataset.repeat().batch(batch_size)\n",
"\n",
" return train_dataset, test_dataset\n",
"\n",
"\n",
"def simple_model(config):\n",
" model = Sequential([Dense(10, input_shape=(1, )), Dense(1)])\n",
"\n",
" model.compile(\n",
" optimizer=\"sgd\",\n",
" loss=\"mean_squared_error\",\n",
" metrics=[\"mean_squared_error\"])\n",
"\n",
" return model\n",
"\n",
"\n",
"def train_example(num_replicas=1, batch_size=128, use_gpu=False):\n",
" trainer = TFTrainer(\n",
" model_creator=simple_model,\n",
" data_creator=simple_dataset,\n",
" num_replicas=num_replicas,\n",
" use_gpu=use_gpu,\n",
" verbose=True,\n",
" config=create_config(batch_size))\n",
"\n",
" # model baseline performance\n",
" start_stats = trainer.validate()\n",
" print(start_stats)\n",
"\n",
" # train for 2 epochs\n",
" trainer.train()\n",
" trainer.train()\n",
"\n",
" # model performance after training (should improve)\n",
" end_stats = trainer.validate()\n",
" print(end_stats)\n",
"\n",
" # sanity check that training worked\n",
" dloss = end_stats[\"validation_loss\"] - start_stats[\"validation_loss\"]\n",
" dmse = (end_stats[\"validation_mean_squared_error\"] -\n",
" start_stats[\"validation_mean_squared_error\"])\n",
" print(f\"dLoss: {dloss}, dMSE: {dmse}\")\n",
"\n",
" if dloss > 0 or dmse > 0:\n",
" print(\"training sanity check failed. loss increased!\")\n",
" else:\n",
" print(\"success!\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "reverse-official",
"metadata": {},
"outputs": [],
"source": [
"ray.init()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "authentic-spirituality",
"metadata": {},
"outputs": [],
"source": [
"train_example()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ongoing-catch",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "victorian-relaxation",
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"from sklearn.model_selection import train_test_split\n",
"\n",
"def diamonds_dataset(config):\n",
" batch_size = config[\"batch_size\"]\n",
" df = pd.read_csv('data/diamonds.csv')\n",
" df.drop(df.columns[0], axis=1, inplace=True)\n",
" df = pd.get_dummies(df, prefix=['cut_', 'color_', 'clarity_'])\n",
" y = df.price.to_numpy()\n",
" X = df.drop(columns=['price']).to_numpy()\n",
" train_size = 40_000\n",
" X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=train_size)\n",
" \n",
" train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))\n",
" test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test))\n",
" train_dataset = train_dataset.shuffle(len(X_train)).repeat().batch(\n",
" batch_size)\n",
" test_dataset = test_dataset.repeat().batch(batch_size)\n",
"\n",
" return train_dataset, test_dataset"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "recorded-folder",
"metadata": {},
"outputs": [],
"source": [
"def diamonds_simple_model(config):\n",
" model = Sequential([Dense(30, input_shape=(26, ), activation='relu'), Dense(1)])\n",
"\n",
" model.compile(\n",
" optimizer=\"adam\",\n",
" loss=\"mean_squared_error\",\n",
" metrics=[\"mean_squared_error\"])\n",
"\n",
" return model"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "controlled-pierce",
"metadata": {},
"outputs": [],
"source": [
"def create_diamonds_config(batch_size):\n",
" return {\n",
" \"batch_size\": batch_size,\n",
" \"fit_config\": {\n",
" \"steps_per_epoch\": 40000 // batch_size\n",
" },\n",
" \"evaluate_config\": {\n",
" \"steps\": 13940 // batch_size,\n",
" }\n",
" }"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "present-welsh",
"metadata": {},
"outputs": [],
"source": [
"def train_diamonds(num_replicas=1, batch_size=128, use_gpu=False):\n",
" trainer = TFTrainer(\n",
" model_creator=diamonds_simple_model,\n",
" data_creator=diamonds_dataset,\n",
" num_replicas=num_replicas,\n",
" use_gpu=use_gpu,\n",
" verbose=False,\n",
" config=create_diamonds_config(batch_size))\n",
"\n",
" # model baseline performance\n",
" start_stats = trainer.validate()\n",
" print(start_stats)\n",
"\n",
" for i in range(32):\n",
" trainer.train()\n",
"\n",
" # model performance after training (should improve)\n",
" end_stats = trainer.validate()\n",
" print(end_stats)\n",
"\n",
" # sanity check that training worked\n",
" dloss = end_stats[\"validation_loss\"] - start_stats[\"validation_loss\"]\n",
" dmse = (end_stats[\"validation_mean_squared_error\"] -\n",
" start_stats[\"validation_mean_squared_error\"])\n",
" print(f\"dLoss: {dloss}, dMSE: {dmse}\")\n",
"\n",
" if dloss > 0 or dmse > 0:\n",
" print(\"training sanity check failed. loss increased!\")\n",
" else:\n",
" print(\"success!\")\n",
" \n",
"train_diamonds()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "automated-disposition",
"metadata": {},
"outputs": [],
"source": [
"import mlflow\n",
"\n",
"mlflow.create_experiment(\"Diamonds RaySGD\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "innovative-granny",
"metadata": {},
"outputs": [],
"source": [
"from mlflow.tracking import MlflowClient\n",
"client = MlflowClient()\n",
"experiments = client.list_experiments() # returns a list of mlflow.entities.Experiment\n",
"experiments"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "copyrighted-process",
"metadata": {},
"outputs": [],
"source": [
"run = client.create_run(experiments[0].experiment_id) # returns mlflow.entities.Run\n",
"client.log_param(run.info.run_id, \"hello\", \"world\")\n",
"client.set_terminated(run.info.run_id)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "unknown-control",
"metadata": {},
"outputs": [],
"source": [
"def train_diamonds_mlflow(num_replicas=1, batch_size=128, use_gpu=False):\n",
" trainer = TFTrainer(\n",
" model_creator=diamonds_simple_model,\n",
" data_creator=diamonds_dataset,\n",
" num_replicas=num_replicas,\n",
" use_gpu=use_gpu,\n",
" verbose=False,\n",
" config=create_diamonds_config(batch_size))\n",
"\n",
" # model baseline performance\n",
" start_stats = trainer.validate()\n",
" print(start_stats)\n",
"\n",
" ml_run = client.create_run(experiments[0].experiment_id)\n",
"\n",
" for i in range(32):\n",
" train_stats = trainer.train()\n",
" val_stats = trainer.validate() \n",
" client.log_metric(ml_run.info.run_id, \"validation_loss\", val_stats[\"validation_loss\"]) \n",
" client.log_metric(ml_run.info.run_id, \"training_loss\", train_stats[\"train_loss\"])\n",
" \n",
" client.set_terminated(ml_run.info.run_id)\n",
"\n",
" # model performance after training (should improve)\n",
" end_stats = trainer.validate()\n",
" print(end_stats)\n",
"\n",
" # sanity check that training worked\n",
" dloss = end_stats[\"validation_loss\"] - start_stats[\"validation_loss\"]\n",
" dmse = (end_stats[\"validation_mean_squared_error\"] -\n",
" start_stats[\"validation_mean_squared_error\"])\n",
" print(f\"dLoss: {dloss}, dMSE: {dmse}\")\n",
"\n",
" if dloss > 0 or dmse > 0:\n",
" print(\"training sanity check failed. loss increased!\")\n",
" else:\n",
" print(\"success!\")\n",
" \n",
"train_diamonds_mlflow()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "female-newsletter",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.0"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
2 changes: 2 additions & 0 deletions binder/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ dependencies:
- ray[rllib]
- tensorflow
- pyspark
- mlflow

0 comments on commit ce81c8d

Please sign in to comment.