Skip to content

Commit

Permalink
Cleanup o16n notebook
Browse files Browse the repository at this point in the history
  • Loading branch information
loomlike committed Sep 20, 2020
1 parent ed707e3 commit 38c0c81
Showing 1 changed file with 53 additions and 113 deletions.
166 changes: 53 additions & 113 deletions examples/05_operationalize/als_movie_o16n.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -108,23 +108,6 @@
"print(\"Azure SDK version:\", azureml.core.VERSION)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In JupyterHub, environment variables defined in `.../etc/conda/activate.d` may not be activated. If so, uncomment and run the following cell to set PySpark environment variables. Make sure your conda environment path is `/anaconda/envs/reco_pyspark` or change the paths in the script."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# os.environ[\"PYSPARK_PYTHON\"]=\"/anaconda/envs/reco_pyspark/bin/python\"\n",
"# os.environ[\"PYSPARK_DRIVER_PYTHON\"]=\"/anaconda/envs/reco_pyspark/bin/python\""
]
},
{
"cell_type": "code",
"execution_count": 3,
Expand Down Expand Up @@ -878,10 +861,7 @@
"source": [
"### 3.2 Configure Azure Machine Learning\n",
"\n",
"Next, Azure Machine Learning Service is used to create a model scoring image and deploy it to Azure Kubernetes Service as a scalable containerized service. To achieve this, a **scoring script** and an **environment config** should be created. The following shows the content of the two files. \n",
"\n",
"In the scoring script, we make a call to Cosmos DB to lookup the top 10 movies to recommend given an input User ID.\n",
"When we want to get more than 10 movies, we use the registered ALS model to get the recommendation in real time."
"Next, Azure Machine Learning Service is used to create a model scoring image and deploy it to Azure Kubernetes Service as a scalable containerized service. To achieve this, a **scoring script** should be created. In the script, we make a call to Cosmos DB to lookup the top 10 movies to recommend given an input User ID."
]
},
{
Expand All @@ -892,57 +872,31 @@
"source": [
"score_sparkml = \"\"\"\n",
"import json\n",
"import os\n",
"import pydocumentdb.document_client as document_client\n",
"from pyspark.ml.recommendation import ALSModel\n",
"\n",
"\n",
"def init(local=False):\n",
" global client, collection, model\n",
" global client, collection\n",
" try:\n",
" # Pointer to the recommendation cache\n",
" client = document_client.DocumentClient(\n",
" '{endpoint}',\n",
" dict(masterKey='{key}')\n",
" )\n",
" client = document_client.DocumentClient('{endpoint}', dict(masterKey='{key}'))\n",
" collection = client.ReadCollection(collection_link='dbs/{database}/colls/{collection}')\n",
" \n",
" # Load ALS model\n",
" model_path = os.path.join(os.getenv('AZUREML_MODEL_DIR'), '{model_name}')\n",
" model = ALSModel.load(model_path)\n",
" except Exception as e:\n",
" collection = e\n",
"\n",
"def run(input_json):\n",
" try:\n",
" # Query them in SQL\n",
" js = json.loads(json.loads(input_json)[0])\n",
" id = str(js['id'])\n",
" k = js.get('k', 10)\n",
" if k > 10:\n",
" # Use the model to get recommendation\n",
" recs = model.recommendForAllUsers(k)\n",
" recs_topk = recs.withColumn('id', recs['{userCol}'].cast(\"string\")).select(\n",
" 'id', \"recommendations.\" + '{itemCol}'\n",
" )\n",
" result = recs_topk[recs_topk.id==id].collect()[0].asDict()\n",
" else:\n",
" # Use the cached recommendation results \n",
" query = dict(query='SELECT * FROM c WHERE c.id = \"' + id + '\"')\n",
" options = dict(partitionKey=id)\n",
" document_link = 'dbs/{database}/colls/{collection}/docs/' + id\n",
" result = client.ReadDocument(document_link, options)\n",
" result['{itemCol}'] = result['{itemCol}'][:k]\n",
" id = str(json.loads(json.loads(input_json)[0])['id'])\n",
" query = dict(query='SELECT * FROM c WHERE c.id = \"' + id +'\"')\n",
" options = dict(partitionKey=str(id))\n",
" document_link = 'dbs/{database}/colls/{collection}/docs/' + id\n",
" result = client.ReadDocument(document_link, options); \n",
" except Exception as e:\n",
" result = str(e)\n",
" return json.dumps(str(result))\n",
"\"\"\".format(key=dbsecrets['Masterkey'], \n",
" endpoint=dbsecrets['Endpoint'], \n",
" database=dbsecrets['Database'], \n",
" collection=dbsecrets['Collection'],\n",
" userCol=userCol,\n",
" itemCol=itemCol,\n",
" model_name=model_name)\n",
" collection=dbsecrets['Collection'])\n",
"\n",
"# test validity of python string\n",
"exec(score_sparkml)\n",
Expand Down Expand Up @@ -1192,68 +1146,54 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"When we want to get more recommendation results than `TOP_K`, the service uses the deployed model directly instead of querying the DB and returns the results."
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{\n",
" \"id\": \"496\",\n",
" \"MovieId\": [\n",
" 320,\n",
" 1589,\n",
" 262,\n",
" 1344,\n",
" 958,\n",
" 889,\n",
" 1368,\n",
" 645,\n",
" 919,\n",
" 1137,\n",
" 137,\n",
" 1449,\n",
" 1056,\n",
" 844,\n",
" 1143,\n",
" 1495,\n",
" 1136,\n",
" 92,\n",
" 45,\n",
" 529\n",
" ]\n",
"}\n",
"Full run took 3.93 seconds\n"
]
}
],
"source": [
"# Get a recommendation of 20 movies\n",
"input_data = '[\"{\\\\\"id\\\\\":\\\\\"496\\\\\",\\\\\"k\\\\\":20}\"]'.encode()\n",
"## Appendix - Realtime scoring with AzureML\n",
"\n",
"req = urllib.request.Request(scoring_url, data=input_data)\n",
"req.add_header(\"Authorization\",\"Bearer {}\".format(service_key))\n",
"req.add_header(\"Content-Type\",\"application/json\")\n",
"In the previous cells, we utilized Cosmos DB to cache the recommendation results for realtime serving. Alternatively, we can generate recommendation results on demand by using the model we deployed. Following scripts load the registered model and use it for recommendation:\n",
"\n",
"with Timer() as t: \n",
" with urllib.request.urlopen(req) as result:\n",
" res = result.read()\n",
" resj = json.loads(\n",
" # Cleanup to parse into a json object\n",
" res.decode(\"utf-8\")\n",
" .replace(\"\\\\\", \"\")\n",
" .replace('\"', \"\")\n",
" .replace(\"'\", '\"')\n",
"* *score_sparkml.py*\n",
" ```\n",
" import json\n",
" import os\n",
" from pyspark.ml.recommendation import ALSModel\n",
"\n",
" # Note, set `model_name`, `userCol`, and `itemCol` defined earlier.\n",
" model_name = \"mvl-als-reco.mml\"\n",
" userCol = \"UserId\"\n",
" itemCol = \"MovieId\"\n",
"\n",
" def init(local=False):\n",
" global model\n",
"\n",
" # Load ALS model.\n",
" model_path = os.path.join(os.getenv('AZUREML_MODEL_DIR'), model_name)\n",
" model = ALSModel.load(model_path)\n",
"\n",
" def run(input_json):\n",
" js = json.loads(json.loads(input_json)[0])\n",
" id = str(js['id'])\n",
" k = js.get('k', 10)\n",
"\n",
" # Use the model to get recommendation.\n",
" recs = model.recommendForAllUsers(k)\n",
" recs_topk = recs.withColumn('id', recs[userCol].cast(\"string\")).select(\n",
" 'id', \"recommendations.\" + itemCol\n",
" )\n",
" print(json.dumps(resj, indent=4))\n",
" result = recs_topk[recs_topk.id==id].collect()[0].asDict()\n",
"\n",
" return json.dumps(str(result))\n",
" ```\n",
"\n",
"* Call the AKS model service\n",
" ```\n",
" # Get a recommendation of 10 movies\n",
" input_data = '[\"{\\\\\"id\\\\\":\\\\\"496\\\\\",\\\\\"k\\\\\":10}\"]'.encode()\n",
"\n",
" req = urllib.request.Request(scoring_url, data=input_data)\n",
" req.add_header(\"Authorization\",\"Bearer {}\".format(service_key))\n",
" req.add_header(\"Content-Type\",\"application/json\")\n",
" \n",
"print(\"Full run took %.2f seconds\" % t.interval)"
" ...\n",
" ```"
]
},
{
Expand Down

0 comments on commit 38c0c81

Please sign in to comment.