|
1 |
| -import tempfile |
2 | 1 | from logging import getLogger
|
3 | 2 |
|
4 | 3 | import mailparser
|
| 4 | +import requests |
5 | 5 | from django.conf import settings
|
6 |
| -from django.core.exceptions import ImproperlyConfigured |
7 | 6 | from django.utils.timezone import now
|
| 7 | +from rest_framework import status |
8 | 8 |
|
9 |
| -from datahub.documents import utils as documents |
10 | 9 | from datahub.email_ingestion.models import MailboxLogging, MailboxProcessingStatus
|
11 |
| -from datahub.interaction.email_processors.processors import CalendarInteractionEmailProcessor |
| 10 | +from datahub.interaction.email_processors.processors import InteractionPlainEmailProcessor |
| 11 | + |
12 | 12 |
|
13 | 13 | logger = getLogger(__name__)
|
14 | 14 |
|
15 | 15 | BUCKET_ID = 'mailbox'
|
16 | 16 |
|
17 | 17 |
|
18 |
| -def get_mail_docs_in_bucket(): |
19 |
| - """ |
20 |
| - Gets all mail documents in the bucket. |
21 |
| - """ |
22 |
| - if BUCKET_ID not in settings.DOCUMENT_BUCKETS: |
23 |
| - raise ImproperlyConfigured(f'Bucket "{BUCKET_ID}" is missing in settings') |
| 18 | +def _get_headers(token): |
| 19 | + return { |
| 20 | + 'Authorization': f'Bearer {token}', |
| 21 | + } |
24 | 22 |
|
25 |
| - config = settings.DOCUMENT_BUCKETS[BUCKET_ID] |
26 |
| - if 'bucket' not in config: |
27 |
| - raise ImproperlyConfigured(f'Bucket "{BUCKET_ID}" not configured properly in settings') |
28 | 23 |
|
29 |
| - name = config['bucket'] |
30 |
| - if not name: |
31 |
| - raise ImproperlyConfigured( |
32 |
| - f'Bucket "{BUCKET_ID}" bucket value not configured properly in settings', |
33 |
| - ) |
def _get_base_url():
    """
    Returns the URL prefix shared by every request for the ingestion mailbox.

    The result is `settings.MAILBOX_INGESTION_GRAPH_URL` immediately followed by
    `users/<settings.MAILBOX_INGESTION_EMAIL>`; no separator is inserted between the
    configured URL and `users/`.
    """
    # Read the mailbox address first, then the URL, in the same order as before.
    mailbox_address = settings.MAILBOX_INGESTION_EMAIL
    graph_url = settings.MAILBOX_INGESTION_GRAPH_URL
    return '{0}users/{1}'.format(graph_url, mailbox_address)
| 27 | + |
| 28 | + |
def get_access_token(tenant_id, client_id, client_secret, *, timeout=30):
    """
    Requests an access token using the OAuth 2.0 client credentials grant.

    :param tenant_id: tenant identifier interpolated into the token endpoint URL
    :param client_id: value sent as the `client_id` form field
    :param client_secret: value sent as the `client_secret` form field
    :param timeout: seconds `requests` waits for the endpoint before raising
    :returns: the `access_token` entry of the JSON response, or `None` when the
        response does not contain one
    """
    # The v2.0 token endpoint lives under `oauth2/`, not `oauth/`.
    token_url = f'https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token'
    token_data = {
        'grant_type': 'client_credentials',
        'client_id': client_id,
        'client_secret': client_secret,
        'scope': 'https://graph.microsoft.com/.default',
    }
    # Without an explicit timeout `requests` may wait indefinitely for a response.
    token_request = requests.post(token_url, data=token_data, timeout=timeout)
    return token_request.json().get('access_token')
| 39 | + |
| 40 | + |
def read_messages(token, *, timeout=30):
    """
    Lists the messages in the Inbox folder of the ingestion mailbox.

    :param token: access token used for the `Authorization` header
    :param timeout: seconds `requests` waits for the server before raising
    :returns: the `value` entry of the JSON response, or an empty list when the
        response has no such key
    """
    messages_url = f'{_get_base_url()}/mailFolders/Inbox/messages'

    # Without an explicit timeout `requests` may wait indefinitely for a response.
    messages_request = requests.get(
        messages_url,
        headers=_get_headers(token),
        timeout=timeout,
    )
    # NOTE(review): only the first response page is read; confirm whether following
    # a next-page link is needed for large inboxes.
    return messages_request.json().get('value', [])
