Source code for plom.server.plomServer.routesRubric

# SPDX-License-Identifier: AGPL-3.0-or-later
# Copyright (C) 2019-2021 Andrew Rechnitzer
# Copyright (C) 2020-2023 Colin B. Macdonald
# Copyright (C) 2020 Vala Vakilian

from aiohttp import web

from .routeutils import authenticate_by_token, authenticate_by_token_required_fields
from .routeutils import validate_required_fields, log_request
from .routeutils import log


[docs]class RubricHandler: """The Rubric Handler interfaces between the HTTP API and the server itself. These routes handle requests related to rubrics. """ def __init__(self, plomServer): self.server = plomServer
[docs] def validateRubric(self, username, rubric): """Do some simple validation of the rubric Args: username (str): the name of the user trying to create the rubric rubric (dict): a dict containing the rubric info Returns: bool: true if valid, false otherwise. """ # check rubric has minimal fields needed need_fields = ("kind", "value", "display_delta", "text", "question") if any(x not in rubric for x in need_fields): return False # check question number is in range if ( rubric["question"] <= 0 or rubric["question"] > self.server.testSpec["numberOfQuestions"] ): return False # set maxMark for checking marks are in range. maxMark = self.server.testSpec["question"][str(rubric["question"])]["mark"] if rubric["kind"] == "neutral": # neutral rubric must have no delta - ie delta == '.' if rubric["display_delta"] != ".": return False # must have some text if len(rubric["text"].strip()) == 0: return False # do we care if its int/None? # if not (rubric["value"] is None or isinstance(rubric["value"], int)): # return False # TODO: should we enforce value=0/None? if not (rubric["value"] is None or int(rubric["value"]) == 0): return False elif rubric["kind"] == "relative": # must have some text if len(rubric["text"].strip()) == 0: return False # do we care if its int? # if not isinstance(rubric["value"], int): # return False # the delta must be of the form -k or +k if rubric["display_delta"][0] not in ["-", "+"]: return False # check rest of delta string is numeric if not rubric["display_delta"][1:].isnumeric(): return False # check delta is in range idelta = int(rubric["display_delta"]) if (idelta < -maxMark) or (idelta > maxMark) or (idelta == 0): return False elif rubric["kind"] == "absolute": # must have some text if len(rubric["text"].strip()) == 0: return False # do we care if its int? # if not isinstance(rubric["value"], int): # return False # check score in range value = int(rubric["value"]) if (value < 0) or (value > maxMark): return False else: # rubric kind must be neutral, relative, delta or absolute return False # passes tests return True
# @routes.put("/MK/rubric")
[docs] @authenticate_by_token_required_fields(["user", "rubric"]) def McreateRubric(self, data, request): """Respond with updated comment list and add received comments to the database. Args: data (dict): A dictionary including user/token and the new rubric to be created request (aiohttp.web_request.Request): A request of type PUT /MK/rubric. Returns: aiohttp.web_response.Response: either 200 with the new key or 406 if sent rubric was incomplete. """ username = data["user"] new_rubric = data["rubric"] if not self.validateRubric(username, new_rubric): return web.HTTPNotAcceptable(reason="Rubric info incomplete/inconsistent") ok, key_or_reason = self.server.McreateRubric(username, new_rubric) if ok: return web.json_response(key_or_reason, status=200) return web.HTTPNotAcceptable(reason=key_or_reason)
# @routes.get("/MK/rubric")
[docs] @authenticate_by_token_required_fields(["user"]) def MgetRubrics(self, data, request): """Respond with the current comment list. Args: data (dict): A dictionary including user/token request (aiohttp.web_request.Request): A request of type GET /MK/rubric. Returns: aiohttp.web_response.Response: List of all comments in DB """ username = data["user"] rubrics = self.server.MgetRubrics() return web.json_response(rubrics, status=200)
# @routes.get("/MK/rubric/{question}")
[docs] @authenticate_by_token_required_fields(["user"]) def MgetRubricsByQuestion(self, data, request): """Respond with the comment list for a particular question. Args: data (dict): A dictionary including user/token request (aiohttp.web_request.Request): A request of type GET /MK/rubric/{question}. Returns: aiohttp.web_response.Response: List of all comments in DB """ username = data["user"] question = request.match_info["question"] rubrics = self.server.MgetRubrics(question) return web.json_response(rubrics, status=200)
# @routes.patch("/MK/rubric/{key}")
[docs] @authenticate_by_token_required_fields(["user", "rubric"]) def MmodifyRubric(self, data, request): """Add modify rubric to DB and respond with its key Args: data (dict): A dictionary including user/token and the new rubric to be created request (aiohttp.web_request.Request): A request of type GET /MK/rubric. Returns: aiohttp.web_response.Response: either 200 with the key or 406 if sent rubric was incomplete or inconsistent, 409 if no rubric found, or some unexpected situation. """ username = data["user"] updated_rubric = data["rubric"] key = request.match_info["key"] if key != updated_rubric["id"]: return web.HTTPBadRequest(reason="Key mismatch in request") if not self.validateRubric(username, updated_rubric): return web.HTTPNotAcceptable(reason="Sent rubric was inconsistent") ok, key_or_reason = self.server.MmodifyRubric(username, key, updated_rubric) if ok: return web.json_response(key_or_reason, status=200) if key_or_reason == "incomplete": return web.HTTPNotAcceptable(reason="Sent rubric was incomplete") elif key_or_reason == "noSuchRubric": return web.HTTPConflict(reason="No rubric with that key found") return web.HTTPConflict(reason=f"Unexpected error/bug: '{key_or_reason}'")
# @routes.get("/MK/user/{user}/{question}")
[docs] @authenticate_by_token_required_fields(["user", "question"]) def MgetUserRubricPanes(self, data, request): """Get user's rubric-panes configuration from server Args: data (dict): A dictionary including user/token and question number. request (aiohttp.web_request.Request): GET `/MK/user/{user}/{question}`. Returns: aiohttp.web_response.Response: success and the config (as json), or 204 if nothing available. Error responses: - HTTPUnauthorized - HTTPBadRequest: inconsistent question or missing fields. - HTTPForbidden: trying to save to another user. """ username = data["user"] question = data["question"] save_to_user = request.match_info["user"] questionCheck = request.match_info["question"] if int(question) != int(questionCheck): raise web.HTTPBadRequest(reason="question numbers inconsistent") if username != save_to_user: # TODO maybe manager should be able to? raise web.HTTPForbidden(reason="you can only access your own rubric data") rval = self.server.MgetUserRubricPanes(save_to_user, question) if rval[0]: # worked return web.json_response(rval[1], status=200) else: # nothing there. return web.Response(status=204)
# @routes.put("/MK/user/{user}/{question}")
[docs] @authenticate_by_token_required_fields(["user", "rubric_config", "question"]) def MsaveUserRubricPanes(self, data, request): """Add new rubric to DB and respond with its key Args: data (dict): A dictionary including user/token and a blob of data to save for the user's rubric tab setup. request (aiohttp.web_request.Request): PUT `/MK/user/{user}/{question}`. Returns: aiohttp.web_response.Response: 200 on success or - HTTPUnauthorized - HTTPBadRequest: inconsistent question or missing fields. - HTTPForbidden: trying to save to another user. """ username = data["user"] question = data["question"] rubricConfig = data["rubric_config"] save_to_user = request.match_info["user"] questionCheck = request.match_info["question"] if int(question) != int(questionCheck): raise web.HTTPBadRequest(reason="question numbers inconsistent") if username != save_to_user: # TODO maybe manager should be able to? raise web.HTTPForbidden(reason="you can only save to your own rubric data") self.server.MsaveUserRubricPanes(save_to_user, question, rubricConfig) return web.Response(status=200)
# ===================== # rubric analysis stuff
[docs] @authenticate_by_token_required_fields(["user"]) def RgetTestRubricMatrix(self, data, request): """Respond with dict encoding test-rubric counts. Responds with status 200/401. Args: data (dict): A dictionary having the user/token. request (aiohttp.web_request.Request): Request of type GET /REP/test_rubric_adjacency. Returns: aiohttp.web_response.Response: A response including metadata encoding the test-rubric adjacency / count matrix. The matrix is encoded as an adjacency list, i.e., ``{testnumber: [rubric_id1, rubric_id2, ...]}`` where `(test_n, rubric_k)` means that `rubric_k` was used in `test_n`. """ if not data["user"] == "manager": return web.Response(status=401) rmsg = self.server.RgetTestRubricMatrix() # is a dict of the form blah[test_number][rubric_key] = count return web.json_response(rmsg, status=200)
[docs] @authenticate_by_token_required_fields(["user"]) def RgetRubricCounts(self, data, request): """Respond with dict encoding rubric counts and other minimal info. Responds with status 200/401. Args: data (dict): A dictionary having the user/token. request (aiohttp.web_request.Request): Request of type GET /REP/rubric/counts. Returns: aiohttp.web_response.Response: A response including metadata encoding the rubric counts and min info. Returns a list of rubrics, and for each rubric we give a dict listing its id, kind, question, delta, text, user who created it, and the count of how many tests it has been used in. """ if not data["user"] == "manager": return web.Response(status=401) rmsg = self.server.RgetRubricCounts() return web.json_response(rmsg, status=200)
[docs] @authenticate_by_token_required_fields(["user"]) def RgetRubricDetails(self, data, request): """Respond with dict encoding rubric counts and other minimal info. Responds with status 200/401/BadRequest. Args: data (dict): A dictionary having the user/token. request (aiohttp.web_request.Request): Request of type GET /REP/rubric/key. Returns: aiohttp.web_response.Response: A response including metadata encoding the rubric details inc which tests use it. More precisely, we return a dict that gives the rurbrics id, kind, question, delta, text, who created it, tags, meta, count, creation and modification times, and a list of test numbers in which it was used. """ if not data["user"] == "manager": return web.Response(status=401) key = request.match_info["key"] rmsg = self.server.RgetRubricDetails(key) if rmsg[0]: return web.json_response(rmsg[1], status=200) else: raise web.HTTPBadRequest(reason="no such rubric")
[docs] def setUpRoutes(self, router): """Adds the response functions to the router object. Args: router (aiohttp.web_urldispatcher.UrlDispatcher): Router object which we will add the response functions to. """ router.add_put("/MK/rubric", self.McreateRubric) router.add_get("/MK/rubric", self.MgetRubrics) router.add_get("/MK/rubric/{question}", self.MgetRubricsByQuestion) router.add_patch("/MK/rubric/{key}", self.MmodifyRubric) router.add_get("/MK/user/{user}/{question}", self.MgetUserRubricPanes) router.add_put("/MK/user/{user}/{question}", self.MsaveUserRubricPanes) router.add_get("/REP/test_rubric_matrix", self.RgetTestRubricMatrix) router.add_get("/REP/rubric/counts", self.RgetRubricCounts) router.add_get("/REP/rubric/{key}", self.RgetRubricDetails)
##