Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract common ingestion tasks #5847

Merged
merged 8 commits into from
Dec 12, 2024
3 changes: 2 additions & 1 deletion config/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@
'datahub.email_ingestion',
'datahub.dnb_api',
'datahub.event',
'datahub.investment_lead',
'datahub.feature_flag.apps.FeatureFlagConfig',
'datahub.ingest',
'datahub.interaction',
'datahub.investment.project',
'datahub.investment.project.evidence',
Expand All @@ -105,6 +105,7 @@
'datahub.investment.project.notification',
'datahub.investment.investor_profile',
'datahub.investment.opportunity',
'datahub.investment_lead',
'datahub.metadata',
'datahub.oauth',
*_ADMIN_OAUTH2_APP,
Expand Down
36 changes: 35 additions & 1 deletion conftest.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from unittest.mock import Mock
from unittest.mock import Mock, patch

import boto3
import factory
import pytest

from botocore.stub import Stubber
from django.conf import settings
from django.core.cache import cache
from django.core.management import call_command
from django.db.models.signals import post_save
from moto import mock_aws
from opensearchpy.helpers.test import get_test_client
from pytest_django.lazy_django import skip_if_no_django

Expand All @@ -15,6 +18,7 @@
from datahub.core.test_utils import create_test_user, HawkAPITestClient
from datahub.dnb_api.utils import format_dnb_company
from datahub.documents.utils import get_s3_client_for_bucket
from datahub.ingest.constants import TEST_AWS_REGION, TEST_S3_BUCKET_NAME
from datahub.metadata.test.factories import SectorFactory
from datahub.search.apps import get_search_app_by_model, get_search_apps
from datahub.search.bulk_sync import sync_objects
Expand Down Expand Up @@ -103,6 +107,36 @@ def _patch(obj, callable_name):

yield _patch

# AWS


@pytest.fixture
def aws_credentials():
    """Inject fake AWS credentials into the environment.

    Ensures moto-backed tests can never reach real AWS, regardless of
    what credentials the host machine has configured.
    """
    fake_environment = {
        'AWS_ACCESS_KEY_ID': 'test-key-id',
        'AWS_SECRET_ACCESS_KEY': 'test-secret',
        'AWS_SECURITY_TOKEN': 'test-token',
        'AWS_SESSION_TOKEN': 'test-token',
        'AWS_DEFAULT_REGION': TEST_AWS_REGION,
    }
    with patch.dict('os.environ', fake_environment):
        yield


@pytest.fixture
def s3_client(aws_credentials):
    """Yield a moto-mocked boto3 S3 client.

    The mocked account is pre-loaded with a single empty bucket named
    `test-bucket`, created in the test region.
    """
    with mock_aws():
        client = boto3.client('s3', region_name=TEST_AWS_REGION)
        client.create_bucket(
            Bucket=TEST_S3_BUCKET_NAME,
            CreateBucketConfiguration={'LocationConstraint': TEST_AWS_REGION},
        )
        yield client


@pytest.fixture()
def s3_stubber():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@
WinAdviserFactory,
WinFactory,
)
from datahub.ingest.test.factories import IngestedObjectFactory
from datahub.interaction.test.factories import (
CompanyInteractionFactory,
InteractionDITParticipantFactory,
InteractionExportCountryFactory,

)
from datahub.investment.investor_profile.test.factories import LargeCapitalInvestorProfileFactory
from datahub.investment.opportunity.test.factories import LargeCapitalOpportunityFactory
Expand Down Expand Up @@ -82,6 +82,7 @@
'company_referral.CompanyReferral': CompanyReferralFactory,
'event.Event': EventFactory,
'export_win.LegacyExportWinsToDataHubCompany': LegacyExportWinsToDataHubCompanyFactory,
'ingest.IngestedObject': IngestedObjectFactory,
'interaction.InteractionDITParticipant': InteractionDITParticipantFactory,
'interaction.Interaction': CompanyInteractionFactory,
'interaction.InteractionExportCountry': InteractionExportCountryFactory,
Expand Down
Empty file added datahub/ingest/__init__.py
Empty file.
45 changes: 45 additions & 0 deletions datahub/ingest/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from django.contrib import admin

