Refactor direction frontend -> backend
All checks were successful
Build Formula10 Docker Image / build-docker (push) Successful in 15s

This commit is contained in:
2024-02-20 17:32:38 +01:00
parent b1d05cf7c8
commit 13103cc056
12 changed files with 516 additions and 350 deletions

203
backend_model.py Normal file
View File

@ -0,0 +1,203 @@
from typing import Dict, List, cast
from urllib.parse import quote
from flask import redirect
from werkzeug import Response
from database_utils import race_has_result, user_exists
from model import PodiumDrivers, RaceResult, SeasonGuess, TeamWinners, User, db, RaceGuess
from validation_utils import any_is_none, positions_are_contiguous
def find_or_create_race_guess(user_name: str, race_name: str) -> RaceGuess:
# There can be a single RaceGuess at most, since (user_name, race_name) is the composite primary key
race_guess: RaceGuess | None = db.session.query(RaceGuess).filter_by(user_name=user_name, race_name=race_name).first()
if race_guess is not None:
return race_guess
# Insert a new RaceGuess
race_guess = RaceGuess(user_name=user_name, race_name=race_name)
db.session.add(race_guess)
db.session.commit()
# Double check if database insertion worked and obtain any values set by the database
race_guess = db.session.query(RaceGuess).filter_by(user_name=user_name, race_name=race_name).first()
if race_guess is None:
raise Exception("Failed adding RaceGuess to the database")
return race_guess
def update_race_guess(race_name: str, user_name: str, pxx_select: str | None, dnf_select: str | None) -> Response:
if any_is_none(pxx_select, dnf_select):
return redirect(f"/race/{quote(user_name)}")
pxx_driver_name: str = cast(str, pxx_select)
dnf_driver_name: str = cast(str, dnf_select)
# TODO: Date-lock this. Otherwise there is a period of time after the race
# but before the result where guesses can still be entered
# We can't guess for races that are already over
if race_has_result(race_name):
return redirect(f"/race/{quote(user_name)}")
race_guess: RaceGuess = find_or_create_race_guess(user_name, race_name)
race_guess.pxx_driver_name = pxx_driver_name
race_guess.dnf_driver_name = dnf_driver_name
db.session.commit()
return redirect("/race/Everyone")
def find_or_create_team_winners(user_name: str) -> TeamWinners:
# There can be a single TeamWinners at most, since user_name is the primary key
team_winners: TeamWinners | None = db.session.query(TeamWinners).filter_by(user_name=user_name).first()
if team_winners is not None:
return team_winners
team_winners = TeamWinners(user_name=user_name)
db.session.add(team_winners)
db.session.commit()
# Double check if database insertion worked and obtain any values set by the database
team_winners = db.session.query(TeamWinners).filter_by(user_name=user_name).first()
if team_winners is None:
raise Exception("Failed adding TeamWinners to the database")
return team_winners
def find_or_create_podium_drivers(user_name: str) -> PodiumDrivers:
# There can be a single PodiumDrivers at most, since user_name is the primary key
podium_drivers: PodiumDrivers | None = db.session.query(PodiumDrivers).filter_by(user_name=user_name).first()
if podium_drivers is not None:
return podium_drivers
podium_drivers = PodiumDrivers(user_name=user_name)
db.session.add(podium_drivers)
db.session.commit()
# Double check if database insertion worked and obtain any values set by the database
podium_drivers = db.session.query(PodiumDrivers).filter_by(user_name=user_name).first()
if podium_drivers is None:
raise Exception("Failed adding PodiumDrivers to the database")
return podium_drivers
def find_or_create_season_guess(user_name: str) -> SeasonGuess:
# There can be a single SeasonGuess at most, since user_name is the primary key
season_guess: SeasonGuess | None = db.session.query(SeasonGuess).filter_by(user_name=user_name).first()
if season_guess is not None:
# There can't be more than a single one, since both also use user_name as primary key
if db.session.query(TeamWinners).filter_by(user_name=user_name).first() is None:
raise Exception(f"SeasonGuess for {user_name} is missing associated TeamWinners")
if db.session.query(PodiumDrivers).filter_by(user_name=user_name).first() is None:
raise Exception(f"SeasonGuess for {user_name} is missing associated PodiumDrivers")
return season_guess
# Insert a new SeasonGuess
team_winners: TeamWinners = find_or_create_team_winners(user_name)
podium_drivers: PodiumDrivers = find_or_create_podium_drivers(user_name)
season_guess = SeasonGuess(user_name=user_name, team_winners_user_name=team_winners.user_name, podium_drivers_user_name=podium_drivers.user_name)
db.session.add(season_guess)
db.session.commit()
# Double check if database insertion worked and obtain any values set by the database
season_guess = db.session.query(SeasonGuess).filter_by(user_name=user_name).first()
if season_guess is None:
raise Exception("Failed adding SeasonGuess to the database")
return season_guess
def update_season_guess(user_name: str, guesses: List[str | None] | List[str], team_winner_guesses: List[str | None] | List[str], podium_driver_guesses: List[str]) -> Response:
if any_is_none(*guesses) or any_is_none(*team_winner_guesses):
return redirect(f"/season/{quote(user_name)}")
guesses = cast(List[str], guesses)
team_winner_guesses = cast(List[str], team_winner_guesses)
season_guess: SeasonGuess = find_or_create_season_guess(user_name)
season_guess.hot_take = guesses[0]
season_guess.p2_team_name = guesses[1]
season_guess.overtake_driver_name = guesses[2]
season_guess.dnf_driver_name = guesses[3]
season_guess.gained_driver_name = guesses[4]
season_guess.lost_driver_name = guesses[5]
season_guess.team_winners.teamwinner_driver_names = team_winner_guesses # TODO: Or commit seperately?
season_guess.podium_drivers.podium_driver_names = podium_driver_guesses # TODO: Or commit separately?
db.session.commit()
return redirect(f"/season/{quote(user_name)}")
def find_or_create_race_result(race_name: str) -> RaceResult:
# There can be a single RaceResult at most, since race_name is the primary key
race_result: RaceResult | None = db.session.query(RaceResult).filter_by(race_name=race_name).first()
if race_result is not None:
return race_result
race_result = RaceResult(race_name=race_name)
db.session.add(race_result)
db.session.commit()
# Double check if database insertion worked and obtain any values set by the database
race_result = db.session.query(RaceResult).filter_by(race_name=race_name).first()
if race_result is None:
raise Exception("Failed adding RaceResult to the database")
return race_result
def update_race_result(race_name: str, pxx_driver_names_list: List[str], dnf_driver_names_list: List[str], excluded_driver_names: List[str]) -> Response:
# Use strings as keys, as these dicts will be serialized to json
pxx_driver_names: Dict[str, str] = {str(position + 1): driver for position, driver in enumerate(pxx_driver_names_list)}
dnf_driver_names: Dict[str, str] = {str(position + 1): driver for position, driver in enumerate(pxx_driver_names_list) if driver in dnf_driver_names_list}
# TODO: This validation is incomplete
if not positions_are_contiguous(list(dnf_driver_names.keys())):
return redirect(f"/result/{quote(race_name)}")
race_result: RaceResult = find_or_create_race_result(race_name)
race_result.pxx_driver_names = pxx_driver_names
race_result.dnf_driver_names = dnf_driver_names if len(dnf_driver_names_list) > 0 else {"20": "NONE"}
race_result.excluded_driver_names = excluded_driver_names
db.session.commit()
return redirect(f"/result/{quote(race_name)}")
def update_user(user_name: str | None, add: bool = False, delete: bool = False) -> Response:
if user_name is None or len(user_name) < 3:
return redirect("/user")
if not add and not delete:
return redirect("/user")
if add and delete:
return redirect("/user")
if add:
if user_exists(user_name):
return redirect("/user")
user: User = User(name=user_name)
db.session.add(user)
db.session.commit()
return redirect("/user")
if delete:
if not user_exists(user_name):
return redirect("/user")
db.session.query(User).filter_by(name=user_name).delete()
db.session.commit()
return redirect("/user")
raise Exception("update_user received illegal combination of arguments")

