feat: parse data point fqn to fetch signed url (#386)
* feat: parse data point fqn to fetch signed url

---------

Co-authored-by: Sai krishna <sai@truefoundry.com>
mnvsk97 and Sai krishna authored Oct 22, 2024
1 parent b97169b commit 04176cf
Showing 4 changed files with 110 additions and 71 deletions.
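
The heart of this change is splitting a data point FQN into the data directory FQN and the file path inside it. A minimal sketch of that extraction, using the regex from this commit on a hypothetical FQN (the exact FQN layout shown here is an assumption; real values come from the metadata store):

    import re

    # Hypothetical example FQN; the "::" separator is assumed.
    data_point_fqn = "data-dir:truefoundry/my-ml-repo/my-upload::files/docs/report.pdf"

    match = re.search(r"(data-dir:[^:]+).*?(files/.+)$", data_point_fqn)
    if match:
        data_dir_fqn, file_path = match.groups()
        print(data_dir_fqn)  # data-dir:truefoundry/my-ml-repo/my-upload
        print(file_path)     # files/docs/report.pdf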
4 changes: 2 additions & 2 deletions backend/modules/query_controllers/base.py
@@ -22,7 +22,7 @@
 from backend.modules.query_controllers.types import *
 from backend.modules.vector_db.client import VECTOR_STORE_CLIENT
 from backend.settings import settings
-from backend.utils import _get_read_signed_url_with_cache
+from backend.utils import _get_read_signed_url


 class BaseQueryController:
@@ -232,7 +232,7 @@ def _enrich_metadata_with_signed_url(
         data_dir_fqn, file_path = match.groups()

         # Generate a signed url for the file
-        signed_url = _get_read_signed_url_with_cache(
+        signed_url = _get_read_signed_url(
             fqn=data_dir_fqn,
             file_path=file_path,
             cache=artifact_repo_cache,
145 changes: 86 additions & 59 deletions backend/server/routers/internal.py
@@ -1,18 +1,20 @@
 import os
+import re
 import uuid
 from types import SimpleNamespace
 from typing import List, Optional

 from fastapi import APIRouter, File, Form, HTTPException, Query, UploadFile
 from fastapi.responses import JSONResponse
 from truefoundry.ml import DataDirectory
+from truefoundry.ml.autogen.client.models.signed_url_dto import SignedURLDto

 from backend.logger import logger
 from backend.modules.model_gateway.model_gateway import model_gateway
 from backend.server.routers.data_source import add_data_source
 from backend.settings import settings
 from backend.types import CreateDataSource, ModelType, UploadToDataDirectoryDto
-from backend.utils import TRUEFOUNDRY_CLIENT
+from backend.utils import TRUEFOUNDRY_CLIENT, _get_read_signed_url

 router = APIRouter(prefix="/v1/internal", tags=["internal"])

@@ -30,73 +32,57 @@ async def upload_to_docker_directory(
             content={"error": "API only supported for local docker environment"},
             status_code=500,
         )
-    try:
-        logger.info(f"Uploading files to directory: {upload_name}")
-        # create a folder within `/volumes/user_data/` that maps to `/app/user_data/` in the docker volume
-        # this folder will be used to store the uploaded files
-        folder_path = os.path.join(settings.LOCAL_DATA_DIRECTORY, upload_name)
-
-        # Create the folder if it does not exist, else raise an exception
-        if not os.path.exists(folder_path):
-            os.makedirs(folder_path)
-        else:
-            return JSONResponse(
-                content={"error": f"Folder already exists: {upload_name}"},
-                status_code=500,
-            )
-
-        # Upload the files to the folder
-        for file in files:
-            logger.info(f"Copying file: {file.filename}, to folder: {folder_path}")
-            file_path = os.path.join(folder_path, file.filename)
-            with open(file_path, "wb") as f:
-                f.write(file.file.read())
-
-        data_source = CreateDataSource(
-            type="localdir",
-            uri=folder_path,
-        )
-
-        # Add the data source to the metadata store.
-        return await add_data_source(data_source)
-    except Exception as ex:
-        logger.exception(f"Error uploading files to directory: {ex}")
-        return JSONResponse(
-            content={"error": f"Error uploading files to directory: {ex}"},
-            status_code=500,
-        )
+    logger.info(f"Uploading files to directory: {upload_name}")
+    # create a folder within `/volumes/user_data/` that maps to `/app/user_data/` in the docker volume
+    # this folder will be used to store the uploaded files
+    folder_path = os.path.join(settings.LOCAL_DATA_DIRECTORY, upload_name)
+
+    # Create the folder if it does not exist, else raise an exception
+    if os.path.exists(folder_path):
+        return JSONResponse(
+            content={"error": f"Folder already exists: {upload_name}"},
+            status_code=500,
+        )
+
+    # Create the folder by the given name
+    os.makedirs(folder_path)
+
+    # Upload the files to the folder
+    for file in files:
+        logger.info(f"Copying file: {file.filename}, to folder: {folder_path}")
+        file_path = os.path.join(folder_path, file.filename)
+        with open(file_path, "wb") as f:
+            f.write(file.file.read())
+
+    # Add the data source to the metadata store.
+    return await add_data_source(
+        CreateDataSource(
+            type="localdir",
+            uri=folder_path,
+        )
+    )


-@router.post("/upload-to-data-directory")
-async def upload_to_data_directory(req: UploadToDataDirectoryDto):
-    """This function uploads files to the data directory given by the name req.upload_name"""
-    try:
-        if settings.ML_REPO_NAME == "":
-            return JSONResponse(
-                content={"error": "ML_REPO_NAME is not set in the environment"},
-                status_code=500,
-            )
-        # Create a new data directory.
-        dataset = TRUEFOUNDRY_CLIENT.create_data_directory(
-            settings.ML_REPO_NAME,
-            req.upload_name,
-        )
-
-        _artifacts_repo = DataDirectory.from_fqn(fqn=dataset.fqn)._get_artifacts_repo()
-
-        urls = _artifacts_repo.get_signed_urls_for_write(
-            artifact_identifier=SimpleNamespace(
-                artifact_version_id=None, dataset_fqn=dataset.fqn
-            ),
-            paths=req.filepaths,
-        )
-
-        data = [url.dict() for url in urls]
-        return JSONResponse(
-            content={"data": data, "data_directory_fqn": dataset.fqn},
-        )
-    except Exception as ex:
-        raise Exception(f"Error uploading files to data directory: {ex}") from ex
+@router.post("/upload-to-data-directory")
+async def upload_to_data_directory(req: UploadToDataDirectoryDto):
+    # Create a new data directory.
+    dataset = TRUEFOUNDRY_CLIENT.create_data_directory(
+        settings.ML_REPO_NAME,
+        req.upload_name,
+    )
+    # Get the signed urls for the write operation.
+    _artifacts_repo = DataDirectory.from_fqn(fqn=dataset.fqn)._get_artifacts_repo()
+    urls: List[SignedURLDto] = _artifacts_repo.get_signed_urls_for_write(
+        artifact_identifier=SimpleNamespace(
+            artifact_version_id=None, dataset_fqn=dataset.fqn
+        ),
+        paths=req.filepaths,
+    )
+    # Serialize the signed urls.
+    data = [url.model_dump() for url in urls]
+    return JSONResponse(
+        content={"data": data, "data_directory_fqn": dataset.fqn},
+    )


@router.get("/models")
Expand All @@ -120,3 +106,44 @@ def get_enabled_models(
return JSONResponse(
content={"models": serialized_models},
)


@router.get("/get_signed_url")
def get_signed_url(
data_point_fqn: str = Query(...),
):
"""
Enrich the metadata with the signed url
"""
# Use a single regex to extract both data-dir FQN and file path
match = re.search(r"(data-dir:[^:]+).*?(files/.+)$", data_point_fqn)

# Return if the regex does not match
if not match:
raise HTTPException(
status_code=400,
detail=f"Invalid data point fqn: {data_point_fqn}",
)

# Extract the data-dir FQN and the file path from the FQN with source
data_dir_fqn, file_path = match.groups()

# Generate a signed url for the file
signed_url_info: List[SignedURLDto] = _get_read_signed_url(
fqn=data_dir_fqn,
file_path=file_path,
cache={},
)

# Add the signed url to the metadata if it's not None
if not signed_url_info:
raise HTTPException(
status_code=500,
detail=f"Failed to generate signed url for {data_point_fqn}",
)

return JSONResponse(
content={
"signed_url": signed_url_info[0].signed_url,
}
)
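
Assuming the backend runs locally (the host and port below are hypothetical), the new endpoint can be exercised with a plain GET request; the FQN value is illustrative only:

    import requests

    # Hypothetical host/port and data point FQN; adjust for your deployment.
    resp = requests.get(
        "http://localhost:8000/v1/internal/get_signed_url",
        params={
            "data_point_fqn": "data-dir:truefoundry/my-ml-repo/my-upload::files/docs/report.pdf"
        },
    )
    resp.raise_for_status()
    print(resp.json()["signed_url"])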
6 changes: 6 additions & 0 deletions backend/settings.py
@@ -62,5 +62,11 @@ def _validate_values(cls, values: Dict[str, Any]) -> Dict[str, Any]:

         return values

+    @model_validator(mode="after")
+    def _validate_ml_repo_name(self) -> "Settings":
+        # If the service is not running locally, then ML_REPO_NAME is required.
+        if not self.LOCAL and not self.ML_REPO_NAME:
+            raise ValueError("ML_REPO_NAME is not set in the environment")
+        return self


 settings = Settings()
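
A quick sketch of the validator's intended behavior, assuming Settings is a pydantic-settings model that populates LOCAL and ML_REPO_NAME from environment variables (that env-loading detail is an assumption, not shown in this diff):

    import os

    # Assumption: these fields are read from the environment.
    os.environ["LOCAL"] = "false"
    os.environ.pop("ML_REPO_NAME", None)

    from backend.settings import Settings

    try:
        Settings()
    except ValueError as err:  # pydantic's ValidationError subclasses ValueError
        print(err)  # ... ML_REPO_NAME is not set in the environment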
26 changes: 16 additions & 10 deletions backend/utils.py
@@ -4,10 +4,11 @@
 from contextvars import copy_context
 from functools import partial
 from types import SimpleNamespace
-from typing import Callable, Optional, TypeVar, cast
+from typing import Any, Callable, List, Optional, TypeVar, cast

 from truefoundry.ml import DataDirectory
 from truefoundry.ml import get_client as get_tfy_client
+from truefoundry.ml.autogen.client.models.signed_url_dto import SignedURLDto
 from typing_extensions import ParamSpec

 from backend.logger import logger
@@ -53,19 +54,27 @@ def unzip_file(file_path, dest_dir):
         zip_ref.extractall(dest_dir)


-def _get_read_signed_url_with_cache(fqn: str, file_path: str, cache: dict):
-    if fqn not in cache:
-        cache[fqn] = DataDirectory.from_fqn(fqn=fqn)._get_artifacts_repo()
-    url = cache[fqn].get_signed_urls_for_read(
+def _get_artifacts_repo(fqn: str, cache: Optional[dict] = None) -> Any:
+    if cache is not None and fqn in cache:
+        return cache[fqn]
+    artifacts_repo = DataDirectory.from_fqn(fqn=fqn)._get_artifacts_repo()
+    if cache is not None:
+        cache[fqn] = artifacts_repo
+    return artifacts_repo
+
+
+def _get_read_signed_url(
+    fqn: str, file_path: str, cache: Optional[dict] = None
+) -> List[SignedURLDto]:
+    artifacts_repo = _get_artifacts_repo(fqn, cache)
+    return artifacts_repo.get_signed_urls_for_read(
         artifact_identifier=SimpleNamespace(
             artifact_version_id=None,
             dataset_fqn=fqn,
         ),
         paths=[file_path],
     )
-
-    return url


 # Taken from https://github.com/langchain-ai/langchain/blob/987099cfcda6f20140228926e9d39eed5ccd35b4/libs/core/langchain_core/runnables/config.py#L528
 async def run_in_executor(
@@ -130,6 +139,3 @@ def _async_to_sync(fn, *args, **kwargs):
             logger.exception("Error in AsyncProcessPoolExecutor worker")
             future.set_exception(e)
         return future
-
-    def submit(self, fn, *args, **kwargs):
-        return super().submit(self._async_to_sync, fn, *args, **kwargs)
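
The refactor above also makes the artifacts-repo cache optional: callers that resolve many files from the same data directory (like the query controller in base.py) can share one dict across calls, while one-off callers such as /get_signed_url pass a fresh one. A usage sketch with hypothetical FQN and path values:

    from backend.utils import _get_read_signed_url

    # Sharing one cache dict avoids re-resolving the artifacts repo
    # for repeated lookups against the same data directory.
    artifact_repo_cache: dict = {}

    urls = _get_read_signed_url(
        fqn="data-dir:truefoundry/my-ml-repo/my-upload",  # hypothetical FQN
        file_path="files/docs/report.pdf",                # hypothetical path
        cache=artifact_repo_cache,
    )
    if urls:
        print(urls[0].signed_url)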
