Skip to content

Commit

Permalink
Instant Dispatch
Browse files Browse the repository at this point in the history
Instant Dispatch & Minor Fixes
  • Loading branch information
CoffeePerry committed Mar 16, 2021
1 parent 3842def commit b56b9d3
Show file tree
Hide file tree
Showing 15 changed files with 180 additions and 98 deletions.
1 change: 1 addition & 0 deletions configs/config-development.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

# Celery - for Tasks
BROKER_URL = 'amqp://[USERNAME]:[PASSWORD]@localhost:5672/mercury'
CELERY_RESULT_BACKEND = MONGO_URI

# Email
MAIL_SERVER = ''
Expand Down
1 change: 1 addition & 0 deletions configs/config-production.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

# Celery - for Tasks
BROKER_URL = 'amqp://[USERNAME]:[PASSWORD]@localhost:5672/mercury'
CELERY_RESULT_BACKEND = MONGO_URI

# Email
MAIL_SERVER = ''
Expand Down
18 changes: 10 additions & 8 deletions mercury/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
# coding=utf-8

from .services.database_sql import init_app as init_database_sql
from .services.database_nosql_mongo import init_app as init_database_nosql_mongo
from .services.auth import init_app as init_auth
from .services.tasks import init_app as init_tasks
from .services.tasks.notification import init_app as init_notification

from os import path, makedirs

from flask import Flask, send_from_directory
from flask_restful import Api

from .services.auth import init_app as init_auth
from .services.database_nosql_mongo import init_app as init_database_nosql_mongo
from .services.database_sql import init_app as init_database_sql
from .services.signals import init_app as init_signals
from .services.tasks import init_app as init_tasks
from .services.tasks.notification import init_app as init_notification

CONFIG_FILENAME = 'config.py'

api = Api()
Expand Down Expand Up @@ -43,10 +44,11 @@ def create_app():
app.config.from_pyfile(path.join(app.instance_path, CONFIG_FILENAME))

secret_key = app.config.get('SECRET_KEY')
if secret_key is None:
if not secret_key:
app.logger.error(f'Set variable SECRET_KEY with random string in file: '
f'{path.join(app.instance_path, CONFIG_FILENAME)}')

init_signals(app)
init_database_sql(app)
init_database_nosql_mongo(app)

Expand Down Expand Up @@ -78,7 +80,7 @@ def favicon():
return send_from_directory(path.join(app.root_path, 'static'),
'favicon.ico', mimetype='image/vnd.microsoft.icon')