| 51 | + |
34 | 52 |
|
35 |
| - client = documents.get_s3_client_for_bucket(bucket_id=BUCKET_ID) |
def fetch_message(token, message_id, *, timeout=30):
    """
    Downloads the content of one message through its `$value` endpoint.

    :param token: access token used for the `Authorization` header
    :param message_id: identifier of the message to download
    :param timeout: seconds `requests` waits for the server before raising
    :returns: the response body as text when the server answers HTTP 200,
        otherwise `None`
    """
    content_url = f'{_get_base_url()}/messages/{message_id}/$value'

    # Without an explicit timeout `requests` may wait indefinitely for a response.
    content_request = requests.get(
        content_url,
        headers=_get_headers(token),
        timeout=timeout,
    )
    if content_request.status_code != status.HTTP_200_OK:
        return None

    return content_request.text
| 63 | + |
| 64 | + |
def delete_message(token, message_id, *, timeout=30):
    """
    Deletes one message from the Inbox folder of the ingestion mailbox.

    :param token: access token used for the `Authorization` header
    :param message_id: identifier of the message to delete
    :param timeout: seconds `requests` waits for the server before raising
    :returns: `True` when the server answers HTTP 204, otherwise `False`
    """
    delete_url = f'{_get_base_url()}/mailFolders/Inbox/messages/{message_id}'

    # Without an explicit timeout `requests` may wait indefinitely for a response.
    delete_request = requests.delete(
        delete_url,
        headers=_get_headers(token),
        timeout=timeout,
    )
    return delete_request.status_code == status.HTTP_204_NO_CONTENT
46 | 72 |
|
47 | 73 |
|
def process_ingestion_emails():
    """
    Reads the messages listed in the ingestion mailbox and processes each one.

    For every message the content is fetched, the message is deleted from the
    mailbox, a log entry is created with `_create_log_entry` and the parsed email
    is handed to `InteractionPlainEmailProcessor`. Messages that cannot be fetched
    or deleted are logged and skipped. Nothing is read when no access token could
    be obtained.
    """
    processor = InteractionPlainEmailProcessor()

    token = get_access_token(
        settings.MAILBOX_INGESTION_TENANT_ID,
        settings.MAILBOX_INGESTION_CLIENT_ID,
        settings.MAILBOX_INGESTION_CLIENT_SECRET,
    )
    if not token:
        # Without a token every request would be sent as "Bearer None" and fail.
        logger.error('Error obtaining access token for mailbox ingestion')
        return

    for message in read_messages(token):
        message_id = message['id']

        content = fetch_message(token, message_id)
        if not content:
            logger.error('Error fetching message: "%s"', message_id)
            continue
        # The message is removed before it is processed; the fetched content is
        # what gets passed on to the log entry and the parser.
        if not delete_message(token, message_id):
            logger.error('Error deleting message: "%s"', message_id)
            continue

        # Reset per message so the except branch never touches the previous
        # message's log entry (or an unbound name) when creating the entry fails.
        log = None
        try:
            log = _create_log_entry(message_id, message, content)

            email = mailparser.parse_from_string(content)
            processed, reason, interaction_id = processor.process_email(message=email)
            if not processed:
                _update_log_status(log, MailboxProcessingStatus.FAILURE, reason, None)
                logger.error('Error parsing message: "%s", error: "%s"', message_id, reason)
            else:
                _update_log_status(log, MailboxProcessingStatus.PROCESSED, reason, interaction_id)
                logger.info(reason)
                # Only report success when the processor accepted the message.
                logger.info(
                    'Successfully processed message "%s" and deleted it from mailbox.',
                    message_id,
                )
        except Exception as e:
            if log is not None:
                _update_log_status(log, MailboxProcessingStatus.FAILURE, repr(e), None)
            logger.exception('Error processing message: "%s", error: "%s"', message_id, e)
|
82 | 116 |
|
83 | 117 |
|
84 |
| -def _create_log_entry(source, message): |
| 118 | +def _create_log_entry(source, message, content): |
85 | 119 | log = MailboxLogging(
|
86 | 120 | retrieved_on=now(),
|
87 |
| - content=message['content'].decode('utf-8'), |
| 121 | + content=content, |
88 | 122 | source=source,
|
89 | 123 | status=MailboxProcessingStatus.RETRIEVED,
|
90 | 124 | )
|
|
0 commit comments