View File

@ -1,98 +1,9 @@
import csv
import os.path
from typing import List, Any
from flask_sqlalchemy import SQLAlchemy
from model import Team, Driver, Race, User, RaceResult, RaceGuess, TeamWinners, PodiumDrivers, SeasonGuess
from model import User, db, RaceResult
def load_csv(filename: str) -> List[List[str]]:
if not os.path.exists(filename):
print(f"Could not load data from file {filename}, as it doesn't exist!")
return []
with open(filename, "r", newline="") as file:
reader = csv.reader(file, delimiter=",")
next(reader, None) # skip header
return list(reader)
def race_has_result(race_name: str) -> bool:
return db.session.query(RaceResult).filter_by(race_name=race_name).first() is not None
def write_csv(filename: str, objects: List[Any]):
if len(objects) == 0:
print(f"Could not write objects to file {filename}, as no objects were given!")
return
with open(filename, "w", newline="") as file:
writer = csv.writer(file, delimiter=",")
writer.writerow(objects[0].__csv_header__)
for obj in objects:
writer.writerow(obj.to_csv())
# Reload static database data, this has to be called from the app context
def reload_static_data(db: SQLAlchemy):
print("Initializing Database with Static Values...")
# Create it (if it doesn't exist!)
db.create_all()
# Clear static data
db.session.query(Team).delete()
db.session.query(Driver).delete()
db.session.query(Race).delete()
# Reload static data
for row in load_csv("static_data/teams.csv"):
db.session.add(Team().from_csv(row))
for row in load_csv("static_data/drivers.csv"):
db.session.add(Driver().from_csv(row))
for row in load_csv("static_data/races.csv"):
db.session.add(Race().from_csv(row))
db.session.commit()
def reload_dynamic_data(db: SQLAlchemy):
print("Initializing Database with Dynamic Values...")
# Create it (if it doesn't exist!)
db.create_all()
# Clear dynamic data
db.session.query(User).delete()
db.session.query(RaceResult).delete()
db.session.query(RaceGuess).delete()
db.session.query(TeamWinners).delete()
db.session.query(PodiumDrivers).delete()
db.session.query(SeasonGuess).delete()
# Reload dynamic data
for row in load_csv("dynamic_data/users.csv"):
db.session.add(User().from_csv(row))
for row in load_csv("dynamic_data/raceresults.csv"):
db.session.add(RaceResult().from_csv(row))
for row in load_csv("dynamic_data/raceguesses.csv"):
db.session.add(RaceGuess().from_csv(row))
for row in load_csv("dynamic_data/teamwinners.csv"):
db.session.add(TeamWinners().from_csv(row))
for row in load_csv("dynamic_data/podiumdrivers.csv"):
db.session.add(PodiumDrivers().from_csv(row))
for row in load_csv("dynamic_data/seasonguesses.csv"):
db.session.add(SeasonGuess().from_csv(row))
db.session.commit()
def export_dynamic_data():
print("Exporting Userdata...")
users: List[User] = User.query.all()
raceresults: List[RaceResult] = RaceResult.query.all()
raceguesses: List[RaceGuess] = RaceGuess.query.all()
teamwinners: List[TeamWinners] = TeamWinners.query.all()
podiumdrivers: List[PodiumDrivers] = PodiumDrivers.query.all()
seasonguesses: List[SeasonGuess] = SeasonGuess.query.all()
write_csv("dynamic_data/users.csv", users)
write_csv("dynamic_data/raceresults.csv", raceresults)
write_csv("dynamic_data/raceguesses.csv", raceguesses)
write_csv("dynamic_data/teamwinners.csv", teamwinners)
write_csv("dynamic_data/podiumdrivers.csv", podiumdrivers)
write_csv("dynamic_data/seasonguesses.csv", seasonguesses)
def user_exists(user_name: str) -> bool:
return db.session.query(User).filter_by(name=user_name).first() is not None