from datahub.ingest.models import IngestedObject


@admin.register(IngestedObject)
class IngestedObjectAdmin(admin.ModelAdmin):
    """Django admin configuration for IngestedObject records."""

    # Both search and list views expose the same four columns.
    search_fields = [
        'id',
        'created',
        'object_key',
        'object_created',
    ]
    list_display = [
        'id',
        'created',
        'object_key',
        'object_created',
    ]
    # Identity fields are system-generated and must not be edited.
    readonly_fields = [
        'id',
        'created',
    ]
    fieldsets = [
        (
            None,
            {
                'fields': [
                    'id',
                    'created',
                ],
            },
        ),
        (
            'Object Details',
            {
                'fields': [
                    'object_key',
                    'object_created',
                ],
            },
        ),
    ]
5 changes: 5 additions & 0 deletions datahub/ingest/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from django.apps import AppConfig


class IngestConfig(AppConfig):
    """Django app configuration for the shared ingestion app (`datahub.ingest`)."""

    name = 'datahub.ingest'
97 changes: 97 additions & 0 deletions datahub/ingest/boto3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import logging

from datetime import datetime

import boto3

from botocore.exceptions import ClientError
from django.conf import settings

from datahub.ingest.constants import (
AWS_REGION,
S3_BUCKET_NAME,
)
from datahub.ingest.models import IngestedObject


logger = logging.getLogger(__name__)


def get_s3_client(region: str):
    """Return a boto3 S3 client for the given region.

    If the `S3_LOCAL_ENDPOINT_URL` setting is set (e.g. for local
    development against minio/localstack), the client is pointed at
    that endpoint instead of real AWS.
    """
    endpoint_url = settings.S3_LOCAL_ENDPOINT_URL
    if not endpoint_url:
        return boto3.client('s3', region)
    logger.debug('Using local S3 endpoint %s', endpoint_url)
    return boto3.client('s3', region, endpoint_url=endpoint_url)


class S3ObjectProcessor:
    """Base class for processing objects located at a specified prefix within an S3 bucket."""

    def __init__(
        self,
        prefix: str,
        region: str = AWS_REGION,
        bucket: str = S3_BUCKET_NAME,
        s3_client=None,
    ) -> None:
        """Initialise the processor.

        :param prefix: key prefix (pseudo-directory) this processor operates on
        :param region: AWS region of the bucket
        :param bucket: name of the S3 bucket
        :param s3_client: optional pre-configured boto3 S3 client; when not
            supplied, one is created for ``region`` via ``get_s3_client``
        """
        self.prefix = prefix
        self.region = region
        self.bucket = bucket
        if s3_client is None:
            self.s3_client = get_s3_client(region)
        else:
            self.s3_client = s3_client

    def list_objects(self) -> list[dict]:
        """Returns a list of metadata about each object located at self.prefix.

        From the boto3 docs, the metadata will contain the following:
        - Key (string) - the name that you assign the object
        - LastModified (datetime) - creation date of the object
        - ETag (string) - entity tag; a hash of the object
        - ChecksumAlgorithm (list) - the algorithm used to create a checksum of the object
        - Size (integer) - size in bytes of the object
        - StorageClass (string) - the class of storage used to store the object
        - Owner (dict) - the owner of the object
        - RestoreStatus (dict) - the restoration status of the object
        """
        response = self.s3_client.list_objects(
            Bucket=self.bucket,
            Prefix=self.prefix,
        )
        # 'Contents' is absent (not empty) when no objects match the prefix.
        return response.get('Contents', [])

    def get_most_recent_object_key(self) -> str | None:
        """Return the most recent object's key in the self.bucket at self.prefix.

        Returns None when no objects exist at the prefix.
        """
        objects = self.list_objects()
        if not objects:
            return None
        most_recent_object = max(objects, key=lambda x: x['LastModified'])
        return most_recent_object['Key']

    def get_object_last_modified_datetime(self, object_key: str) -> datetime:
        """Get last modified datetime of a specific object.

        :raises botocore.exceptions.ClientError: if the object cannot be fetched
            (e.g. it does not exist or access is denied)
        """
        try:
            response = self.s3_client.get_object(
                Bucket=self.bucket,
                Key=object_key,
            )
            return response.get('LastModified')
        except ClientError as e:
            # Lazy %-style args avoid formatting when the log level is disabled.
            logger.error(
                'Error getting last modified datetime for %s: %s', object_key, e,
            )
            # Bare raise preserves the original traceback.
            raise

    def has_object_been_ingested(self, object_key: str) -> bool:
        """Determines if the specified object has already been ingested."""
        return IngestedObject.objects.filter(object_key=object_key).exists()

    def get_last_ingestion_datetime(self) -> datetime | None:
        """Get last ingestion datetime of an object with the same prefix (directory).

        Returns None when nothing under the prefix has been ingested yet.
        """
        # NOTE(review): `icontains` matches the prefix anywhere in the key, not
        # just at the start — confirm `startswith` semantics aren't intended.
        try:
            return IngestedObject.objects.filter(
                object_key__icontains=self.prefix,
            ).latest('object_created').object_created
        except IngestedObject.DoesNotExist:
            return None
