Source code for plom.server.plomServer.routesUserInit

# SPDX-License-Identifier: AGPL-3.0-or-later
# Copyright (C) 2019-2020 Andrew Rechnitzer
# Copyright (C) 2020-2022 Colin B. Macdonald
# Copyright (C) 2020 Vala Vakilian
# Copyright (C) 2022 Edith Coates

from aiohttp import web

from .routeutils import authenticate_by_token_required_fields
from .routeutils import no_authentication_only_log_request
from .routeutils import validate_required_fields, log_request
from .routeutils import write_admin
from .routeutils import log

from plom import SpecVerifier


[docs]class UserInitHandler: """The UserInit Handler interfaces between the HTTP API and the server itself. These routes handle requests related to administration of user accounts. """ def __init__(self, plomServer): self.server = plomServer # @routes.get("/Version") @no_authentication_only_log_request async def version(self, request): return web.Response( text="Plom server version {} with API {}".format( self.server.Version, self.server.API ), status=200, ) # @routes.delete("/authorisation") async def clearAuthorisation(self, request): log_request("clearAuthorisation", request) data = await request.json() if not validate_required_fields(data, ["user", "password"]): return web.Response(status=400) # malformed request. if not self.server.checkPassword(data["user"], data["password"]): return web.Response(status=401) log.info('User "{}" force-logout self'.format(data["user"])) self.server.closeUser(data["user"]) return web.Response(status=200) # @routes.delete("/users/{user}")
[docs] @authenticate_by_token_required_fields(["user"]) def closeUser(self, data, request): """User self-indicates they are logging out, revoke token and tasks. Returns: aiohttp.web.Response: 200 for success, unless a user tries to close another which is BadRequest (400) or user is already logged out, or nonexistent, both of which will give an Unauthorized (401). """ # TODO: should manager be allowed to do this for anyone? if data["user"] != request.match_info["user"]: raise web.HTTPBadRequest(reason="You cannot close other users") self.server.closeUser(data["user"]) return web.Response(status=200)
# @routes.delete("/authorisation/{user}") @authenticate_by_token_required_fields(["user"]) def clearAuthorisationUser(self, data, request): # Only manager can clear other users, via token auth # TODO: ok for manager to clear manager via token auth? # TODO: should other users be able to use this on themselves? if not data["user"] == "manager": return web.Response(status=400) # malformed request. theuser = request.match_info["user"] self.server.closeUser(theuser) log.info('Manager force-logout user "{}"'.format(theuser)) return web.Response(status=200) # @routes.post("/authorisation/{user}")
[docs] @authenticate_by_token_required_fields(["password"]) @write_admin def createUser(self, data, request): """Create a new user.""" theuser = request.match_info["user"] ok, val = self.server.createUser(theuser, data["password"]) if not ok: log.info('Manager failed to create user "%s"', theuser) return web.HTTPNotAcceptable(reason=val) else: log.info('Manager created new user "%s"', theuser) return web.Response(status=200)
# @routes.patch("/authorisation/{user}")
[docs] @authenticate_by_token_required_fields(["password"]) @write_admin def changeUserPassword(self, data, request): """Change an existing user's password.""" theuser = request.match_info["user"] ok, val = self.server.changeUserPassword(theuser, data["password"]) if not ok: log.info('Manager failed to change the password of user "%s"', theuser) return web.HTTPNotAcceptable(reason=val) else: log.info('Manager changed password of user "%s"', theuser) return web.Response(status=200)
# @routes.put("/enable/{user}") @authenticate_by_token_required_fields([]) @write_admin def enableUser(self, data, request): theuser = request.match_info["user"] if theuser in ("manager", "HAL"): raise web.HTTPBadRequest(reason="HAL/manager cannot be enabled/disabled") log.info('Enabling user "%s"', theuser) self.server.setUserEnable(theuser, True) return web.Response(status=200) # @routes.put("/disable/{user}") @authenticate_by_token_required_fields([]) @write_admin def disableUser(self, data, request): theuser = request.match_info["user"] if theuser == "manager": raise web.HTTPBadRequest(reason="Cannot disable the manager account") if theuser == "HAL": raise web.HTTPBadRequest( reason="Just what do you think you're doing, Dave?" ) log.info('Disabling user "%s"', theuser) self.server.setUserEnable(theuser, False) return web.Response(status=200) # @routes.put("/users/{user}") async def giveUserToken(self, request): log_request("giveUserToken", request) ip = request.remote data = await request.json() if not validate_required_fields(data, ["user", "pw", "api", "client_ver"]): return web.Response(status=400) # malformed request. if data["user"] != request.match_info["user"]: return web.Response(status=400) # malformed request. rmsg = self.server.giveUserToken( data["user"], data["pw"], data["api"], data["client_ver"], ip ) if rmsg[0]: return web.json_response(rmsg[1], status=200) # all good, return the token elif rmsg[1].startswith("API"): return web.json_response(rmsg[2], status=400) elif rmsg[1].startswith("HasToken"): return web.json_response(rmsg[2], status=409) else: # various sorts of non-auth conflated: response has details return web.json_response(rmsg[2], status=401) # @routes.get("/info/spec")
[docs] @no_authentication_only_log_request async def info_spec(self, request): """Return the public part of the server specification. Returns: aiohttp.json_response: - 200: the public part of the spec. - 400: spec not found (server does not have one yet). """ spec = self.server.info_spec() if not spec: raise web.HTTPBadRequest(reason="Server does not yet have a spec") return web.json_response(spec, status=200)
# @routes.put("/info/spec")
[docs] @authenticate_by_token_required_fields(["user", "spec"]) @write_admin def put_spec(self, data, request): """Accept an uploaded exam specification. Returns: aiohttp.Response: - 403: only manager can upload a spec. - 400: the provided spec is not valid. - 409: Conflict: server has already initialised or populated the database. - 200: new spec file accepted. TODO: would be polite to inform caller if we already had one or not. """ if self.server.DB.is_paper_database_initialised(): raise web.HTTPConflict( reason="Server has initialised DB: cannot accept spec" ) sv = SpecVerifier(data["spec"]) try: sv.verifySpec(verbose="log") sv.checkCodes(verbose="log") except ValueError as e: raise web.HTTPBadRequest(reason=f"{e}") sv.saveVerifiedSpec() self.server.testSpec = SpecVerifier.load_verified() log.info("spec loaded: %s", self.server.info_spec()) return web.Response()
# @routes.get("/info/shortName")
[docs] @no_authentication_only_log_request async def InfoShortName(self, request): """The short name of the exam. Returns: aiohttp.Response: 200 and the short name or 400 if the server has no spec. """ name = self.server.InfoShortName() if not name: raise web.HTTPBadRequest(reason="Server does not yet have a spec") return web.Response(text=name, status=200)
def setUpRoutes(self, router): router.add_get("/Version", self.version) router.add_delete("/users/{user}", self.closeUser) router.add_put("/users/{user}", self.giveUserToken) router.add_get("/info/shortName", self.InfoShortName) router.add_get("/info/spec", self.info_spec) router.add_put("/info/spec", self.put_spec) router.add_delete("/authorisation", self.clearAuthorisation) router.add_delete("/authorisation/{user}", self.clearAuthorisationUser) router.add_post("/authorisation/{user}", self.createUser) router.add_patch("/authorisation/{user}", self.changeUserPassword) router.add_put("/enable/{user}", self.enableUser) router.add_put("/disable/{user}", self.disableUser)