97
file_utils.py Normal file
View File

@ -0,0 +1,97 @@
import csv
import os.path
from typing import List, Any
from model import Team, Driver, Race, User, RaceResult, RaceGuess, TeamWinners, PodiumDrivers, SeasonGuess, db
def load_csv(filename: str) -> List[List[str]]:
if not os.path.exists(filename):
print(f"Could not load data from file {filename}, as it doesn't exist!")
return []
with open(filename, "r", newline="") as file:
reader = csv.reader(file, delimiter=",")
next(reader, None) # skip header
return list(reader)
def write_csv(filename: str, objects: List[Any]):
if len(objects) == 0:
print(f"Could not write objects to file {filename}, as no objects were given!")
return
with open(filename, "w", newline="") as file:
writer = csv.writer(file, delimiter=",")
writer.writerow(objects[0].__csv_header__)
for obj in objects:
writer.writerow(obj.to_csv())
# Reload static database data, this has to be called from the app context
def reload_static_data():
print("Initializing Database with Static Values...")
# Create it (if it doesn't exist!)
db.create_all()
# Clear static data
db.session.query(Team).delete()
db.session.query(Driver).delete()
db.session.query(Race).delete()
# Reload static data
for row in load_csv("static_data/teams.csv"):
db.session.add(Team.from_csv(row))
for row in load_csv("static_data/drivers.csv"):
db.session.add(Driver.from_csv(row))
for row in load_csv("static_data/races.csv"):
db.session.add(Race.from_csv(row))
db.session.commit()
def reload_dynamic_data():
print("Initializing Database with Dynamic Values...")
# Create it (if it doesn't exist!)
db.create_all()
# Clear dynamic data
db.session.query(User).delete()
db.session.query(RaceResult).delete()
db.session.query(RaceGuess).delete()
db.session.query(TeamWinners).delete()
db.session.query(PodiumDrivers).delete()
db.session.query(SeasonGuess).delete()
# Reload dynamic data
for row in load_csv("dynamic_data/users.csv"):
db.session.add(User.from_csv(row))
for row in load_csv("dynamic_data/raceresults.csv"):
db.session.add(RaceResult.from_csv(row))
for row in load_csv("dynamic_data/raceguesses.csv"):
db.session.add(RaceGuess.from_csv(row))
for row in load_csv("dynamic_data/teamwinners.csv"):
db.session.add(TeamWinners.from_csv(row))
for row in load_csv("dynamic_data/podiumdrivers.csv"):
db.session.add(PodiumDrivers.from_csv(row))
for row in load_csv("dynamic_data/seasonguesses.csv"):
db.session.add(SeasonGuess.from_csv(row))
db.session.commit()
def export_dynamic_data():
print("Exporting Userdata...")
users: List[User] = db.session.query(User).all()
raceresults: List[RaceResult] = db.session.query(RaceResult).all()
raceguesses: List[RaceGuess] = db.session.query(RaceGuess).all()
teamwinners: List[TeamWinners] = db.session.query(TeamWinners).all()
podiumdrivers: List[PodiumDrivers] = db.session.query(PodiumDrivers).all()
seasonguesses: List[SeasonGuess] = db.session.query(SeasonGuess).all()
write_csv("dynamic_data/users.csv", users)
write_csv("dynamic_data/raceresults.csv", raceresults)
write_csv("dynamic_data/raceguesses.csv", raceguesses)
write_csv("dynamic_data/teamwinners.csv", teamwinners)
write_csv("dynamic_data/podiumdrivers.csv", podiumdrivers)
write_csv("dynamic_data/seasonguesses.csv", seasonguesses)

View File