14 changes: 14 additions & 0 deletions datahub/ingest/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import environ


env = environ.Env()


# Real region/bucket used by ingestion tasks; the bucket name is
# environment-specific (e.g. data-flow-bucket-dev).
AWS_REGION = env('AWS_DEFAULT_REGION', default='eu-west-2')
DATA_FLOW_EXPORTS_PREFIX = 'data-flow/exports/'
S3_BUCKET_NAME = f"data-flow-bucket-{env('ENVIRONMENT', default='')}"

# Fixed values used only by tests (see the s3_client/aws_credentials fixtures).
TEST_AWS_REGION = 'eu-west-2'
TEST_PREFIX = 'test/'
TEST_OBJECT_KEY = f'{TEST_PREFIX}object.json.gz'
TEST_S3_BUCKET_NAME = 'test-bucket'
25 changes: 25 additions & 0 deletions datahub/ingest/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Generated by Django 4.2.16 on 2024-11-21 17:14

from django.db import migrations, models
import django.utils.timezone
import uuid


class Migration(migrations.Migration):
    """Initial migration for the ingest app: creates the IngestedObject table."""

    initial = True

    dependencies = [
    ]

    operations = [
        migrations.CreateModel(
            name='IngestedObject',
            fields=[
                ('id', models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),
                ('created', models.DateTimeField(auto_now_add=True, help_text='DateTime the instance was created')),
                ('object_key', models.CharField(help_text='The S3 object path', max_length=255)),
                ('object_created', models.DateTimeField(default=django.utils.timezone.now, help_text='DateTime the ingested object was last modified in S3')),
            ],
        ),
    ]
Empty file.
29 changes: 29 additions & 0 deletions datahub/ingest/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import uuid

from django.conf import settings
from django.db import models
from django.utils import timezone

from datahub.core import reversion


MAX_LENGTH = settings.CHAR_FIELD_MAX_LENGTH


@reversion.register_base_model()
class IngestedObject(models.Model):
    """Model to track which source objects (files) have been ingested already."""

    id = models.UUIDField(primary_key=True, default=uuid.uuid4)
    # When this tracking record was created (set automatically on insert).
    created = models.DateTimeField(
        auto_now_add=True,
        help_text='DateTime the instance was created',
    )
    # Full S3 key of the ingested object, e.g. 'data-flow/exports/file.json.gz'.
    object_key = models.CharField(
        max_length=MAX_LENGTH,
        help_text='The S3 object path',
    )
    # S3 LastModified timestamp of the source object; defaults to now when
    # the caller does not supply one.
    object_created = models.DateTimeField(
        default=timezone.now,
        help_text='DateTime the ingested object was last modified in S3',
    )
Loading
Loading