Relational Metadata store with Prisma (#203)
* Added prisma schema

* Fixed data sources API and modified METADATA_STORE client; TODOs: fix clients throughout the code, fix all collections APIs

* Completed data sources and collections APIs

* Added data ingestion APIs

* Added deletion of ingestion runs when deleting a collection

* Modified query controllers

* Wrapped TF metadata store calls in an executor loop to support its sync functions (see the sketch after this list)

* Added docker compose for Prisma and Postgres

* TrueFoundry loader is optional, available only when TFY_API_KEY is present; modified regex for collection and data source in FE

* Import PrismaStore only in the local environment

* Added indexer support via UI

* Symlinks and API for localdir upload (WIP)

* Added feature to upload data to a Docker volume

* Added delete datasource API for prisma

* Added empty function for delete_data_source in TrueFoundry

* feat: upload to local directory integration in FE (#228)

* feat: upload to local directory integration in FE

* feat: add data source unlink button

* feat: integrate delete data source api

* fix: add VITE_USE_LOCAL in dockerfile

* Fixed docker-compose for FE

* Removed console logs

* Updated compose.env
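
For the "executor loop" bullet above: the TrueFoundry metadata store exposes only synchronous methods, so the async indexer code dispatches those calls to the event loop's default thread-pool executor instead of blocking. A minimal sketch of the pattern, assuming a generic sync callable (the helper name run_sync is illustrative, not part of this commit):

import asyncio

async def run_sync(sync_fn, *args):
    # Offload a blocking synchronous call to the default ThreadPoolExecutor
    # so the event loop stays responsive. run_in_executor only forwards
    # positional arguments, which is why the indexer passes them positionally.
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, sync_fn, *args)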
S1LV3RJ1NX authored Jun 20, 2024
1 parent 7174901 commit 427b5e9
Showing 33 changed files with 1,223 additions and 167 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -26,3 +26,5 @@ qdrant_storage/
 .truefoundry
 infinity/
 volumes/
+pgdata/
+*.bak
1 change: 1 addition & 0 deletions .tfyignore
@@ -25,3 +25,4 @@ qdrant_db/
 *.pth
 .truefoundry
 volumes/
+pgdata/
66 changes: 66 additions & 0 deletions backend/database/schema.prisma
@@ -0,0 +1,66 @@
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}

generator client {
provider = "prisma-client-py"
recursive_type_depth = 5
}

model DataSource {
id Int @id @default(autoincrement())
type String
uri String @unique
metadata Json?
fqn String @unique
@@map("data_sources")
}

model Collection {
id Int @id @default(autoincrement())
name String @unique
description String?
embedder_config Json
// Collection can have multiple data sources
associated_data_sources Json?
@@map("collections")
}

model IngestionRuns {
id Int @id @default(autoincrement())
name String @unique
collection_name String
data_source_fqn String
parser_config Json
data_ingestion_mode String
status String
raise_error_on_failure Boolean
errors Json?
@@map("ingestion_runs")
}

enum Role {
USER
ADMIN
}

// From project root:
// Validate: prisma validate --schema ./backend/database/schema.prisma

// Generate Client: prisma generate --schema ./backend/database/schema.prisma

// Push: prisma db push --schema ./backend/database/schema.prisma
// The db push command also generates the client for you. If you want to generate the client without modifying your database, use the generate command.

// Note that whenever you make changes to your schema.prisma file you will have to re-generate the client;
// you can do this automatically by running `prisma generate --schema ./backend/database/schema.prisma --watch`.

// Whenever you make changes to your model, migrate your database and re-generate your prisma code:
// # apply migrations
// prisma migrate dev --name "add comment model"
// # generate
// prisma generate
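
For orientation, a hypothetical prisma-client-py usage sketch against the schema above (accessor names are the lower-cased model names generated by the client; the example values are made up):

import asyncio
from prisma import Prisma

async def main():
    db = Prisma()
    await db.connect()
    # `datasource` is the generated accessor for model DataSource;
    # `uri` and `fqn` are unique, so a duplicate create will raise.
    ds = await db.datasource.create(
        data={"type": "localdir", "uri": "/data/docs", "fqn": "localdir::/data/docs"}
    )
    runs = await db.ingestionruns.find_many(where={"status": "COMPLETED"})
    print(ds.id, len(runs))
    await db.disconnect()

asyncio.run(main())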
198 changes: 153 additions & 45 deletions backend/indexer/indexer.py
@@ -1,3 +1,4 @@
+import asyncio
 import os
 import tempfile
 from typing import Dict, List
@@ -11,11 +12,13 @@
 from backend.logger import logger
 from backend.modules.dataloaders.loader import get_loader_for_data_source
 from backend.modules.embedder.embedder import get_embedder
-from backend.modules.metadata_store.client import METADATA_STORE_CLIENT
+from backend.modules.metadata_store.client import get_client
+from backend.modules.metadata_store.truefoundry import TrueFoundry
 from backend.modules.parsers.parser import get_parser_for_extension
 from backend.modules.vector_db.client import VECTOR_STORE_CLIENT
 from backend.settings import settings
 from backend.types import (
+    Collection,
     CreateDataIngestionRun,
     DataIngestionMode,
     DataIngestionRunStatus,
@@ -62,10 +65,21 @@ async def sync_data_source_to_collection(inputs: DataIngestionConfig):
     Returns:
         None
     """
-    METADATA_STORE_CLIENT.update_data_ingestion_run_status(
-        data_ingestion_run_name=inputs.data_ingestion_run_name,
-        status=DataIngestionRunStatus.FETCHING_EXISTING_VECTORS,
-    )
+    client = await get_client()
+
+    if isinstance(client, TrueFoundry):
+        loop = asyncio.get_event_loop()
+        await loop.run_in_executor(
+            None,
+            client.update_data_ingestion_run_status,
+            inputs.data_ingestion_run_name,
+            DataIngestionRunStatus.FETCHING_EXISTING_VECTORS,
+        )
+    else:
+        await client.update_data_ingestion_run_status(
+            data_ingestion_run_name=inputs.data_ingestion_run_name,
+            status=DataIngestionRunStatus.FETCHING_EXISTING_VECTORS,
+        )
     try:
         existing_data_point_vectors = VECTOR_STORE_CLIENT.list_data_point_vectors(
             collection_name=inputs.collection_name,
@@ -80,53 +94,116 @@ async def sync_data_source_to_collection(inputs: DataIngestionConfig):
         )
     except Exception as e:
         logger.exception(e)
-        METADATA_STORE_CLIENT.update_data_ingestion_run_status(
-            data_ingestion_run_name=inputs.data_ingestion_run_name,
-            status=DataIngestionRunStatus.FETCHING_EXISTING_VECTORS_FAILED,
-        )
+        if isinstance(client, TrueFoundry):
+            loop = asyncio.get_event_loop()
+            await loop.run_in_executor(
+                None,
+                client.update_data_ingestion_run_status,
+                inputs.data_ingestion_run_name,
+                DataIngestionRunStatus.FETCHING_EXISTING_VECTORS_FAILED,
+            )
+        else:
+            await client.update_data_ingestion_run_status(
+                data_ingestion_run_name=inputs.data_ingestion_run_name,
+                status=DataIngestionRunStatus.FETCHING_EXISTING_VECTORS_FAILED,
+            )
         raise e
-    METADATA_STORE_CLIENT.update_data_ingestion_run_status(
-        data_ingestion_run_name=inputs.data_ingestion_run_name,
-        status=DataIngestionRunStatus.DATA_INGESTION_STARTED,
-    )
+    if isinstance(client, TrueFoundry):
+        loop = asyncio.get_event_loop()
+        await loop.run_in_executor(
+            None,
+            client.update_data_ingestion_run_status,
+            inputs.data_ingestion_run_name,
+            DataIngestionRunStatus.DATA_INGESTION_STARTED,
+        )
+    else:
+        await client.update_data_ingestion_run_status(
+            data_ingestion_run_name=inputs.data_ingestion_run_name,
+            status=DataIngestionRunStatus.DATA_INGESTION_STARTED,
+        )
     try:
         await _sync_data_source_to_collection(
             inputs=inputs,
             previous_snapshot=previous_snapshot,
         )
     except Exception as e:
         logger.exception(e)
-        METADATA_STORE_CLIENT.update_data_ingestion_run_status(
-            data_ingestion_run_name=inputs.data_ingestion_run_name,
-            status=DataIngestionRunStatus.DATA_INGESTION_FAILED,
-        )
+        if isinstance(client, TrueFoundry):
+            loop = asyncio.get_event_loop()
+            await loop.run_in_executor(
+                None,
+                client.update_data_ingestion_run_status,
+                inputs.data_ingestion_run_name,
+                DataIngestionRunStatus.DATA_INGESTION_FAILED,
+            )
+        else:
+            await client.update_data_ingestion_run_status(
+                data_ingestion_run_name=inputs.data_ingestion_run_name,
+                status=DataIngestionRunStatus.DATA_INGESTION_FAILED,
+            )
         raise e
-    METADATA_STORE_CLIENT.update_data_ingestion_run_status(
-        data_ingestion_run_name=inputs.data_ingestion_run_name,
-        status=DataIngestionRunStatus.DATA_INGESTION_COMPLETED,
-    )
+    if isinstance(client, TrueFoundry):
+        loop = asyncio.get_event_loop()
+        await loop.run_in_executor(
+            None,
+            client.update_data_ingestion_run_status,
+            inputs.data_ingestion_run_name,
+            DataIngestionRunStatus.DATA_INGESTION_COMPLETED,
+        )
+    else:
+        await client.update_data_ingestion_run_status(
+            data_ingestion_run_name=inputs.data_ingestion_run_name,
+            status=DataIngestionRunStatus.DATA_INGESTION_COMPLETED,
+        )
     # Delete the outdated data point vectors from the vector store
     if inputs.data_ingestion_mode == DataIngestionMode.FULL:
-        METADATA_STORE_CLIENT.update_data_ingestion_run_status(
-            data_ingestion_run_name=inputs.data_ingestion_run_name,
-            status=DataIngestionRunStatus.DATA_CLEANUP_STARTED,
-        )
+        if isinstance(client, TrueFoundry):
+            loop = asyncio.get_event_loop()
+            await loop.run_in_executor(
+                None,
+                client.update_data_ingestion_run_status,
+                inputs.data_ingestion_run_name,
+                DataIngestionRunStatus.DATA_CLEANUP_STARTED,
+            )
+        else:
+            await client.update_data_ingestion_run_status(
+                data_ingestion_run_name=inputs.data_ingestion_run_name,
+                status=DataIngestionRunStatus.DATA_CLEANUP_STARTED,
+            )
         try:
             VECTOR_STORE_CLIENT.delete_data_point_vectors(
                 collection_name=inputs.collection_name,
                 data_point_vectors=existing_data_point_vectors,
            )
         except Exception as e:
             logger.exception(e)
-            METADATA_STORE_CLIENT.update_data_ingestion_run_status(
-                data_ingestion_run_name=inputs.data_ingestion_run_name,
-                status=DataIngestionRunStatus.DATA_CLEANUP_FAILED,
-            )
+            if isinstance(client, TrueFoundry):
+                loop = asyncio.get_event_loop()
+                await loop.run_in_executor(
+                    None,
+                    client.update_data_ingestion_run_status,
+                    inputs.data_ingestion_run_name,
+                    DataIngestionRunStatus.DATA_CLEANUP_FAILED,
+                )
+            else:
+                await client.update_data_ingestion_run_status(
+                    data_ingestion_run_name=inputs.data_ingestion_run_name,
+                    status=DataIngestionRunStatus.DATA_CLEANUP_FAILED,
+                )
            raise e
-    METADATA_STORE_CLIENT.update_data_ingestion_run_status(
-        data_ingestion_run_name=inputs.data_ingestion_run_name,
-        status=DataIngestionRunStatus.COMPLETED,
-    )
+    if isinstance(client, TrueFoundry):
+        loop = asyncio.get_event_loop()
+        await loop.run_in_executor(
+            None,
+            client.update_data_ingestion_run_status,
+            inputs.data_ingestion_run_name,
+            DataIngestionRunStatus.COMPLETED,
+        )
+    else:
+        await client.update_data_ingestion_run_status(
+            data_ingestion_run_name=inputs.data_ingestion_run_name,
+            status=DataIngestionRunStatus.COMPLETED,
+        )


async def _sync_data_source_to_collection(
@@ -146,6 +223,8 @@ async def _sync_data_source_to_collection(
         None
     """
 
+    client = await get_client()
+
     failed_data_point_fqns = []
     documents_ingested_count = 0
     # Create a temp dir to store the data
@@ -184,10 +263,19 @@ async def _sync_data_source_to_collection(
             f"Failed to ingest {len(failed_data_point_fqns)} data points. data point fqns:"
         )
         logger.error(failed_data_point_fqns)
-        METADATA_STORE_CLIENT.log_errors_for_data_ingestion_run(
-            data_ingestion_run_name=inputs.data_ingestion_run_name,
-            errors={"failed_data_point_fqns": failed_data_point_fqns},
-        )
+        if isinstance(client, TrueFoundry):
+            loop = asyncio.get_event_loop()
+            await loop.run_in_executor(
+                None,
+                client.log_errors_for_data_ingestion_run,
+                inputs.data_ingestion_run_name,
+                {"failed_data_point_fqns": failed_data_point_fqns},
+            )
+        else:
+            await client.log_errors_for_data_ingestion_run(
+                data_ingestion_run_name=inputs.data_ingestion_run_name,
+                errors={"failed_data_point_fqns": failed_data_point_fqns},
+            )
         raise Exception(
             f"Failed to ingest {len(failed_data_point_fqns)} data points"
         )
@@ -301,9 +389,19 @@ async def ingest_data_points(
 async def ingest_data(request: IngestDataToCollectionDto):
     """Ingest data into the collection"""
     try:
-        collection = METADATA_STORE_CLIENT.get_collection_by_name(
-            collection_name=request.collection_name, no_cache=True
-        )
+        client = await get_client()
+        if isinstance(client, TrueFoundry):
+            loop = asyncio.get_event_loop()
+            collection = await loop.run_in_executor(
+                None, client.get_collection_by_name, request.collection_name
+            )
+        else:
+            collection = await client.get_collection_by_name(request.collection_name)
+
+        # convert to pydantic model if not already -> For prisma models
+        if not isinstance(collection, Collection):
+            collection = Collection(**collection.dict())
+
         if not collection:
             logger.error(
                 f"Collection with name {request.collection_name} does not exist."
@@ -331,11 +429,12 @@ async def ingest_data(request: IngestDataToCollectionDto):
             collection.associated_data_sources.values()
         )
 
+        logger.info(f"Associated: {associated_data_sources_to_be_ingested}")
         for associated_data_source in associated_data_sources_to_be_ingested:
             logger.debug(
                 f"Starting ingestion for data source fqn: {associated_data_source.data_source_fqn}"
             )
-            if not request.run_as_job:
+            if not request.run_as_job or settings.LOCAL:
                 data_ingestion_run = CreateDataIngestionRun(
                     collection_name=collection.name,
                     data_source_fqn=associated_data_source.data_source_fqn,
@@ -344,11 +443,15 @@ async def ingest_data(request: IngestDataToCollectionDto):
                     data_ingestion_mode=request.data_ingestion_mode,
                     raise_error_on_failure=request.raise_error_on_failure,
                 )
-                created_data_ingestion_run = (
-                    METADATA_STORE_CLIENT.create_data_ingestion_run(
-                        data_ingestion_run=data_ingestion_run
-                    )
-                )
+                if isinstance(client, TrueFoundry):
+                    loop = asyncio.get_event_loop()
+                    created_data_ingestion_run = await loop.run_in_executor(
+                        None, client.create_data_ingestion_run, data_ingestion_run
+                    )
+                else:
+                    created_data_ingestion_run = await client.create_data_ingestion_run(
+                        data_ingestion_run=data_ingestion_run
+                    )
                 await sync_data_source_to_collection(
                     inputs=DataIngestionConfig(
                         collection_name=created_data_ingestion_run.collection_name,
@@ -379,11 +482,15 @@ async def ingest_data(request: IngestDataToCollectionDto):
                     data_ingestion_mode=request.data_ingestion_mode,
                     raise_error_on_failure=request.raise_error_on_failure,
                 )
-                created_data_ingestion_run = (
-                    METADATA_STORE_CLIENT.create_data_ingestion_run(
-                        data_ingestion_run=data_ingestion_run
-                    )
-                )
+                if isinstance(client, TrueFoundry):
+                    loop = asyncio.get_event_loop()
+                    created_data_ingestion_run = await loop.run_in_executor(
+                        None, client.create_data_ingestion_run, data_ingestion_run
+                    )
+                else:
+                    created_data_ingestion_run = await client.create_data_ingestion_run(
+                        data_ingestion_run=data_ingestion_run
+                    )
                 trigger_job(
                     application_fqn=settings.JOB_FQN,
                     component_name=settings.JOB_COMPONENT_NAME,
@@ -401,6 +508,7 @@ async def ingest_data(request: IngestDataToCollectionDto):
             status_code=201,
             content={"message": "triggered"},
         )
+
     except HTTPException as exp:
         raise exp
     except Exception as exp:
3 changes: 2 additions & 1 deletion backend/modules/dataloaders/__init__.py
@@ -8,4 +8,5 @@
 register_dataloader("localdir", LocalDirLoader)
 register_dataloader("web", WebLoader)
 register_dataloader("github", GithubLoader)
-register_dataloader("truefoundry", TrueFoundryLoader)
+if settings.TFY_API_KEY:
+    register_dataloader("truefoundry", TrueFoundryLoader)