@ -1,14 +1,17 @@
from typing import List
from urllib.parse import unquote
from flask import Flask, render_template, request, redirect
from werkzeug import Response
from model import *
from database_utils import reload_static_data, reload_dynamic_data, export_dynamic_data
from model import Team, db
from file_utils import reload_static_data, reload_dynamic_data, export_dynamic_data
from template_model import TemplateModel
from backend_model import update_race_guess, update_race_result, update_season_guess, update_user
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = "sqlite:///formula10.db"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.url_map.strict_slashes = False
db.init_app(app)
@ -17,13 +20,16 @@ db.init_app(app)
# General
# - A lot of validation (esp. in the model), each input should be checked (e.g. DNF, excluded order)...
# - NONE driver handling: If PXX = NONE is selected, excluded drivers have to be taken into account
# - Show cards of previous race results, like with season guesses?
# - Make the season card grid left-aligned? So e.g. 2 cards are not spread over the whole screen with large gaps?
# - Choose "place to guess" late before the race?
# Statistics page
# Statistics
# - Auto calculate points
# - Order user table by points + display points somewhere
# - Show current values for some season guesses (e.g. current most dnfs)
# - Generate static diagram using chart.js + templating the js (funny yikes)
# Rules page
@ -42,20 +48,20 @@ def save() -> Response:
@app.route("/load/all")
def load() -> Response:
reload_static_data(db)
reload_dynamic_data(db)
reload_static_data()
reload_dynamic_data()
return redirect("/")
@app.route("/load/static")
def load_static() -> Response:
reload_static_data(db)
reload_static_data()
return redirect("/")
@app.route("/load/dynamic")
def load_dynamic() -> Response:
reload_dynamic_data(db)
reload_dynamic_data()
return redirect("/")
@ -81,26 +87,7 @@ def race_guess_post(race_name: str, user_name: str) -> Response:
pxx: str | None = request.form.get("pxxselect")
dnf: str | None = request.form.get("dnfselect")
if pxx is None or dnf is None:
return redirect(f"/race/{quote(user_name)}")
if RaceResult.query.filter_by(race_name=race_name).first() is not None:
print("Error: Can't guess race result if the race result is already known!")
return redirect(f"/race/{quote(user_name)}")
raceguess: RaceGuess | None = db.session.query(RaceGuess).filter_by(user_name=user_name, race_name=race_name).first()
if raceguess is None:
raceguess = RaceGuess()
raceguess.user_name = user_name
raceguess.race_name = race_name
db.session.add(raceguess)
raceguess.pxx_driver_name = pxx
raceguess.dnf_driver_name = dnf
db.session.commit()
return redirect("/race/Everyone")
return update_race_guess(race_name, user_name, pxx, dnf)
@app.route("/season")
@ -128,51 +115,12 @@ def season_guess_post(user_name: str) -> Response:
request.form.get("gainedselect"),
request.form.get("lostselect")
]
teamwinnerguesses: List[str | None] = [
team_winner_guesses: List[str | None] = [
request.form.get(f"teamwinner-{team.name}") for team in db.session.query(Team).all()
]
podiumdriverguesses: List[str] = request.form.getlist("podiumdrivers")
podium_driver_guesses: List[str] = request.form.getlist("podiumdrivers")
if any(guess is None for guess in guesses + teamwinnerguesses):
print("Error: /guessseason could not obtain request data!")
return redirect(f"/season/{quote(user_name)}")
seasonguess: SeasonGuess | None = db.session.query(SeasonGuess).filter_by(user_name=user_name).first()
teamwinners: TeamWinners | None = seasonguess.team_winners if seasonguess is not None else None
podiumdrivers: PodiumDrivers | None = seasonguess.podium_drivers if seasonguess is not None else None
if teamwinners is None:
teamwinners = TeamWinners()
db.session.add(teamwinners)
teamwinners.user_name = user_name
teamwinners.teamwinner_driver_names = teamwinnerguesses # Pylance throws error, but nullcheck is done
db.session.commit()
if podiumdrivers is None:
podiumdrivers = PodiumDrivers()
db.session.add(podiumdrivers)
podiumdrivers.podium_driver_names = podiumdriverguesses
podiumdrivers.user_name = user_name
db.session.commit()
if seasonguess is None:
seasonguess = SeasonGuess()
seasonguess.user_name = user_name
db.session.add(seasonguess)
seasonguess.hot_take = guesses[0] # Pylance throws error but nullcheck is done
seasonguess.p2_team_name = guesses[1] # Pylance throws error but nullcheck is done
seasonguess.overtake_driver_name = guesses[2] # Pylance throws error but nullcheck is done
seasonguess.dnf_driver_name = guesses[3] # Pylance throws error but nullcheck is done
seasonguess.gained_driver_name = guesses[4] # Pylance throws error but nullcheck is done
seasonguess.lost_driver_name = guesses[5] # Pylance throws error but nullcheck is done
seasonguess.team_winners_id = user_name
seasonguess.podium_drivers_id = user_name
db.session.commit()
return redirect(f"/season/{quote(user_name)}")
return update_season_guess(user_name, guesses, team_winner_guesses, podium_driver_guesses)
@app.route("/result")
@ -189,84 +137,34 @@ def result_active_race(race_name: str) -> str:
model=model)
@app.route("/result-enter/<result_race_name>", methods=["POST"])
def result_enter_post(result_race_name: str) -> Response:
result_race_name = unquote(result_race_name)
@app.route("/result-enter/<race_name>", methods=["POST"])
def result_enter_post(race_name: str) -> Response:
race_name = unquote(race_name)
pxxs: List[str] = request.form.getlist("pxxdrivers")
dnfs: List[str] = request.form.getlist("dnf-drivers")
excludes: List[str] = request.form.getlist("exclude-drivers")
# Use strings as keys, as these dicts will be serialized to json
pxxs_dict: Dict[str, str] = {str(position + 1): driver for position, driver in enumerate(pxxs)}
dnfs_dict: Dict[str, str] = {str(position + 1): driver for position, driver in enumerate(pxxs) if driver in dnfs}
raceresult: RaceResult | None = db.session.query(RaceResult).filter_by(race_name=result_race_name).first()
if raceresult is None:
raceresult = RaceResult()
db.session.add(raceresult)
raceresult.race_name = result_race_name
raceresult.pxx_driver_names = pxxs_dict
raceresult.dnf_driver_names = dnfs_dict if len(dnfs) > 0 else {"20": "NONE"}
raceresult.excluded_driver_names = excludes
db.session.commit()
race: Race | None = db.session.query(Race).filter_by(name=result_race_name).first()
if race is None:
print("Error: Can't redirect to /enter/<GrandPrix> because race couldn't be found")
return redirect(f"/result/Current")
return redirect(f"/result/{quote(race.name)}")
return update_race_result(race_name, pxxs, dnfs, excludes)
@app.route("/user")
def user_root() -> str:
users: List[User] = User.query.all()
model = TemplateModel()
return render_template("users.jinja",
users=users)
model=model)
@app.route("/user-add", methods=["POST"])
def user_add_post() -> Response:
username: str | None = request.form.get("select-add-user")
if username is None or len(username) == 0:
print(f"Not adding user, since no username was received")
return redirect("/user")
if len(db.session.query(User).filter_by(name=username).all()) > 0:
print(f"Not adding user {username}: Already exists!")
return redirect("/user")
user = User()
user.name = username
db.session.add(user)
db.session.commit()
return redirect("/user")
return update_user(username, add=True)
@app.route("/user-delete", methods=["POST"])
def user_delete_post() -> Response:
username = request.form.get("select-delete-user")
if username is None or len(username) == 0:
print(f"Not deleting user, since no username was received")
return redirect("/user")
if username == "Select User":
return redirect("/user")
print(f"Deleting user {username}...")
User.query.filter_by(name=username).delete()
db.session.commit()
return redirect("/user")
username: str | None = request.form.get("select-delete-user")
return update_user(username, delete=True)
if __name__ == "__main__":
app.url_map.strict_slashes = False
app.run(debug=True, host="0.0.0.0")

