| @@ -0,0 +1,39 @@ | |||
| from functools import wraps | |||
| from flask import request, jsonify | |||
| import time | |||
| class Cache: | |||
| def __init__(self): | |||
| """ | |||
| Simple caching mechanism that will only cache data if the user is not sending any GET / POST arguments in the request. | |||
| Use in combination with the RouteProvider class to be able to include in a request without extra instantiation and reduce performance cost. | |||
| """ | |||
| self.cache = {} | |||
| def __call__(self, timeout): | |||
| """ | |||
| Can be expanded to include request payload based caching - make a caching hash which includes the request.args and request.form data. | |||
| Keep in mind that it is memory taxing given a dynamic request type. | |||
| Caching large payload routes with thousands of possible arguments could cause a memory overflow and crash the server. | |||
| """ | |||
| def decorator(func): | |||
| @wraps(func) | |||
| def wrapper(*args, **kwargs): | |||
| cache_key = f"{request.method}:{request.path}" | |||
| if (cache_key in self.cache and time.time() <= self.cache[cache_key]["date_expires"]) and not ( | |||
| request.form.keys() or request.args.keys() | |||
| ): | |||
| return jsonify(self.cache[cache_key]["data"]) | |||
| else: | |||
| data = func(*args, **kwargs) | |||
| expiration_time = time.time() + timeout | |||
| self.cache[cache_key] = { | |||
| "date_created": time.time(), | |||
| "date_expires": expiration_time, | |||
| "data": data | |||
| } | |||
| return jsonify(data) | |||
| return wrapper | |||
| return decorator | |||
| _cache = Cache() | |||
| @@ -0,0 +1,51 @@ | |||
| from Config import db, uri | |||
| from sqlalchemy.orm import sessionmaker, scoped_session | |||
| import sqlalchemy, datetime | |||
| class SQL: | |||
| def __init__(self): | |||
| self.engine = sqlalchemy.create_engine(uri) | |||
| self.Session = scoped_session(sessionmaker(bind=self.engine)) | |||
| pass | |||
| def query(self, query, params): | |||
| session = self.Session() | |||
| result = session.execute(query, params).fetchall() | |||
| ret = [] | |||
| for row in result: | |||
| ret.append(dict(row)) | |||
| return ret | |||
| def str_to_date(str, format="%Y-%m-%d %H:%M:%S"): | |||
| example = datetime.datetime(2021, 6, 2, 9, 39) | |||
| if str == None: | |||
| return False | |||
| if type(str) == type(example): | |||
| return str | |||
| return datetime.datetime.strptime(str, format) | |||
| def dict_is_xor(_dict, keys): | |||
| exists = False | |||
| for key in keys: | |||
| if key in _dict and exists == False: | |||
| exists = key | |||
| continue | |||
| if key in _dict and exists != False: | |||
| return False | |||
| return exists | |||
| def crud_routes(request, instance): | |||
| method = request.method | |||
| if method == "POST": | |||
| return instance.create(request) | |||
| elif method == "GET": | |||
| return instance.read(request) | |||
| elif method == "PUT": | |||
| return instance.update(request) | |||
| elif method == "DELETE": | |||
| return instance.delete(request) | |||
| @@ -0,0 +1,26 @@ | |||
| constants = { | |||
| "local" : { | |||
| "mysql": { | |||
| "db_name": "DATABASE NAME", | |||
| "host": "DATABASE HOST", | |||
| "user": "DATABASE USER", | |||
| "password": "DATABASE PASSWORD", | |||
| }, | |||
| "static_root": "/path/to/static/folder/root" | |||
| }, | |||
| "remote" : { | |||
| "mysql" : { | |||
| "db_name": "DATABASE NAME", | |||
| "host": "DATABASE HOST", | |||
| "user": "DATABASE USER", | |||
| "password": "DATABASE PASSWORD", | |||
| }, | |||
| "static_root": "/path/to/static/folder/root" | |||
| }, | |||
| "selector" : "remote" | |||
| } | |||
| # KEEP IN MIND. ONLY DECLARE YOUR PASSWORD IN CONSTANTS DURING DEVELOPMENT. | |||
| # ONCE IN PRODUCTION, USE ENVIRONMENTAL VARIABLES | |||
| constants = constants[constants["selector"]] | |||
| @@ -0,0 +1,35 @@ | |||
| from Config import db, app | |||
| import datetime | |||
| def connect_table_to_key(db, collection_name, foreign_keys): | |||
| return db.relationship(collection_name, foreign_keys=foreign_keys) | |||
| class Users(db.Model): | |||
| __tablename__ = 'Users' | |||
| id = db.Column(db.Integer, primary_key=True) | |||
| admin_id = db.Column(db.Integer, db.ForeignKey('Users.id'), nullable=True) | |||
| email = db.Column(db.String(256), unique=True, nullable=False) | |||
| password = db.Column(db.String(256), nullable=False) | |||
| sign_up_date = db.Column(db.DateTime, nullable=False) | |||
| archived = db.Column(db.Integer, nullable=False, default=0) | |||
| name = db.Column(db.String(64), nullable=False) | |||
| address = db.Column(db.String(256), nullable=False) | |||
| profile_picture_path = db.Column(db.String(512), nullable=False) | |||
| notification_token = db.Column(db.String(512), nullable=False, default="NA") | |||
| Admin = connect_table_to_key(db, 'Users', foreign_keys=[admin_id]) | |||
| __struct__ = { | |||
| "id" : "Number", | |||
| "admin_id" : "Number", | |||
| "email": "String", | |||
| "password": "String", | |||
| "sign_up_date": "String", | |||
| "archived": "Number", | |||
| "name": "String", | |||
| "address": "String", | |||
| "profile_picture_path": "String", | |||
| "notification_token": "String" | |||
| } | |||
| __unique__ = ["id", "email"] | |||
| @@ -0,0 +1,13 @@ | |||
| from flask_sqlalchemy import SQLAlchemy | |||
| from flask_marshmallow import Marshmallow | |||
| from marshmallow import fields | |||
| from Config.DB.Models import Users | |||
| ma = Marshmallow() | |||
| class UsersSchema(ma.SQLAlchemyAutoSchema): | |||
| admin = fields.Nested('self', attribute='Admin', many=False, default=None) | |||
| class Meta: | |||
| model = Users | |||
| exclude = ['password', 'notification_token'] | |||
| @@ -0,0 +1,11 @@ | |||
| from Config.DB.Models import Users | |||
| from Config.DB.Schemas import UsersSchema | |||
| class Tables: | |||
| Users = Users | |||
| # ADD REST OF THE TABLES HERE | |||
| class Schemas: | |||
| User = UsersSchema() | |||
| Users = UsersSchema(many=True) | |||
| # ADD REST OF THE SCHEMAS HERE | |||
| @@ -0,0 +1,181 @@ | |||
| from flask_jwt_extended import get_jwt_identity | |||
| from Config.DB import Schemas, Tables | |||
| from Config import constants, db | |||
| from flask import jsonify | |||
| from functools import wraps | |||
| import random, json | |||
| class RouteProvider: | |||
| def __init__(self): | |||
| self.auth_user = None | |||
| self.schemas = Schemas() | |||
| self.tables = Tables() | |||
| self.db = db | |||
| self.constants = constants | |||
| @classmethod | |||
| def access_controller(cls, access_level=["*"]): | |||
| """ | |||
| Based on the access_level array, checks if the authenticated user request has access to the requested route. | |||
| Might require recoding if you use a different users authentication implementation | |||
| """ | |||
| def decorator(fn): | |||
| @wraps(fn) | |||
| def wrapper(self, *args, **kwargs): | |||
| current_user = get_jwt_identity() | |||
| try: | |||
| current_user = json.loads(current_user) | |||
| except Exception as e: | |||
| print(current_user) | |||
| return cls._abort(418, "I'm a teapot") | |||
| if not current_user: | |||
| return cls._abort(401, "User is not authenticated to access this route") | |||
| tables = Tables() | |||
| user = tables.Users.query.filter_by(id=current_user["id"]).first() | |||
| if user is None: | |||
| return cls._abort(403, "Something went wrong when trying to authenticate your access. Please try to log in again.") | |||
| if "*" not in access_level: | |||
| schemas = Schemas() | |||
| user_schemafied = schemas.User.dump(user) | |||
| if user_schemafied["role"]["value"] not in access_level: | |||
| return cls._abort(403, "Please check your access level..") | |||
| self.auth_user = user | |||
| return fn(self, *args, **kwargs) | |||
| return wrapper | |||
| return decorator | |||
| @staticmethod | |||
| def _abort(code, message): | |||
| """ | |||
| The function is used to provide an error back to the user with an appropriate code and message (msg). | |||
| """ | |||
| return jsonify({"msg": message, "code": code}), code | |||
| def validate(self, keys, data): | |||
| """ | |||
| The function validates the request - checks if all the required keys are in the data dictionary | |||
| """ | |||
| for key in keys: | |||
| if key not in data: | |||
| return False | |||
| return True | |||
| def check_constraint(self, data, table): | |||
| """ | |||
| Checking constraints - making sure that the new entry does not break uniqueness constraints | |||
| """ | |||
| for key in table.__unique__: | |||
| if key not in data: | |||
| continue | |||
| res = table.query.filter_by(**{key: data[key]}).first() | |||
| if res is not None and key != "id": | |||
| if "id" in data and int(data["id"]) == res.id: | |||
| continue | |||
| return f"Conflict: {key} '{data[key]}' already exists. Please use a different value." | |||
| return True | |||
| def save_file(self, files, file_key, static_suffix="/", name=None): | |||
| """ | |||
| The request object and file key must be passed. | |||
| If static_suffix is not passed, the function assumes static_root as save path. | |||
| If name is not passed, the system will generate one randomly. | |||
| Make sure the root location has been granted 755 privilege and is owned by the same | |||
| user who executes and starts the server | |||
| """ | |||
| if name is None: | |||
| name = self.get_random_alphanumerical() | |||
| if file_key not in files: | |||
| return False | |||
| file = files[file_key] | |||
| extension = self.get_extension(file) | |||
| name = name + "." + extension | |||
| print(self.constants) | |||
| file.save(self.constants["static_root"] + static_suffix + name) | |||
| return static_suffix + name | |||
| def get_random_alphanumerical(self, _len = 16): | |||
| """ | |||
| Provides a truely random alphanumerical string with _len number of characters | |||
| """ | |||
| asciiCodes = [] | |||
| alphanumerical = "" | |||
| asciiCodes += random.sample(range(97, 122), int(round(0.375 * _len))) | |||
| asciiCodes += random.sample(range(65, 90), int(round(0.375 * _len))) | |||
| asciiCodes += random.sample(range(48, 57), int(round(0.25 * _len))) | |||
| random.shuffle(asciiCodes) | |||
| for char in asciiCodes: | |||
| alphanumerical += chr(char) | |||
| return alphanumerical | |||
| def get_random_numerical(self, _len = 16): | |||
| """ | |||
| Provides a truely random numerical string with _len number of characters | |||
| """ | |||
| asciiCodes = [] | |||
| alphanumerical = "" | |||
| asciiCodes += random.sample(range(48, 57), _len) | |||
| random.shuffle(asciiCodes) | |||
| for char in asciiCodes: | |||
| alphanumerical += chr(char) | |||
| return alphanumerical | |||
| def generate_secret_key(self, length): | |||
| """ | |||
| Generates a secret key with length number of characters. The secret key consists of all lower case letters. | |||
| """ | |||
| key = "" | |||
| for x in range(length): | |||
| rand = random.randint(97, 122) | |||
| key += chr(rand) | |||
| return key | |||
| def get_extension(self, _f): | |||
| """ | |||
| Provides the extension of the _f file | |||
| """ | |||
| ext = str(_f.filename.split(".")[len(_f.filename.split(".")) - 1]) | |||
| return ext | |||
| def get_hash_info(self, args): | |||
| """ | |||
| Extracts the hash information out of the requests and provides info about enabled / key / type | |||
| """ | |||
| return { | |||
| "enable_hash" : False if "enable_hash" not in args or args["enable_hash"] != "true" else True, | |||
| "hash_key" : "id" if "hash_key" not in args else args["hash_key"], | |||
| "hash_type" : True if "hash_type" not in args or args["hash_type"] == "cbht" else False | |||
| } | |||
| def build_params(self, keys, args): | |||
| """ | |||
| Using the table structure (keys), checks if each key is in the request arguments (args). | |||
| If the key is in args, then it is properly parsed and formatted and returned in the params dictionary | |||
| """ | |||
| params = dict() | |||
| for key in keys: | |||
| if key in args: | |||
| params[key] = args[key] if keys[key] != "Integer" else int(args[key]) | |||
| return params | |||
| def hash_query_results(self, array, col_key, cbht=True): | |||
| """ | |||
| Creates a hash table of closed or open bucket type from a specific array with N dictionaries where the key exists inside each dictrionary. | |||
| """ | |||
| if type(array) != list: | |||
| array = [array] | |||
| if len(array) == 0: | |||
| return [] | |||
| ret = [None for _ in range(max(array,key=lambda x: x[col_key])[col_key]+1)] | |||
| for item in array: | |||
| if cbht: | |||
| ret[item[col_key]] = item | |||
| else: | |||
| if ret[item[col_key]] is None: | |||
| ret[item[col_key]] = [item] | |||
| else: | |||
| ret[item[col_key]].append(item) | |||
| return ret | |||
| @@ -0,0 +1,25 @@ | |||
| # THIRD PARTY | |||
| from flask import Flask | |||
| from flask_cors import CORS | |||
| from flask_sqlalchemy import SQLAlchemy | |||
| from flask_jwt_extended import JWTManager, create_access_token | |||
| from flask_marshmallow import Marshmallow | |||
| # CONSTANTS IMPORT | |||
| from .Constants import constants | |||
| app = Flask(__name__, template_folder='../templates') # App initialization | |||
| # DB URI | |||
| uri = "mysql+pymysql://"+constants["mysql"]["user"]+":"+constants["mysql"]["password"]+"@"+constants["mysql"]["host"]+"/"+constants["mysql"]["db_name"]+"?&autocommit=false" | |||
| # APP CONFIGURATIONS | |||
| app.config["SQLALCHEMY_DATABASE_URI"] = uri | |||
| app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False | |||
| app.config["JWT_SECRET_KEY"] = "use-a-secret-key" | |||
| # INSTANCES | |||
| cors = CORS(app) | |||
| db = SQLAlchemy(app) | |||
| jwt = JWTManager(app) | |||
| ma = Marshmallow(app) | |||
| @@ -0,0 +1,105 @@ | |||
| from Config.RouteProvider import RouteProvider | |||
| from flask import jsonify | |||
| from flask_jwt_extended import jwt_required | |||
| from datetime import datetime | |||
| class UsersCRUD(RouteProvider): | |||
| def __init__(self): | |||
| super().__init__() | |||
| @jwt_required() | |||
| @RouteProvider.access_controller(access_level=["Administrator", "System Administrator"]) | |||
| def create(self, request): | |||
| data = request.form | |||
| files = request.files | |||
| required_keys = ["email", "password", "name", "address"] | |||
| required_files = ["profile_picture"] | |||
| if not self.validate(required_keys, data): | |||
| return self._abort(400, "Incorrectly formatted request. Please make sure that all the required fields are entered.") | |||
| if not self.validate(required_files, files): | |||
| return self._abort(400, "Incorrectly formatted request. Please make sure that all the required files are attached.") | |||
| constraint = self.check_constraint(data, self.tables.Users) | |||
| if self.check_constraint(data, self.tables.Users) is not True: | |||
| return self._abort(409, constraint) | |||
| user = self.tables.Users() | |||
| [setattr(user, key, data[key]) for key in required_keys] | |||
| profile_picture_path = self.save_file(files, "profile_picture", "/users/") | |||
| user.profile_picture_path = profile_picture_path | |||
| user.sign_up_date = datetime.utcnow() | |||
| self.db.session.add(user) | |||
| self.db.session.commit() | |||
| user = self.tables.Users.query.filter_by(email = data["email"]).first() | |||
| return jsonify({ "user" : self.schemas.User.dump(user) }) | |||
| @jwt_required() | |||
| @RouteProvider.access_controller(access_level=["*"]) | |||
| def read(self, request): | |||
| params = self.build_params(self.tables.Users.__struct__, request.args) | |||
| query_result = self.tables.Users.query.filter_by(**params).all() | |||
| return jsonify({ "users" : self.schemas.Users.dump(query_result), "args" : params }) | |||
| @jwt_required() | |||
| @RouteProvider.access_controller(access_level=["*"]) | |||
| def update(self, request): | |||
| data = request.form | |||
| files = request.files | |||
| required_keys = ["id"] | |||
| updatable_keys = [] | |||
| [updatable_keys.append(key) for key in self.tables.Users.__struct__ if key not in ["sign_up_date", "id"]] | |||
| required_files = ["profile_picture"] | |||
| if not self.validate(required_keys, data): | |||
| return self._abort(400, "Incorrectly formatted request. Please make sure that all the required fields are entered.") | |||
| user = self.tables.Users.query.filter_by(id = data["id"]).first() | |||
| if user is None: | |||
| return self._abort(404, "User not found") | |||
| constraint = self.check_constraint(data, self.tables.Users) | |||
| if constraint is not True: | |||
| return self._abort(409, constraint) | |||
| [setattr(user, key, data[key]) for key in updatable_keys if key in data] | |||
| if "profile_picture" in files: | |||
| profile_picture_path = self.save_file(files, "profile_picture", "/users/") | |||
| user.profile_picture_path = profile_picture_path | |||
| if data["admin_id"] == 'null': | |||
| user.admin_id = None | |||
| self.db.session.commit() | |||
| user = self.tables.Users.query.filter_by(id = data["id"]).first() | |||
| return jsonify({ "user" : self.schemas.User.dump(user) }) | |||
| @jwt_required() | |||
| @RouteProvider.access_controller(access_level=["Administrator", "System Administrator"]) | |||
| def delete(self, request): | |||
| params = self.build_params(self.tables.Users.__struct__, request.args) | |||
| if "id" not in params: | |||
| return self._abort(400, "Missing params info: 'id'") | |||
| user = self.tables.Users.query.filter_by(id = params["id"]).first() | |||
| if user is None: | |||
| return self._abort(404, f'User with ID \'{params["id"]}\' is not found') | |||
| user.archived = 1 if user.archived == 0 else 0 | |||
| if user.archived == 1: | |||
| user.archived = 0 | |||
| else: | |||
| user.archived = 1 | |||
| user = self.tables.Users.query.filter_by(**params).first() | |||
| return jsonify({ "user" : self.schemas.User.dump(user) }) | |||
| users_crud = UsersCRUD() | |||
| @@ -0,0 +1,18 @@ | |||
| # THIRD PARTY | |||
| from flask import request, Blueprint | |||
| # CONFIGURATION | |||
| from Config.Common import crud_routes | |||
| # ROUTES | |||
| from Routes.Users.Crud import users_crud | |||
| # Initialize a user blueprint | |||
| users_api = Blueprint("users_api", __name__) | |||
| # DEFINE YOUR ROUTES HERE | |||
| # | |||
| # EXAMPLE USAGE | |||
| @users_api.route("/crud", methods=["GET", "POST", "PUT", "DELETE"]) | |||
| def users_crud_routes(): | |||
| return crud_routes(request, users_crud) | |||
| @@ -0,0 +1,19 @@ | |||
| # FROM THIRD PARTY | |||
| from flask_requests import request | |||
| from flask import jsonify | |||
| # CONFIGURATIONS | |||
| from Config import app | |||
| # BLUEPRINTS / ROUTES | |||
| from Routes.Users import users_api | |||
| app.register_blueprint(users_api, url_prefix="/users/") | |||
| # PUBLIC META ROUTE - USED TO GIVE THE FRONTEND APPLICATION INFORMATION ABOUT NEW VERSION RELEASE ETC. | |||
| @app.route("/meta", methods=["GET"]) | |||
| def meta(): | |||
| return jsonify({ "version" : "0.0.0" }) | |||
| if __name__ == "__main__": | |||
| app.run() | |||
| @@ -0,0 +1,5 @@ | |||
| # FOR PRODUCTION USE | |||
| from app import app as application | |||
| if __name__ == "__main__": | |||
| application.run() | |||