@app.cli.command('info')
@app.cli.command('info', help='Display info.')
def info():
"""Print Mercury info.
Expand Down
10 changes: 5 additions & 5 deletions mercury/controllers/notification.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# coding=utf-8

from flask import abort, request
from flask_jwt_extended import jwt_required, get_jwt_identity
from flask_restful import Resource, marshal

from mercury.services import notification as services_notification
from mercury.services import user as services_user
from mercury.services.custom_exceptions import MethodVersionNotFound

from flask import abort, request
from flask_restful import Resource, marshal
from flask_jwt_extended import jwt_required, get_jwt_identity


class NotificationListAPI(Resource):
decorators = [jwt_required]
Expand Down Expand Up @@ -81,7 +81,7 @@ def put(self, user_id, _id):
if not request.json:
abort(400)
result = services_notification.update_notification(_id, request.json, user_id)
if result is None:
if not result:
return {'result': False}
return {'notification': marshal(result, services_notification.notification_fields)}
raise MethodVersionNotFound()
Expand Down
8 changes: 4 additions & 4 deletions mercury/controllers/user.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# coding=utf-8

from mercury.services import user as services_user
from mercury.services.custom_exceptions import MethodVersionNotFound

from flask import abort, request
from flask_restful import Resource, marshal
from flask_jwt_extended import jwt_required, get_jwt_identity
from flask_restful import Resource, marshal

from mercury.services import user as services_user
from mercury.services.custom_exceptions import MethodVersionNotFound


class UserListAPI(Resource):
Expand Down
6 changes: 3 additions & 3 deletions mercury/models/user.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# coding=utf-8

from mercury.services.database_sql import db

from typing import Final
from datetime import datetime
from secrets import token_hex, compare_digest
from typing import Final

from mercury.services.database_sql import db


class User(db.Model):
Expand Down
1 change: 1 addition & 0 deletions mercury/services/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(self, app=None):
self.CELERY_RESULT_BACKEND = self.MONGO_URI
self.CELERY_LOGS_FOLDER = 'logs'
self.CELERY_BEAT_FOLDER = 'celerybeat'
self.CELERY_BEAT_CRONTAB_MINUTE = '*' # Every minute

# Email
self.MAIL_SERVER = None
Expand Down
24 changes: 12 additions & 12 deletions mercury/services/database_sql.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
# coding=utf-8

from os import makedirs
from os import path

from flask_sqlalchemy import SQLAlchemy
from flask.cli import AppGroup

from os import makedirs
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()
db_cli = AppGroup('database')
Expand Down Expand Up @@ -35,17 +34,18 @@ def init_app(app):
app.logger.warning(f'Database SQL not found! File: {app.config["DATABASE_FILENAME"]}')


@db_cli.command('create')
def create_database():
"""Create the database schema."""
@db_cli.command('create-tables', help='Create all SQL database tables.')
def create_tables():
"""Create all SQL database tables."""
db.create_all()
print('Database SQL created')
print('SQL Database\'s tables created')


@db_cli.command('drop')
@db_cli.command('drop-tables', help='Drop all SQL database tables.')
def drop_database():
"""Drop the database."""
response = input('Are you really sure to delete the SQL database and all the data it contains? [Y/n]:')
if response.lower() == 'y':
"""Drop all SQL database tables."""
if input('Are you really sure to delete all SQL database tables? [y/N]:').lower() == 'y':
db.drop_all()
print('Database SQL dropped')
print('SQL Database\'s tables dropped')
else:
print('Aborted!')
24 changes: 24 additions & 0 deletions mercury/services/dictionaries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# coding=utf-8

from sys import version_info

"""Python versions constants"""
PY3_9 = version_info >= (3, 9)
PY3_5 = version_info >= (3, 5)


def merge_dicts(first_dict, second_dict):
"""Merge two dictionaries.
:param first_dict: First dictionary.
:param second_dict: Second dictionary.
:return: Resultant dictionary.
"""
if PY3_9:
return first_dict | second_dict
elif PY3_5:
return {**first_dict, **second_dict}
else:
result = first_dict.copy()
result.update(second_dict)
return result
54 changes: 34 additions & 20 deletions mercury/services/notification.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
# coding=utf-8

from mercury.services.database_nosql_mongo import db as mongo

from datetime import datetime
from bson import ObjectId

from bson import ObjectId
from flask import current_app
from flask_restful import fields, reqparse

from mercury.services.database_nosql_mongo import db as mongo
from mercury.services.signals import signals
from mercury.services.dictionaries import merge_dicts

"""Signals"""
notification_dispatch = signals.signal('notification-dispatch')

"""Fields to marshal notification to JSON."""
notification_fields = {
'category': fields.String,
Expand All @@ -18,7 +24,6 @@
}
}


'''CRUD Functions'''


Expand All @@ -28,7 +33,7 @@ def get_request_parser(request_parser=None):
:param request_parser: If exists, add request parser argument to request_parser param.
:return: Notification request parser.
"""
if request_parser is None:
if not request_parser:
result = reqparse.RequestParser()
else:
result = request_parser
Expand Down Expand Up @@ -68,12 +73,9 @@ def insert_notification(notification, user_id):
notification['datetime_schedule'] = notification.get('datetime_schedule',
datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
notification.pop('_id', None)
return {
'_id': mongo.db.notification.insert_one(notification).inserted_id,
'user_id': user_id,
'category': notification['category'],
'datetime_schedule': notification['datetime_schedule']
}
notification['_id'] = str(mongo.db.notification.insert_one(notification).inserted_id)
instant_dispatch(notification)
return notification


def update_notification(id, notification, user_id=None):
Expand All @@ -84,19 +86,17 @@ def update_notification(id, notification, user_id=None):
:param notification: Notification to persist.
:return: Persisted notification's base informations or error.
"""
if user_id is None:
user_id = notification['user_id']
else:
if user_id:
notification['user_id'] = user_id
else:
user_id = notification['user_id']
notification.pop('_id', None)
notification_found = mongo.db.notification.find_one_or_404({'_id': ObjectId(id), 'user_id': user_id})
if mongo.db.notification.update_one({'_id': notification_found['_id']}, {'$set': notification}).acknowledged:
return {
'_id': id,
'user_id': user_id,
'category': notification['category'],
'datetime_schedule': notification.get('datetime_schedule', notification_found['datetime_schedule'])
}
notification = merge_dicts(notification_found, notification)
notification['_id'] = str(notification['_id'])
instant_dispatch(notification)
return notification
else:
return None

Expand Down Expand Up @@ -124,3 +124,17 @@ def find_notifications_to_dispatch():
'datetime_schedule': {'$lte': datetime.now().strftime('%Y-%m-%d %H:%M:%S')},
'datetime_dispatch': None
})


def instant_dispatch(notification):
"""Check if it's must to do notification's instant dispatch.
:param notification: notification to check and eventually immediately dispatch.
"""
if notification and (
(not notification.get('datetime_schedule')) or (
datetime.strptime(notification['datetime_schedule'], '%Y-%m-%d %H:%M:%S') <= datetime.now() and
(not notification.get('datetime_dispatch'))
)
):
notification_dispatch.send(current_app._get_current_object(), notification=notification)
13 changes: 13 additions & 0 deletions mercury/services/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# coding=utf-8

from flask.signals import Namespace

signals = Namespace()


def init_app(app):
"""Initializes the application Signals.
:param app: The Flask application object.
"""
pass
12 changes: 6 additions & 6 deletions mercury/services/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# coding=utf-8

from os import path, makedirs

from celery import Celery
from celery.schedules import crontab

from os import path, makedirs

celery = Celery(__name__)


Expand All @@ -15,7 +15,7 @@ def init_app(app):
"""
celery.conf.broker_url = app.config['BROKER_URL']
celery.conf.update(app.config)
celery.config_from_object(CeleryBeatConfig()) # Load Celery Beat instance config
celery.config_from_object(CeleryBeatConfig(app)) # Load Celery Beat instance config

try:
# Ensure the celery beat folder exists
Expand Down Expand Up @@ -48,18 +48,18 @@ def __call__(self, *args, **kwargs):


class CeleryBeatConfig(object):
def __init__(self):
def __init__(self, app):
"""CeleryBeatConfig constructor"""
self.CELERY_TASK_SERIALIZER = 'json'
self.CELERY_RESULT_SERIALIZER = 'json'
self.CELERY_ACCEPT_CONTENT = ['json']
self.CELERY_IMPORTS = ('mercury.services.tasks.notification', )
self.CELERY_IMPORTS = ('mercury.services.tasks.notification',)
self.CELERY_TIMEZONE = 'UTC'
self.CELERY_TASK_RESULT_EXPIRES = 30

self.CELERYBEAT_SCHEDULE = {
'route_notifications': {
'task': 'mercury.services.tasks.notification.route_notifications',
'schedule': crontab(minute='*'), # Every minute
'schedule': crontab(minute=app.config['CELERY_BEAT_CRONTAB_MINUTE']),
}
}
Loading

0 comments on commit b56b9d3

Please sign in to comment.