165
model.py
View File

@ -20,12 +20,14 @@ class Race(db.Model):
"""
__tablename__ = "race"
def from_csv(self, row: List[str]):
self.name = str(row[0])
self.number = int(row[1])
self.date = datetime.strptime(row[2], "%Y-%m-%d")
self.pxx = int(row[3])
return self
@staticmethod
def from_csv(row: List[str]):
race: Race = Race()
race.name = str(row[0])
race.number = int(row[1])
race.date = datetime.strptime(row[2], "%Y-%m-%d")
race.pxx = int(row[3])
return race
@property
def name_sanitized(self) -> str:
@ -43,9 +45,11 @@ class Team(db.Model):
"""
__tablename__ = "team"
def from_csv(self, row: List[str]):
self.name = str(row[0])
return self
@staticmethod
def from_csv(row: List[str]):
team: Team = Team()
team.name = str(row[0])
return team
name: Mapped[str] = mapped_column(String(32), primary_key=True)
@ -57,12 +61,14 @@ class Driver(db.Model):
"""
__tablename__ = "driver"
def from_csv(self, row: List[str]):
self.name = str(row[0])
self.abbr = str(row[1])
self.team_name = str(row[2])
self.country_code = str(row[3])
return self
@staticmethod
def from_csv(row: List[str]):
driver: Driver = Driver()
driver.name = str(row[0])
driver.abbr = str(row[1])
driver.team_name = str(row[2])
driver.country_code = str(row[3])
return driver
name: Mapped[str] = mapped_column(String(32), primary_key=True)
abbr: Mapped[str] = mapped_column(String(3))
@ -85,9 +91,13 @@ class User(db.Model):
__tablename__ = "user"
__csv_header__ = ["name"]
def from_csv(self, row: List[str]):
self.name = str(row[0])
return self
def __init__(self, name: str):
self.name = name # Primary key
@staticmethod
def from_csv(row: List[str]):
user: User = User(str(row[0]))
return user
def to_csv(self) -> List[Any]:
return [
@ -110,12 +120,16 @@ class RaceResult(db.Model):
__allow_unmapped__ = True # TODO: Used for json conversion, move this to some other class instead
__csv_header__ = ["race_name", "pxx_driver_names_json", "dnf_driver_names_json", "excluded_driver_names_json"]
def from_csv(self, row: List[str]):
self.race_name = str(row[0])
self.pxx_driver_names_json = str(row[1])
self.dnf_driver_names_json = str(row[2])
self.excluded_driver_names_json = str(row[3])
return self
def __init__(self, race_name: str):
self.race_name = race_name # Primary key
@staticmethod
def from_csv(row: List[str]):
race_result: RaceResult = RaceResult(str(row[0]))
race_result.pxx_driver_names_json = str(row[1])
race_result.dnf_driver_names_json = str(row[2])
race_result.excluded_driver_names_json = str(row[3])
return race_result
def to_csv(self) -> List[Any]:
return [
@ -126,9 +140,9 @@ class RaceResult(db.Model):
]
race_name: Mapped[str] = mapped_column(ForeignKey("race.name"), primary_key=True)
pxx_driver_names_json: Mapped[str] = mapped_column(String(1024))
dnf_driver_names_json: Mapped[str] = mapped_column(String(1024))
excluded_driver_names_json: Mapped[str] = mapped_column(String(1024))
pxx_driver_names_json: Mapped[str] = mapped_column(String(1024), nullable=True)
dnf_driver_names_json: Mapped[str] = mapped_column(String(1024), nullable=True)
excluded_driver_names_json: Mapped[str] = mapped_column(String(1024), nullable=True)
@property
def pxx_driver_names(self) -> Dict[str, str]:
@ -229,12 +243,16 @@ class RaceGuess(db.Model):
__tablename__ = "raceguess"
__csv_header__ = ["user_name", "race_name", "pxx_driver_name", "dnf_driver_name"]
def from_csv(self, row: List[str]):
self.user_name = str(row[0])
self.race_name = str(row[1])
self.pxx_driver_name = str(row[2])
self.dnf_driver_name = str(row[3])
return self
def __init__(self, user_name: str, race_name: str):
self.user_name = user_name # Primary key
self.race_name = race_name # Primery key
@staticmethod
def from_csv(row: List[str]):
race_guess: RaceGuess = RaceGuess(str(row[0]), str(row[1]))
race_guess.pxx_driver_name = str(row[2])
race_guess.dnf_driver_name = str(row[3])
return race_guess
def to_csv(self) -> List[Any]:
return [
@ -246,8 +264,8 @@ class RaceGuess(db.Model):
user_name: Mapped[str] = mapped_column(ForeignKey("user.name"), primary_key=True)
race_name: Mapped[str] = mapped_column(ForeignKey("race.name"), primary_key=True)
pxx_driver_name: Mapped[str] = mapped_column(ForeignKey("driver.name"))
dnf_driver_name: Mapped[str] = mapped_column(ForeignKey("driver.name"))
pxx_driver_name: Mapped[str] = mapped_column(ForeignKey("driver.name"), nullable=True)
dnf_driver_name: Mapped[str] = mapped_column(ForeignKey("driver.name"), nullable=True)
# Relationships
user: Mapped["User"] = relationship("User", foreign_keys=[user_name])
@ -264,10 +282,14 @@ class TeamWinners(db.Model):
__allow_unmapped__ = True
__csv_header__ = ["user_name", "teamwinner_driver_names_json"]
def from_csv(self, row: List[str]):
self.user_name = str(row[0])
self.teamwinner_driver_names_json = str(row[1])
return self
def __init__(self, user_name: str):
self.user_name = user_name # Primary key
@staticmethod
def from_csv(row: List[str]):
team_winners: TeamWinners = TeamWinners(str(row[0]))
team_winners.teamwinner_driver_names_json = str(row[1])
return team_winners
def to_csv(self) -> List[Any]:
return [
@ -276,7 +298,7 @@ class TeamWinners(db.Model):
]
user_name: Mapped[str] = mapped_column(ForeignKey("user.name"), primary_key=True)
teamwinner_driver_names_json: Mapped[str] = mapped_column(String(1024))
teamwinner_driver_names_json: Mapped[str] = mapped_column(String(1024), nullable=True)
@property
def teamwinner_driver_names(self) -> List[str]:
@ -312,10 +334,14 @@ class PodiumDrivers(db.Model):
__allow_unmapped__ = True
__csv_header__ = ["user_name", "podium_driver_names_json"]
def from_csv(self, row: List[str]):
self.user_name = str(row[0])
self.podium_driver_names_json = str(row[1])
return self
def __init__(self, user_name: str):
self.user_name = user_name
@staticmethod
def from_csv(row: List[str]):
podium_drivers: PodiumDrivers = PodiumDrivers(str(row[0]))
podium_drivers.podium_driver_names_json = str(row[1])
return podium_drivers
def to_csv(self) -> List[Any]:
return [
@ -324,7 +350,7 @@ class PodiumDrivers(db.Model):
]
user_name: Mapped[str] = mapped_column(ForeignKey("user.name"), primary_key=True)
podium_driver_names_json: Mapped[str] = mapped_column(String(1024))
podium_driver_names_json: Mapped[str] = mapped_column(String(1024), nullable=True)
@property
def podium_driver_names(self) -> List[str]:
@ -361,17 +387,32 @@ class SeasonGuess(db.Model):
"overtake_driver_name", "dnf_driver_name", "gained_driver_name", "lost_driver_name",
"team_winners_id", "podium_drivers_id"]
def from_csv(self, row: List[str]):
self.user_name = str(row[0]) # Also used as foreign key for teamwinners + podiumdrivers
self.hot_take = str(row[1])
self.p2_team_name = str(row[2])
self.overtake_driver_name = str(row[3])
self.dnf_driver_name = str(row[4])
self.gained_driver_name = str(row[5])
self.lost_driver_name = str(row[6])
self.team_winners_id = str(row[7])
self.podium_drivers_id = str(row[8])
return self
def __init__(self, user_name: str, team_winners_user_name: str | None = None, podium_drivers_user_name: str | None = None):
self.user_name = user_name # Primary key
# Although this is the same username, handle separately, in case they don't exist in the database yet
if team_winners_user_name is not None:
if user_name != team_winners_user_name:
raise Exception(f"SeasonGuess for {user_name} was supplied TeamWinners for {team_winners_user_name}")
self.team_winners_id = team_winners_user_name
if podium_drivers_user_name is not None:
if user_name != podium_drivers_user_name:
raise Exception(f"SeasonGuess for {user_name} was supplied PodiumDrivers for {podium_drivers_user_name}")
self.podium_drivers_id = podium_drivers_user_name
@staticmethod
def from_csv(row: List[str]):
season_guess: SeasonGuess = SeasonGuess(str(row[0]), team_winners_user_name=str(row[7]), podium_drivers_user_name=str(row[8]))
season_guess.hot_take = str(row[1])
season_guess.p2_team_name = str(row[2])
season_guess.overtake_driver_name = str(row[3])
season_guess.dnf_driver_name = str(row[4])
season_guess.gained_driver_name = str(row[5])
season_guess.lost_driver_name = str(row[6])
return season_guess
def to_csv(self) -> List[Any]:
return [
@ -387,12 +428,12 @@ class SeasonGuess(db.Model):
]
user_name: Mapped[str] = mapped_column(ForeignKey("user.name"), primary_key=True)
hot_take: Mapped[str] = mapped_column(String(512))
p2_team_name: Mapped[str] = mapped_column(ForeignKey("team.name"))
overtake_driver_name: Mapped[str] = mapped_column(ForeignKey("driver.name"))
dnf_driver_name: Mapped[str] = mapped_column(ForeignKey("driver.name"))
gained_driver_name: Mapped[str] = mapped_column(ForeignKey("driver.name"))
lost_driver_name: Mapped[str] = mapped_column(ForeignKey("driver.name"))
hot_take: Mapped[str] = mapped_column(String(512), nullable=True)
p2_team_name: Mapped[str] = mapped_column(ForeignKey("team.name"), nullable=True)
overtake_driver_name: Mapped[str] = mapped_column(ForeignKey("driver.name"), nullable=True)
dnf_driver_name: Mapped[str] = mapped_column(ForeignKey("driver.name"), nullable=True)
gained_driver_name: Mapped[str] = mapped_column(ForeignKey("driver.name"), nullable=True)
lost_driver_name: Mapped[str] = mapped_column(ForeignKey("driver.name"), nullable=True)
team_winners_id: Mapped[str] = mapped_column(ForeignKey("teamwinners.user_name"))
podium_drivers_id: Mapped[str] = mapped_column(ForeignKey("podiumdrivers.user_name"))

View File

@ -1,58 +1,10 @@
from typing import List, Iterable, Callable, TypeVar, Dict, overload
from typing import List, Callable, Dict, overload
from sqlalchemy import desc
from model import User, RaceResult, RaceGuess, Race, Driver, Team, SeasonGuess, db
_T = TypeVar("_T")
from validation_utils import find_first_or_none, find_multiple, find_single, find_single_or_none
def find_first_or_none(predicate: Callable[[_T], bool], iterable: Iterable[_T]) -> _T | None:
"""
Finds the first element in a sequence matching a predicate.
Returns None if no element is found.
"""
return next(filter(predicate, iterable), None)
def find_multiple(predicate: Callable[[_T], bool], iterable: Iterable[_T], count: int = 0) -> List[_T]:
"""
Finds <count> elements in a sequence matching a predicate (finds all if <count> is 0).
Throws exception if more/fewer elements were found than specified.
"""
filtered = list(filter(predicate, iterable))
if count != 0 and len(filtered) != count:
raise Exception(f"find_multiple found {len(filtered)} matching elements but expected {count}")
return filtered
def find_single(predicate: Callable[[_T], bool], iterable: Iterable[_T]) -> _T:
"""
Find a single element in a sequence matching a predicate.
Throws exception if more/less than a single element is found.
"""
filtered = list(filter(predicate, iterable))
if len(filtered) != 1:
raise Exception(f"find_single found {len(filtered)} matching elements but expected 1")
return filtered[0]
def find_single_or_none(predicate: Callable[[_T], bool], iterable: Iterable[_T]) -> _T | None:
"""
Find a single element in a sequence matching a predicate if it exists.
Only throws exception if more than a single element is found.
"""
filtered = list(filter(predicate, iterable))
if len(filtered) > 1:
raise Exception(f"find_single_or_none found {len(list(filtered))} matching elements but expected 0 or 1")
return filtered[0] if len(filtered) == 1 else None
# This is inefficient (doesn't matter for small database), but very simple
# This could also be moved to database_utils (at least partially), but I though the template should cache the database responses
class TemplateModel:
"""
This class bundles all data required from inside a template.

