25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

181 lines
7.0 KiB

  1. from flask_jwt_extended import get_jwt_identity
  2. from Config.DB import Schemas, Tables
  3. from Config import constants, db
  4. from flask import jsonify
  5. from functools import wraps
  6. import random, json
  7. class RouteProvider:
  8. def __init__(self):
  9. self.auth_user = None
  10. self.schemas = Schemas()
  11. self.tables = Tables()
  12. self.db = db
  13. self.constants = constants
  14. @classmethod
  15. def access_controller(cls, access_level=["*"]):
  16. """
  17. Based on the access_level array, checks if the authenticated user request has access to the requested route.
  18. Might require recoding if you use a different users authentication implementation
  19. """
  20. def decorator(fn):
  21. @wraps(fn)
  22. def wrapper(self, *args, **kwargs):
  23. current_user = get_jwt_identity()
  24. try:
  25. current_user = json.loads(current_user)
  26. except Exception as e:
  27. print(current_user)
  28. return cls._abort(418, "I'm a teapot")
  29. if not current_user:
  30. return cls._abort(401, "User is not authenticated to access this route")
  31. tables = Tables()
  32. user = tables.Users.query.filter_by(id=current_user["id"]).first()
  33. if user is None:
  34. return cls._abort(403, "Something went wrong when trying to authenticate your access. Please try to log in again.")
  35. if "*" not in access_level:
  36. schemas = Schemas()
  37. user_schemafied = schemas.User.dump(user)
  38. if user_schemafied["role"]["value"] not in access_level:
  39. return cls._abort(403, "Please check your access level..")
  40. self.auth_user = user
  41. return fn(self, *args, **kwargs)
  42. return wrapper
  43. return decorator
  44. @staticmethod
  45. def _abort(code, message):
  46. """
  47. The function is used to provide an error back to the user with an appropriate code and message (msg).
  48. """
  49. return jsonify({"msg": message, "code": code}), code
  50. def validate(self, keys, data):
  51. """
  52. The function validates the request - checks if all the required keys are in the data dictionary
  53. """
  54. for key in keys:
  55. if key not in data:
  56. return False
  57. return True
  58. def check_constraint(self, data, table):
  59. """
  60. Checking constraints - making sure that the new entry does not break uniqueness constraints
  61. """
  62. for key in table.__unique__:
  63. if key not in data:
  64. continue
  65. res = table.query.filter_by(**{key: data[key]}).first()
  66. if res is not None and key != "id":
  67. if "id" in data and int(data["id"]) == res.id:
  68. continue
  69. return f"Conflict: {key} '{data[key]}' already exists. Please use a different value."
  70. return True
  71. def save_file(self, files, file_key, static_suffix="/", name=None):
  72. """
  73. The request object and file key must be passed.
  74. If static_suffix is not passed, the function assumes static_root as save path.
  75. If name is not passed, the system will generate one randomly.
  76. Make sure the root location has been granted 755 privilege and is owned by the same
  77. user who executes and starts the server
  78. """
  79. if name is None:
  80. name = self.get_random_alphanumerical()
  81. if file_key not in files:
  82. return False
  83. file = files[file_key]
  84. extension = self.get_extension(file)
  85. name = name + "." + extension
  86. print(self.constants)
  87. file.save(self.constants["static_root"] + static_suffix + name)
  88. return static_suffix + name
  89. def get_random_alphanumerical(self, _len = 16):
  90. """
  91. Provides a truely random alphanumerical string with _len number of characters
  92. """
  93. asciiCodes = []
  94. alphanumerical = ""
  95. asciiCodes += random.sample(range(97, 122), int(round(0.375 * _len)))
  96. asciiCodes += random.sample(range(65, 90), int(round(0.375 * _len)))
  97. asciiCodes += random.sample(range(48, 57), int(round(0.25 * _len)))
  98. random.shuffle(asciiCodes)
  99. for char in asciiCodes:
  100. alphanumerical += chr(char)
  101. return alphanumerical
  102. def get_random_numerical(self, _len = 16):
  103. """
  104. Provides a truely random numerical string with _len number of characters
  105. """
  106. asciiCodes = []
  107. alphanumerical = ""
  108. asciiCodes += random.sample(range(48, 57), _len)
  109. random.shuffle(asciiCodes)
  110. for char in asciiCodes:
  111. alphanumerical += chr(char)
  112. return alphanumerical
  113. def generate_secret_key(self, length):
  114. """
  115. Generates a secret key with length number of characters. The secret key consists of all lower case letters.
  116. """
  117. key = ""
  118. for x in range(length):
  119. rand = random.randint(97, 122)
  120. key += chr(rand)
  121. return key
  122. def get_extension(self, _f):
  123. """
  124. Provides the extension of the _f file
  125. """
  126. ext = str(_f.filename.split(".")[len(_f.filename.split(".")) - 1])
  127. return ext
  128. def get_hash_info(self, args):
  129. """
  130. Extracts the hash information out of the requests and provides info about enabled / key / type
  131. """
  132. return {
  133. "enable_hash" : False if "enable_hash" not in args or args["enable_hash"] != "true" else True,
  134. "hash_key" : "id" if "hash_key" not in args else args["hash_key"],
  135. "hash_type" : True if "hash_type" not in args or args["hash_type"] == "cbht" else False
  136. }
  137. def build_params(self, keys, args):
  138. """
  139. Using the table structure (keys), checks if each key is in the request arguments (args).
  140. If the key is in args, then it is properly parsed and formatted and returned in the params dictionary
  141. """
  142. params = dict()
  143. for key in keys:
  144. if key in args:
  145. params[key] = args[key] if keys[key] != "Integer" else int(args[key])
  146. return params
  147. def hash_query_results(self, array, col_key, cbht=True):
  148. """
  149. Creates a hash table of closed or open bucket type from a specific array with N dictionaries where the key exists inside each dictrionary.
  150. """
  151. if type(array) != list:
  152. array = [array]
  153. if len(array) == 0:
  154. return []
  155. ret = [None for _ in range(max(array,key=lambda x: x[col_key])[col_key]+1)]
  156. for item in array:
  157. if cbht:
  158. ret[item[col_key]] = item
  159. else:
  160. if ret[item[col_key]] is None:
  161. ret[item[col_key]] = [item]
  162. else:
  163. ret[item[col_key]].append(item)
  164. return ret