Skip to content

Commit

Permalink
Merge pull request #90 from uc-cdis/feat/aggmds-vlmd
Browse files Browse the repository at this point in the history
HP-1044 Feat/aggmds vlmd
  • Loading branch information
mfshao authored Feb 11, 2023
2 parents fe7241a + 90e605a commit 996cd30
Show file tree
Hide file tree
Showing 13 changed files with 325 additions and 132 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,4 @@ postgres-data/

# VSCode
.vscode/
.dccache
26 changes: 19 additions & 7 deletions docs/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ components:
type: http
info:
title: Framework Services Object Management Service
version: 2.0.1
version: 3.0.0
openapi: 3.0.2
paths:
/_status:
Expand Down Expand Up @@ -126,8 +126,14 @@ paths:
get:
description: "Returns status and configuration information about aggregate metadata\
\ service.\n\nReturn configuration information. Currently supports only 1\
\ information type:\n**schema**\n\nExample:\n\n {\n schema: {\n \
\ ...\n ...\n }\n }"
\ information type:\n**schema**\n\nExample:\n\n{\n \"__manifest\":{\n \
\ \"type\":\"array\",\n \"properties\":{\n \"file_name\"\
:{\n \"type\":\"string\",\n \"description\"\
:\"\"\n },\n \"file_size\":{\n \"type\"\
:\"integer\",\n \"description\":\"\"\n }\n \
\ },\n \"description\":\"\",\n \"default\":[\n\n ]\n\
\ },\n \"commons_url\":{\n \"type\":\"string\",\n \"description\"\
:\"\"\n },\n ...\n}"
operationId: get_commons_info_aggregate_info__what__get
parameters:
- in: path
Expand Down Expand Up @@ -238,8 +244,9 @@ paths:
- Aggregate
/aggregate/metadata/guid/{guid}:
get:
description: "Returns a metadata record by GUID\n\nExample:\n\n { id2: {\
\ name: \"bear\" } }"
description: "Returns a metadata record by GUID\n\nExample:\n\n {\n \
\ \"gen3_discovery\": {\n \"name\": \"cat\",\n \"\
type\": \"study\",\n ...\n }\n }"
operationId: get_aggregate_metadata_guid_aggregate_metadata_guid__guid__get
parameters:
- in: path
Expand Down Expand Up @@ -267,8 +274,13 @@ paths:
get:
description: "Get all metadata records from a commons by name\n\nReturns an array\
\ containing all the metadata entries for a single commons.\nThere are no\
\ limit/offset parameters.\n\nExample:\n\n [ { id2: { name: \"bear\" }\
\ } , { id3: { name: \"cat\" } }]"
\ limit/offset parameters.\n\nExample:\n\n [\n {\n \"\
gen3_discovery\": {\n \"name\": \"bear\",\n \
\ \"type\": \"study\",\n ...\n },\n \"\
data_dictionaries\": {\n ...\n }\n },\n \
\ {\n \"gen3_discovery\": {\n \"name\": \"\
cat\",\n \"type\": \"study\",\n ...\n \
\ }\n },\n ...\n ]"
operationId: get_aggregate_metadata_for_commons_aggregate_metadata__name__get
parameters:
- in: path
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "mds"
version = "2.0.1"
version = "3.0.0"
description = "Metadata Service"
authors = ["CTDS UChicago <cdis@uchicago.edu>"]
license = "Apache-2.0"
Expand Down
4 changes: 4 additions & 0 deletions src/mds/agg_mds/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,7 @@ def normalizeToGen3MDSFields(self, data, **kwargs) -> Dict[str, Any]:
mappings = kwargs.get("mappings", None)
config = kwargs.get("config", {})
study_field = config.get("study_field", "gen3_discovery")
data_dict_field = config.get("data_dict_field", None)
keepOriginalFields = kwargs.get("keepOriginalFields", True)
globalFieldFilters = kwargs.get("globalFieldFilters", [])
schema = kwargs.get("schema", {})
Expand All @@ -1021,6 +1022,9 @@ def normalizeToGen3MDSFields(self, data, **kwargs) -> Dict[str, Any]:
"_guid_type": "discovery_metadata",
"gen3_discovery": item,
}
# for VLMD, bring it into AggMDS records
if data_dict_field is not None and data_dict_field in record:
results[guid][data_dict_field] = record[data_dict_field]

perItemValues = kwargs.get("perItemValues", None)
if perItemValues is not None:
Expand Down
2 changes: 2 additions & 0 deletions src/mds/agg_mds/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ class MDSInstance:
] = None
study_data_field: str = "gen3_discovery"
guid_type: str = "discovery_metadata"
data_dict_field: Optional[str] = None
select_field: Optional[Dict[str, str]] = None

def __post_init__(self):
Expand All @@ -219,6 +220,7 @@ class AdapterMDSInstance:
field_mappings: Optional[Dict[str, Any]] = None
per_item_values: Optional[Dict[str, Any]] = None
study_data_field: str = "gen3_discovery"
data_dict_field: Optional[str] = None
keep_original_fields: bool = True
global_field_filters: List[str] = field(default_factory=list)
commons_name: Optional[str] = None
Expand Down
87 changes: 65 additions & 22 deletions src/mds/agg_mds/datastore/elasticsearch_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@
from typing import Any, List, Dict, Union, Optional, Tuple
from math import ceil
from mds import logger
from mds.config import AGG_MDS_NAMESPACE, ES_RETRY_LIMIT, ES_RETRY_INTERVAL
from mds.config import (
AGG_MDS_NAMESPACE,
ES_RETRY_LIMIT,
ES_RETRY_INTERVAL,
AGG_MDS_DEFAULT_STUDY_DATA_FIELD,
AGG_MDS_DEFAULT_DATA_DICT_FIELD,
)

AGG_MDS_INDEX = f"{AGG_MDS_NAMESPACE}-commons-index"
AGG_MDS_TYPE = "commons"
Expand Down Expand Up @@ -189,7 +195,6 @@ async def update_metadata(
guid_arr: List[str],
tags: Dict[str, List[str]],
info: Dict[str, str],
study_data_field: str,
use_temp_index: bool = False,
):
index_to_update = AGG_MDS_INFO_INDEX_TEMP if use_temp_index else AGG_MDS_INFO_INDEX
Expand All @@ -201,10 +206,16 @@ async def update_metadata(
)

index_to_update = AGG_MDS_INDEX_TEMP if use_temp_index else AGG_MDS_INDEX
for doc in data:
key = list(doc.keys())[0]
for d in data:
key = list(d.keys())[0]
# Flatten out this structure
doc = doc[key][study_data_field]
doc = {
AGG_MDS_DEFAULT_STUDY_DATA_FIELD: d[key][AGG_MDS_DEFAULT_STUDY_DATA_FIELD]
}
if AGG_MDS_DEFAULT_DATA_DICT_FIELD in d[key]:
doc[AGG_MDS_DEFAULT_DATA_DICT_FIELD] = d[key][
AGG_MDS_DEFAULT_DATA_DICT_FIELD
]

try:
elastic_search_client.index(
Expand Down Expand Up @@ -249,12 +260,26 @@ async def get_commons():
index=AGG_MDS_INDEX,
body={
"size": 0,
"aggs": {"commons_names": {"terms": {"field": "commons_name.keyword"}}},
"aggs": {
AGG_MDS_DEFAULT_STUDY_DATA_FIELD: {
"nested": {"path": AGG_MDS_DEFAULT_STUDY_DATA_FIELD},
"aggs": {
"commons_names": {
"terms": {
"field": f"{AGG_MDS_DEFAULT_STUDY_DATA_FIELD}.commons_name.keyword"
}
}
},
}
},
},
)
return {
"commons": [
x["key"] for x in res["aggregations"]["commons_names"]["buckets"]
x["key"]
for x in res["aggregations"][AGG_MDS_DEFAULT_STUDY_DATA_FIELD][
"commons_names"
]["buckets"]
]
}
except Exception as error:
Expand Down Expand Up @@ -282,9 +307,12 @@ def process_record(record: dict, counts: Optional[List[str]]) -> Tuple[str, dict
"""
_id = record["_id"]
normalized = record["_source"]
for c in counts:
if c in normalized:
normalized[c] = count(normalized[c])
if AGG_MDS_DEFAULT_STUDY_DATA_FIELD in normalized:
for c in counts:
if c in normalized[AGG_MDS_DEFAULT_STUDY_DATA_FIELD]:
normalized[AGG_MDS_DEFAULT_STUDY_DATA_FIELD][c] = count(
normalized[AGG_MDS_DEFAULT_STUDY_DATA_FIELD][c]
)
return _id, normalized


Expand All @@ -295,11 +323,11 @@ async def get_all_metadata(limit, offset, counts: Optional[str] = None, flatten=
counts: converts the count of the entry[count] if it is a dict or array
returns:
flattend == true
flattened == true
results : MDS results as a dict
paging info
flattend == false
flattened == false
results : {
commonsA: metadata
commonsB: metadata
Expand Down Expand Up @@ -345,7 +373,7 @@ async def get_all_metadata(limit, offset, counts: Optional[str] = None, flatten=
flat = []
for record in res["hits"]["hits"]:
rid, normalized = process_record(record, toReduce)
flat.append({rid: {"gen3_discovery": normalized}})
flat.append({rid: normalized})
return {
"results": flat,
"pagination": {
Expand All @@ -367,12 +395,12 @@ async def get_all_metadata(limit, offset, counts: Optional[str] = None, flatten=
}
for record in res["hits"]["hits"]:
rid, normalized = process_record(record, toReduce)
commons_name = normalized["commons_name"]
commons_name = normalized[AGG_MDS_DEFAULT_STUDY_DATA_FIELD][
"commons_name"
]
if commons_name not in byCommons["results"]:
byCommons["results"][commons_name] = []
byCommons["results"][commons_name].append(
{rid: {"gen3_discovery": normalized}}
)
byCommons["results"][commons_name].append({rid: normalized})

return byCommons
except Exception as error:
Expand All @@ -384,7 +412,18 @@ async def get_all_named_commons_metadata(name):
try:
res = elastic_search_client.search(
index=AGG_MDS_INDEX,
body={"query": {"match": {"commons_name.keyword": name}}},
body={
"query": {
"nested": {
"path": AGG_MDS_DEFAULT_STUDY_DATA_FIELD,
"query": {
"match": {
f"{AGG_MDS_DEFAULT_STUDY_DATA_FIELD}.commons_name.keyword": "HEAL"
}
},
}
}
},
)
return [x["_source"] for x in res["hits"]["hits"]]
except Exception as error:
Expand All @@ -400,14 +439,16 @@ async def metadata_tags():
"size": 0,
"aggs": {
"tags": {
"nested": {"path": "tags"},
"nested": {"path": f"{AGG_MDS_DEFAULT_STUDY_DATA_FIELD}.tags"},
"aggs": {
"categories": {
"terms": {"field": "tags.category.keyword"},
"terms": {
"field": f"{AGG_MDS_DEFAULT_STUDY_DATA_FIELD}.tags.category.keyword"
},
"aggs": {
"name": {
"terms": {
"field": "tags.name.keyword",
"field": f"{AGG_MDS_DEFAULT_STUDY_DATA_FIELD}.tags.name.keyword"
}
}
},
Expand Down Expand Up @@ -459,7 +500,9 @@ async def get_aggregations(name):
"query": {
"constant_score": {
"filter": {
"match": {"commons_name": name},
"match": {
f"{AGG_MDS_DEFAULT_STUDY_DATA_FIELD}.commons_name": name
},
}
}
},
Expand Down
57 changes: 50 additions & 7 deletions src/mds/agg_mds/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,30 @@ async def get_commons_info(what: str):
Example:
{
schema: {
...
...
{
"__manifest":{
"type":"array",
"properties":{
"file_name":{
"type":"string",
"description":""
},
"file_size":{
"type":"integer",
"description":""
}
}
},
"description":"",
"default":[
]
},
"commons_url":{
"type":"string",
"description":""
},
...
}
"""
res = await datastore.get_commons_attribute(what)
Expand Down Expand Up @@ -136,7 +154,26 @@ async def get_aggregate_metadata_for_commons(
Example:
[ { id2: { name: "bear" } } , { id3: { name: "cat" } }]
[
{
"gen3_discovery": {
"name": "bear",
"type": "study",
...
},
"data_dictionaries": {
...
}
},
{
"gen3_discovery": {
"name": "cat",
"type": "study",
...
}
},
...
]
"""
res = await datastore.get_all_named_commons_metadata(name)
Expand Down Expand Up @@ -211,7 +248,13 @@ async def get_aggregate_metadata_guid(guid: str):
Example:
{ id2: { name: "bear" } }
{
"gen3_discovery": {
"name": "cat",
"type": "study",
...
}
}
"""
res = await datastore.get_by_guid(guid)
if res:
Expand Down
6 changes: 6 additions & 0 deletions src/mds/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ def __init__(self, value):
# URL prefix for the service: "/" when DEBUG is truthy, "/mds" otherwise,
# unless URL_PREFIX is set explicitly.
URL_PREFIX = config("URL_PREFIX", default="/" if DEBUG else "/mds")
# Boolean flag, False by default.
# NOTE(review): presumably switches the aggregate MDS on/off — confirm at the use sites.
USE_AGG_MDS = config("USE_AGG_MDS", cast=bool, default=False)
# Namespace string; the Elasticsearch DAO prefixes its index names with it
# (e.g. f"{AGG_MDS_NAMESPACE}-commons-index").
AGG_MDS_NAMESPACE = config("AGG_MDS_NAMESPACE", default="default_namespace")
# Key under which a record's study metadata is stored in the aggregate MDS
# documents; the Elasticsearch DAO uses it both when indexing and as the
# field-path prefix in its queries. Defaults to "gen3_discovery".
AGG_MDS_DEFAULT_STUDY_DATA_FIELD = config(
"AGG_MDS_DEFAULT_STUDY_DATA_FIELD", cast=str, default="gen3_discovery"
)
# Key under which a record's data-dictionary payload is stored, when the
# record has one; the Elasticsearch DAO copies it into the indexed document
# only if present. Defaults to "data_dictionaries".
AGG_MDS_DEFAULT_DATA_DICT_FIELD = config(
"AGG_MDS_DEFAULT_DATA_DICT_FIELD", cast=str, default="data_dictionaries"
)
# Elasticsearch endpoint; note the setting is read from GEN3_ES_ENDPOINT,
# not ES_ENDPOINT. Defaults to a local instance on port 9200.
ES_ENDPOINT = config("GEN3_ES_ENDPOINT", default="http://localhost:9200")

# Database
Expand Down
Loading

0 comments on commit 996cd30

Please sign in to comment.