View File

@ -141,6 +141,12 @@ P{{ result.race.pxx + 3 }}: {{ result.pxx(3).abbr }}
<link href="../static/style/bootstrap.css" rel="stylesheet">
<script src="../static/script/bootstrap.bundle.js"></script>
<script defer>
{# Initialize Bootstrap Tooltips #}
let tooltipTriggerList = document.querySelectorAll("[data-bs-toggle='tooltip']")
let tooltipList = [...tooltipTriggerList].map(tooltipTriggerEl => new bootstrap.Tooltip(tooltipTriggerEl))
</script>
{% block head_extra %}{% endblock head_extra %}
</head>

View File

@ -100,7 +100,7 @@
id="exclude-{{ driver.name }}" name="exclude-drivers"
{% if (active_result is not none) and (driver in active_result.excluded_drivers) %}checked{% endif %}>
<label for="exclude-{{ driver.name }}"
class="form-check-label text-muted">Exclude</label>
class="form-check-label text-muted" data-bs-toggle="tooltip" title="Exclude driver from points calculation">Exclude</label>
</div>
</div>

View File

@ -4,14 +4,6 @@
{% set active_page = "/race/" ~ (active_user.name_sanitized if active_user is not none else "Everyone") %}
{% block head_extra %}
<script defer>
{# Initialize Bootstrap Tooltips #}
let tooltipTriggerList = document.querySelectorAll("[data-bs-toggle='tooltip']")
let tooltipList = [...tooltipTriggerList].map(tooltipTriggerEl => new bootstrap.Tooltip(tooltipTriggerEl))
</script>
{% endblock head_extra %}
{% block navbar_center %}
{% if model.all_users() | length > 1 %}
<div class="dropdown">

View File

@ -90,7 +90,7 @@
</div>
{# Team-internal Winners #}
<h6 class="card-subtitle mt-2">Teammate battle winners:</h6>
<h6 class="card-subtitle mt-2" data-bs-toggle="tooltip" title="Which driver will finish the season higher than his teammate?">Teammate battle winners:</h6>
<div class="grid mt-2" style="width: 450px; row-gap: 0;">
{% for team in model.all_teams() %}
{% set driver_a_name = model.drivers_by(team_name=team.name)[0].name %}
@ -123,7 +123,7 @@
</div>
{# Drivers with Podiums #}
<h6 class="card-subtitle mt-2">Drivers with podium(s):</h6>
<h6 class="card-subtitle mt-2" data-bs-toggle="tooltip" title="Which driver will reach at least a single podium?">Drivers with podium(s):</h6>
<div class="grid mt-2" style="width: 450px; row-gap: 0;">
{% for team in model.all_teams() %}
{% set driver_a_name = model.drivers_by(team_name=team.name)[0].name %}

View File

@ -25,13 +25,13 @@
</div>
</div>
{% if users | length > 0 %}
{% if model.all_users() | length > 0 %}
<div class="card mt-2 shadow-sm">
<div class="card-body">
<h5 class="card-title">Registered Users</h5>
<ul class="list-group list-group-flush">
{% for user in users %}
{% for user in model.all_users() %}
<li class="list-group-item">{{ user.name }}</li>
{% endfor %}
</ul>
@ -47,7 +47,7 @@
<select class="form-control form-select" aria-label="select-delete-user"
name="select-delete-user">
<option selected="selected" disabled="disabled" hidden="hidden">Select User</option>
{% for user in users %}
{% for user in model.all_users() %}
<option value="{{ user.name }}">{{ user.name }}</option>
{% endfor %}
</select>
@ -71,7 +71,7 @@
<div class="card mt-2 border-danger shadow-sm">
<div class="card-body">
<h5 class="card-title">Functions that should not be public</h5>
<h6 class="card-subtitle mb-2">(Fuck you if you click this without knowing what it does)</h6>
<h6 class="card-subtitle mb-2">(F you if you click this without knowing what it does)</h6>
<a class="btn btn-outline-danger" href="/save/all">Save all data</a>
<a class="btn btn-outline-danger" href="/load/all">Load all data</a>

66
validation_utils.py Normal file
View File

@ -0,0 +1,66 @@
from typing import Any, Callable, Iterable, List, TypeVar
_T = TypeVar("_T")
def any_is_none(*args: Any) -> bool:
for arg in args:
if arg is None:
return True
return False
def positions_are_contiguous(positions: List[str]) -> bool:
positions_unique = set(positions) # Remove duplicates
positions_sorted: List[int] = sorted([int(position) for position in positions_unique])
# [2, 3, 4, 5]: 2 + 3 == 5
return positions_sorted[0] + len(positions_sorted) - 1 == positions_sorted[-1]
def find_first_or_none(predicate: Callable[[_T], bool], iterable: Iterable[_T]) -> _T | None:
"""
Finds the first element in a sequence matching a predicate.
Returns None if no element is found.
"""
return next(filter(predicate, iterable), None)
def find_multiple(predicate: Callable[[_T], bool], iterable: Iterable[_T], count: int = 0) -> List[_T]:
"""
Finds <count> elements in a sequence matching a predicate (finds all if <count> is 0).
Throws exception if more/fewer elements were found than specified.
"""
filtered = list(filter(predicate, iterable))
if count != 0 and len(filtered) != count:
raise Exception(f"find_multiple found {len(filtered)} matching elements but expected {count}")
return filtered
def find_single(predicate: Callable[[_T], bool], iterable: Iterable[_T]) -> _T:
"""
Find a single element in a sequence matching a predicate.
Throws exception if more/less than a single element is found.
"""
filtered = list(filter(predicate, iterable))
if len(filtered) != 1:
raise Exception(f"find_single found {len(filtered)} matching elements but expected 1")
return filtered[0]
def find_single_or_none(predicate: Callable[[_T], bool], iterable: Iterable[_T]) -> _T | None:
"""
Find a single element in a sequence matching a predicate if it exists.
Only throws exception if more than a single element is found.
"""
filtered = list(filter(predicate, iterable))
if len(filtered) > 1:
raise Exception(f"find_single_or_none found {len(list(filtered))} matching elements but expected 0 or 1")
return filtered[0] if len(filtered) == 1 else None