Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HP-1044 Feat/aggmds vlmd #90

Merged
merged 17 commits into from
Feb 11, 2023
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,4 @@ postgres-data/

# VSCode
.vscode/
.dccache
26 changes: 19 additions & 7 deletions docs/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ components:
type: http
info:
title: Framework Services Object Management Service
version: 2.0.1
version: 3.0.0
openapi: 3.0.2
paths:
/_status:
Expand Down Expand Up @@ -126,8 +126,14 @@ paths:
get:
description: "Returns status and configuration information about aggregate metadata\
\ service.\n\nReturn configuration information. Currently supports only 1\
\ information type:\n**schema**\n\nExample:\n\n {\n schema: {\n \
\ ...\n ...\n }\n }"
\ information type:\n**schema**\n\nExample:\n\n{\n \"__manifest\":{\n \
\ \"type\":\"array\",\n \"properties\":{\n \"file_name\"\
:{\n \"type\":\"string\",\n \"description\"\
:\"\"\n },\n \"file_size\":{\n \"type\"\
:\"integer\",\n \"description\":\"\"\n }\n \
\ },\n \"description\":\"\",\n \"default\":[\n\n ]\n\
\ },\n \"commons_url\":{\n \"type\":\"string\",\n \"description\"\
:\"\"\n },\n ...\n}"
operationId: get_commons_info_aggregate_info__what__get
parameters:
- in: path
Expand Down Expand Up @@ -238,8 +244,9 @@ paths:
- Aggregate
/aggregate/metadata/guid/{guid}:
get:
description: "Returns a metadata record by GUID\n\nExample:\n\n { id2: {\
\ name: \"bear\" } }"
description: "Returns a metadata record by GUID\n\nExample:\n\n {\n \
\ \"gen3_discovery\": {\n \"name\": \"cat\",\n \"\
type\": \"study\",\n ...\n }\n }"
operationId: get_aggregate_metadata_guid_aggregate_metadata_guid__guid__get
parameters:
- in: path
Expand Down Expand Up @@ -267,8 +274,13 @@ paths:
get:
description: "et all metadata records from a commons by name\n\nReturns an array\
\ containing all the metadata entries for a single commons.\nThere are no\
\ limit/offset parameters.\n\nExample:\n\n [ { id2: { name: \"bear\" }\
\ } , { id3: { name: \"cat\" } }]"
\ limit/offset parameters.\n\nExample:\n\n [\n {\n \"\
gen3_discovery\": {\n \"name\": \"bear\",\n \
\ \"type\": \"study\",\n ...\n },\n \"\
data_dictionaries\": {\n ...\n }\n },\n \
\ {\n \"gen3_discovery\": {\n \"name\": \"\
cat\",\n \"type\": \"study\",\n ...\n \
\ }\n },\n ...\n ]"
operationId: get_aggregate_metadata_for_commons_aggregate_metadata__name__get
parameters:
- in: path
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "mds"
version = "2.0.1"
version = "3.0.0"
description = "Metadata Service"
authors = ["CTDS UChicago <cdis@uchicago.edu>"]
license = "Apache-2.0"
Expand Down
4 changes: 4 additions & 0 deletions src/mds/agg_mds/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,7 @@ def normalizeToGen3MDSFields(self, data, **kwargs) -> Dict[str, Any]:
mappings = kwargs.get("mappings", None)
config = kwargs.get("config", {})
study_field = config.get("study_field", "gen3_discovery")
data_dict_field = config.get("data_dict_field", None)
keepOriginalFields = kwargs.get("keepOriginalFields", True)
globalFieldFilters = kwargs.get("globalFieldFilters", [])
schema = kwargs.get("schema", {})
Expand All @@ -1021,6 +1022,9 @@ def normalizeToGen3MDSFields(self, data, **kwargs) -> Dict[str, Any]:
"_guid_type": "discovery_metadata",
"gen3_discovery": item,
}
# for VLMD, bring it into AggMDS records
if data_dict_field is not None and data_dict_field in record:
results[guid][data_dict_field] = record[data_dict_field]

perItemValues = kwargs.get("perItemValues", None)
if perItemValues is not None:
Expand Down
2 changes: 2 additions & 0 deletions src/mds/agg_mds/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ class MDSInstance:
] = None
study_data_field: str = "gen3_discovery"
guid_type: str = "discovery_metadata"
data_dict_field: Optional[str] = None
select_field: Optional[Dict[str, str]] = None

def __post_init__(self):
Expand All @@ -219,6 +220,7 @@ class AdapterMDSInstance:
field_mappings: Optional[Dict[str, Any]] = None
per_item_values: Optional[Dict[str, Any]] = None
study_data_field: str = "gen3_discovery"
data_dict_field: Optional[str] = None
keep_original_fields: bool = True
global_field_filters: List[str] = field(default_factory=list)
commons_name: Optional[str] = None
Expand Down
87 changes: 65 additions & 22 deletions src/mds/agg_mds/datastore/elasticsearch_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@
from typing import Any, List, Dict, Union, Optional, Tuple
from math import ceil
from mds import logger
from mds.config import AGG_MDS_NAMESPACE, ES_RETRY_LIMIT, ES_RETRY_INTERVAL
from mds.config import (
AGG_MDS_NAMESPACE,
ES_RETRY_LIMIT,
ES_RETRY_INTERVAL,
AGG_MDS_DEFAULT_STUDY_DATA_FIELD,
AGG_MDS_DEFAULT_DATA_DICT_FIELD,
)

AGG_MDS_INDEX = f"{AGG_MDS_NAMESPACE}-commons-index"
AGG_MDS_TYPE = "commons"
Expand Down Expand Up @@ -189,7 +195,6 @@ async def update_metadata(
guid_arr: List[str],
tags: Dict[str, List[str]],
info: Dict[str, str],
study_data_field: str,
use_temp_index: bool = False,
):
index_to_update = AGG_MDS_INFO_INDEX_TEMP if use_temp_index else AGG_MDS_INFO_INDEX
Expand All @@ -201,10 +206,16 @@ async def update_metadata(
)

index_to_update = AGG_MDS_INDEX_TEMP if use_temp_index else AGG_MDS_INDEX
for doc in data:
key = list(doc.keys())[0]
for d in data:
key = list(d.keys())[0]
# Flatten out this structure
doc = doc[key][study_data_field]
doc = {
AGG_MDS_DEFAULT_STUDY_DATA_FIELD: d[key][AGG_MDS_DEFAULT_STUDY_DATA_FIELD]
}
if AGG_MDS_DEFAULT_DATA_DICT_FIELD in d[key]:
doc[AGG_MDS_DEFAULT_DATA_DICT_FIELD] = d[key][
AGG_MDS_DEFAULT_DATA_DICT_FIELD
]

try:
elastic_search_client.index(
Expand Down Expand Up @@ -249,12 +260,26 @@ async def get_commons():
index=AGG_MDS_INDEX,
body={
"size": 0,
"aggs": {"commons_names": {"terms": {"field": "commons_name.keyword"}}},
"aggs": {
AGG_MDS_DEFAULT_STUDY_DATA_FIELD: {
"nested": {"path": AGG_MDS_DEFAULT_STUDY_DATA_FIELD},
"aggs": {
"commons_names": {
"terms": {
"field": f"{AGG_MDS_DEFAULT_STUDY_DATA_FIELD}.commons_name.keyword"
}
}
},
}
},
},
)
return {
"commons": [
x["key"] for x in res["aggregations"]["commons_names"]["buckets"]
x["key"]
for x in res["aggregations"][AGG_MDS_DEFAULT_STUDY_DATA_FIELD][
"commons_names"
]["buckets"]
]
}
except Exception as error:
Expand Down Expand Up @@ -282,9 +307,12 @@ def process_record(record: dict, counts: Optional[List[str]]) -> Tuple[str, dict
"""
_id = record["_id"]
normalized = record["_source"]
for c in counts:
if c in normalized:
normalized[c] = count(normalized[c])
if AGG_MDS_DEFAULT_STUDY_DATA_FIELD in normalized:
for c in counts:
if c in normalized[AGG_MDS_DEFAULT_STUDY_DATA_FIELD]:
normalized[AGG_MDS_DEFAULT_STUDY_DATA_FIELD][c] = count(
normalized[AGG_MDS_DEFAULT_STUDY_DATA_FIELD][c]
)
return _id, normalized


Expand All @@ -295,11 +323,11 @@ async def get_all_metadata(limit, offset, counts: Optional[str] = None, flatten=
counts: converts the count of the entry[count] if it is a dict or array
returns:

flattend == true
flattened == true
results : MDS results as a dict
paging info

flattend == false
flattened == false
results : {
commonsA: metadata
commonsB: metadata
Expand Down Expand Up @@ -345,7 +373,7 @@ async def get_all_metadata(limit, offset, counts: Optional[str] = None, flatten=
flat = []
for record in res["hits"]["hits"]:
rid, normalized = process_record(record, toReduce)
flat.append({rid: {"gen3_discovery": normalized}})
flat.append({rid: normalized})
return {
"results": flat,
"pagination": {
Expand All @@ -367,12 +395,12 @@ async def get_all_metadata(limit, offset, counts: Optional[str] = None, flatten=
}
for record in res["hits"]["hits"]:
rid, normalized = process_record(record, toReduce)
commons_name = normalized["commons_name"]
commons_name = normalized[AGG_MDS_DEFAULT_STUDY_DATA_FIELD][
"commons_name"
]
if commons_name not in byCommons["results"]:
byCommons["results"][commons_name] = []
byCommons["results"][commons_name].append(
{rid: {"gen3_discovery": normalized}}
)
byCommons["results"][commons_name].append({rid: normalized})

return byCommons
except Exception as error:
Expand All @@ -384,7 +412,18 @@ async def get_all_named_commons_metadata(name):
try:
res = elastic_search_client.search(
index=AGG_MDS_INDEX,
body={"query": {"match": {"commons_name.keyword": name}}},
body={
"query": {
"nested": {
"path": AGG_MDS_DEFAULT_STUDY_DATA_FIELD,
"query": {
"match": {
f"{AGG_MDS_DEFAULT_STUDY_DATA_FIELD}.commons_name.keyword": "HEAL"
}
},
}
}
},
)
return [x["_source"] for x in res["hits"]["hits"]]
except Exception as error:
Expand All @@ -400,14 +439,16 @@ async def metadata_tags():
"size": 0,
"aggs": {
"tags": {
"nested": {"path": "tags"},
"nested": {"path": f"{AGG_MDS_DEFAULT_STUDY_DATA_FIELD}.tags"},
"aggs": {
"categories": {
"terms": {"field": "tags.category.keyword"},
"terms": {
"field": f"{AGG_MDS_DEFAULT_STUDY_DATA_FIELD}.tags.category.keyword"
},
"aggs": {
"name": {
"terms": {
"field": "tags.name.keyword",
"field": f"{AGG_MDS_DEFAULT_STUDY_DATA_FIELD}.tags.name.keyword"
}
}
},
Expand Down Expand Up @@ -459,7 +500,9 @@ async def get_aggregations(name):
"query": {
"constant_score": {
"filter": {
"match": {"commons_name": name},
"match": {
f"{AGG_MDS_DEFAULT_STUDY_DATA_FIELD}.commons_name": name
},
}
}
},
Expand Down
57 changes: 50 additions & 7 deletions src/mds/agg_mds/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,30 @@ async def get_commons_info(what: str):

Example:

{
schema: {
...
...
{
"__manifest":{
"type":"array",
"properties":{
"file_name":{
"type":"string",
"description":""
},
"file_size":{
"type":"integer",
"description":""
}
}
},
"description":"",
"default":[

]
},
"commons_url":{
"type":"string",
"description":""
},
...
}

"""
res = await datastore.get_commons_attribute(what)
Expand Down Expand Up @@ -136,7 +154,26 @@ async def get_aggregate_metadata_for_commons(

Example:

[ { id2: { name: "bear" } } , { id3: { name: "cat" } }]
[
{
"gen3_discovery": {
"name": "bear",
"type": "study",
...
},
"data_dictionaries": {
...
}
},
{
"gen3_discovery": {
"name": "cat",
"type": "study",
...
}
},
...
]

"""
res = await datastore.get_all_named_commons_metadata(name)
Expand Down Expand Up @@ -211,7 +248,13 @@ async def get_aggregate_metadata_guid(guid: str):

Example:

{ id2: { name: "bear" } }
{
"gen3_discovery": {
"name": "cat",
"type": "study",
...
}
}
"""
res = await datastore.get_by_guid(guid)
if res:
Expand Down
6 changes: 6 additions & 0 deletions src/mds/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ def __init__(self, value):
URL_PREFIX = config("URL_PREFIX", default="/" if DEBUG else "/mds")
USE_AGG_MDS = config("USE_AGG_MDS", cast=bool, default=False)
AGG_MDS_NAMESPACE = config("AGG_MDS_NAMESPACE", default="default_namespace")
AGG_MDS_DEFAULT_STUDY_DATA_FIELD = config(
"AGG_MDS_DEFAULT_STUDY_DATA_FIELD", cast=str, default="gen3_discovery"
)
AGG_MDS_DEFAULT_DATA_DICT_FIELD = config(
"AGG_MDS_DEFAULT_DATA_DICT_FIELD", cast=str, default="data_dictionaries"
)
ES_ENDPOINT = config("GEN3_ES_ENDPOINT", default="http://localhost:9200")

# Database
Expand Down
Loading