
Commit

superlinter fixes
zerj9 committed Mar 3, 2025
1 parent 49ed5f9 commit d7b945d
Showing 6 changed files with 86 additions and 76 deletions.
Empty file.
@@ -1,18 +1,18 @@
import json
import logging
import sys
import os
import oracledb
import sys
from datetime import datetime

import boto3
import oracledb
from aws_xray_sdk.core import patch_all, xray_recorder
from dotenv import load_dotenv
from mojap_metadata import Metadata
from mojap_metadata.converters.sqlalchemy_converter import SQLAlchemyConverter
from mojap_metadata.converters.etl_manager_converter import EtlManagerConverter
from mojap_metadata.converters.glue_converter import GlueConverter, GlueTable
import boto3
from mojap_metadata.converters.glue_converter import GlueConverter
from mojap_metadata.converters.sqlalchemy_converter import SQLAlchemyConverter
from sqlalchemy import create_engine
from dotenv import load_dotenv
from aws_xray_sdk.core import xray_recorder, patch_all

patch_all()

@@ -180,7 +180,7 @@ def get_database_metadata(self, output_bucket):
return tables


def handler(event, context):
def handler(event, context): # pylint: disable=unused-argument
# TODO: PASS IN AS ENV VARS
os.environ["RAW_HISTORY_BUCKET"] = "dms-test-raw-history-20250221145111054600000001"

@@ -217,15 +217,15 @@ def handler(event, context):

# Get the glue database to check if it exists. handle EntityNotFoundException
try:
db_response = glue.get_database(Name=db_identifier)
glue.get_database(Name=db_identifier)
logger.info(f"Database {db_identifier} already exists")
except glue.exceptions.EntityNotFoundException:
# Create the database if it does not exist. Fails if it cannot be created
logger.info(f"Database {db_identifier} does not exist. Creating it now")
response = glue.create_database(
DatabaseInput={
'Name': db_identifier,
'Description': f'{db_identifier} - DMS Pipeline'
"Name": db_identifier,
"Description": f"{db_identifier} - DMS Pipeline",
}
)
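
For reference, the try/except above is a get-or-create pattern against the Glue Data Catalog: look the database up, and create it only when get_database raises EntityNotFoundException. A minimal standalone sketch of the same pattern (the helper name and example identifiers are illustrative, not taken from this repo):

# Illustrative sketch (not part of this commit): get-or-create for a Glue database.
import boto3

def ensure_glue_database(glue_client, db_name: str, description: str) -> None:
    """Create the Glue database only if it does not already exist."""
    try:
        glue_client.get_database(Name=db_name)
    except glue_client.exceptions.EntityNotFoundException:
        glue_client.create_database(
            DatabaseInput={"Name": db_name, "Description": description}
        )

if __name__ == "__main__":
    glue = boto3.client("glue")  # requires AWS credentials/region to actually run
    ensure_glue_database(glue, "my_source_db", "my_source_db - DMS Pipeline")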

@@ -234,23 +234,30 @@ def handler(event, context):

# Used to create glue tables based on Metadata objects
gc = GlueConverter()
glue_table_definitions = [gc.generate_from_meta(
table,
db_identifier.replace("_", "-"),
f"s3://{raw_history_bucket}/{schema_name}/{table.name}") for table in db_metadata
glue_table_definitions = [
gc.generate_from_meta(
table,
db_identifier.replace("_", "-"),
f"s3://{raw_history_bucket}/{schema_name}/{table.name}",
)
for table in db_metadata
]

for table in glue_table_definitions:
try:
table_response = glue.get_table(DatabaseName=db_identifier, Name=table["TableInput"]["Name"])
glue.get_table(DatabaseName=db_identifier, Name=table["TableInput"]["Name"])
logger.info(f"Table {table['TableInput']['Name']} already exists")
# Update the table if it exists
logger.info(f"Updating table {table['TableInput']['Name']}")
response = glue.update_table(DatabaseName=db_identifier, TableInput=table["TableInput"])
glue.update_table(
DatabaseName=db_identifier, TableInput=table["TableInput"]
)
except glue.exceptions.EntityNotFoundException:
logger.info(f"Table {table['TableInput']['Name']} does not exist. Creating it now")
logger.info(
f"Table {table['TableInput']['Name']} does not exist. Creating it now"
)
response = glue.create_table(**table)
logger.info(response)
logger.debug(response)
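
The loop above applies the same idea per table: try get_table, call update_table when the table exists, and create_table when it does not, logging the response at debug level. A condensed sketch of that upsert step (the helper name is illustrative; the table definition is assumed to be the dict shape produced by GlueConverter.generate_from_meta, i.e. a "TableInput" key plus the target database):

# Illustrative sketch (not part of this commit): per-table create-or-update in Glue.
def upsert_glue_table(glue_client, database_name: str, table_def: dict) -> None:
    table_input = table_def["TableInput"]
    try:
        glue_client.get_table(DatabaseName=database_name, Name=table_input["Name"])
        # The table exists, so push the latest definition.
        glue_client.update_table(DatabaseName=database_name, TableInput=table_input)
    except glue_client.exceptions.EntityNotFoundException:
        # The table is missing, so create it from the full definition.
        glue_client.create_table(**table_def)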

# Output json metadata to S3
for table in db_metadata:
@@ -274,30 +281,31 @@ def handler(event, context):

# Move these keys to the landing bucket
for key in invalid_keys:
# Extract X-Ray trace ID
trace_id = xray_recorder.current_segment().trace_id

# Get original object metadata (if exists)
original_metadata = s3.head_object(Bucket=invalid_bucket_name, Key=key).get('Metadata', {})

# Preserve existing metadata and add X-Ray trace ID
updated_metadata = original_metadata.copy()
updated_metadata["X-Amzn-Trace-Id"] = trace_id

# Add object metadata to state that it has been reprocessed
updated_metadata["reprocessed"] = "true"

# Copy object with new metadata
s3.copy_object(
CopySource=f"{invalid_bucket_name}/{key}",
Bucket=landing_bucket_name,
Key=key,
Metadata=updated_metadata,
MetadataDirective='REPLACE' # Ensures metadata is replaced with the new one
)
# Extract X-Ray trace ID
trace_id = xray_recorder.current_segment().trace_id

# Delete original object
s3.delete_object(Bucket=invalid_bucket_name, Key=key)
# Get original object metadata (if exists)
original_metadata = s3.head_object(Bucket=invalid_bucket_name, Key=key).get(
"Metadata", {}
)

# Preserve existing metadata and add X-Ray trace ID
updated_metadata = original_metadata.copy()
updated_metadata["X-Amzn-Trace-Id"] = trace_id

# Add object metadata to state that it has been reprocessed
updated_metadata["reprocessed"] = "true"

# Copy object with new metadata
s3.copy_object(
CopySource=f"{invalid_bucket_name}/{key}",
Bucket=landing_bucket_name,
Key=key,
Metadata=updated_metadata,
MetadataDirective="REPLACE", # Ensures metadata is replaced with the new one
)

# Delete original object
s3.delete_object(Bucket=invalid_bucket_name, Key=key)

logger.info("Done reprocessing failed records")
Empty file.
@@ -1,15 +1,15 @@
import json
import os
from datetime import datetime
import re
from datetime import datetime
from urllib.parse import unquote_plus

import boto3
import s3fs
from aws_xray_sdk.core import patch_all
from pyarrow import ArrowInvalid
from pyarrow.parquet import ParquetFile
import s3fs
from urllib3 import PoolManager
from aws_xray_sdk.core import xray_recorder, patch_all

patch_all()

@@ -88,10 +88,10 @@ def strip_data_type(data_type: str) -> str:

if data_type in type_lookup or data_type in type_lookup.values():
return data_type
else:
raise MetadataTypeMismatchException(
f"'{data_type}' is not valid or is not a supported data type."
)

raise MetadataTypeMismatchException(
f"'{data_type}' is not valid or is not a supported data type."
)


def return_agnostic_type(data_type: str, column_name: str = None) -> str:
@@ -118,12 +118,12 @@ def return_agnostic_type(data_type: str, column_name: str = None) -> str:
if data_type in type_lookup.keys():
agnostic_type = type_lookup[data_type]
return agnostic_type
else:
raise MetadataTypeMismatchException(
f"The column type '{data_type}' "
f"{'for column ' + column_name if column_name else ''} is already an "
"agnostic type."
)

raise MetadataTypeMismatchException(
f"The column type '{data_type}' "
f"{'for column ' + column_name if column_name else ''} is already an "
"agnostic type."
)
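
Both hunks above apply the same lint fix (pylint's no-else-raise): because the if branch returns, the else wrapper is redundant and the raise is dedented to function level. A before/after sketch on a made-up lookup table:

# Illustrative sketch (not part of this commit): the no-else-raise rewrite.
type_lookup = {"number": "decimal128", "varchar2": "string"}  # assumed example mapping

# Before: the raise sits in an else branch even though the if branch returns.
def to_agnostic_before(data_type: str) -> str:
    if data_type in type_lookup:
        return type_lookup[data_type]
    else:
        raise ValueError(f"'{data_type}' is not a supported data type.")

# After: the else is redundant, so the raise moves to function level.
def to_agnostic_after(data_type: str) -> str:
    if data_type in type_lookup:
        return type_lookup[data_type]
    raise ValueError(f"'{data_type}' is not a supported data type.")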


class MetadataTypeMismatchException(Exception):
@@ -155,7 +155,7 @@ class FileValidator:
The name of the bucket where the metadata is located.
"""

def __init__(
def __init__( # pylint: disable=too-many-positional-arguments,too-many-arguments
self,
key: str,
pass_bucket: str,
@@ -209,20 +209,21 @@ def execute(self):
Moves the validated file to the pass or fail bucket depending on the result
of the validation.
"""
client = boto3.client("secretsmanager")
#secrets = client.get_secret_value(SecretId=os.getenv("SLACK_SECRET_KEY"))
#secrets = json.loads(secrets.get("SecretString"))
#url = secrets.get("webhook_url")
#channel = secrets.get("channel")
# client = boto3.client("secretsmanager")
# secrets = client.get_secret_value(SecretId=os.getenv("SLACK_SECRET_KEY"))
# secrets = json.loads(secrets.get("SecretString"))
# url = secrets.get("webhook_url")
# channel = secrets.get("channel")

self._validate_file(path=f"{self.bucket_from}/{self.key}")
if self.errors:
event_time = datetime.utcnow().isoformat(sep=" ", timespec="milliseconds")
location = (
f"https://s3.console.aws.amazon.com/s3/buckets/"
f"{self.fail_bucket}/{self.key}"
)
#payload = {
# TODO: Implement the slack notifications - commented out for now
# event_time = datetime.utcnow().isoformat(sep=" ", timespec="milliseconds")
# location = (
# f"https://s3.console.aws.amazon.com/s3/buckets/"
# f"{self.fail_bucket}/{self.key}"
# )
# payload = {
# "channel": channel,
# "text": "",
# "username": "AWS Lambda",
@@ -247,12 +248,13 @@ def execute(self):
# },
# },
# ],
#}
# }

self.bucket_to = self.fail_bucket
move_object(self.bucket_to, self.bucket_from, self.key)

#for error in self.errors:
# More slack notification code
# for error in self.errors:
# print(
# f"VALIDATION ERROR\n"
# f"File {self.key} failed validation\r"
@@ -261,10 +263,10 @@ def execute(self):
# )
# payload["blocks"][1]["text"]["text"] += f"\n*Failure Reason:* {error}"

#encoded_payload = json.dumps(payload).encode("utf-8")
#http.request(
# encoded_payload = json.dumps(payload).encode("utf-8")
# http.request(
# method="POST", url=url, body=encoded_payload
#)
# )
else:
move_object(self.bucket_to, self.bucket_from, self.key)
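
The commented-out block above is the pending Slack notification path: read the webhook details from Secrets Manager, build a payload, and POST it with urllib3. A sketch of what that could look like once the TODO is picked up (the SecretId env var follows the commented code; the secret is assumed to hold "webhook_url" and "channel" keys, and everything else is a placeholder, not part of this commit):

# Illustrative sketch (not part of this commit): possible wiring for the Slack TODO.
import json
import os

import boto3
from urllib3 import PoolManager

def notify_slack(message: str) -> None:
    secrets_client = boto3.client("secretsmanager")
    secret = secrets_client.get_secret_value(SecretId=os.getenv("SLACK_SECRET_KEY"))
    secret = json.loads(secret["SecretString"])

    payload = {
        "channel": secret.get("channel"),
        "username": "AWS Lambda",
        "text": message,
    }
    PoolManager().request(
        method="POST",
        url=secret.get("webhook_url"),
        body=json.dumps(payload).encode("utf-8"),
    )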

@@ -433,7 +435,7 @@ def _validate_column_types(
self._add_error(error=str(e))


def handler(event, context): # noqa: C901
def handler(event, context): # noqa: C901 pylint: disable=unused-argument
pass_bucket = os.environ["PASS_BUCKET"]
fail_bucket = os.environ["FAIL_BUCKET"]
metadata_bucket = os.environ["METADATA_BUCKET"]
@@ -165,9 +165,9 @@ module "metadata_generator" {
}

source_path = [{
path = "${path.module}/lambda-functions/metadata-generator/main.py"
pip_tmp_dir = "${path.module}/lambda-functions/metadata-generator/fixtures"
pip_requirements = "${path.module}/lambda-functions/metadata-generator/requirements.txt"
path = "${path.module}/lambda-functions/metadata_generator/main.py"
pip_tmp_dir = "${path.module}/lambda-functions/metadata_generator/fixtures"
pip_requirements = "${path.module}/lambda-functions/metadata_generator/requirements.txt"
}]

tags = var.tags
