From 46a3df2aeea36c1668bf3440c286bce3029727c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zo=C3=A9=20Cassiop=C3=A9e=20Gauthier?= Date: Fri, 5 Apr 2024 21:21:19 -0400 Subject: [PATCH] Cache raid simulations results instead of storing results in local database --- .gitignore | 3 +- pyproject.toml | 1 + src/pogo_scaled_estimators/calculator.py | 270 ++++-------------- src/pogo_scaled_estimators/cli.py | 7 +- .../pokebattler_proxy.py | 153 ++++++++++ 5 files changed, 205 insertions(+), 229 deletions(-) create mode 100644 src/pogo_scaled_estimators/pokebattler_proxy.py diff --git a/.gitignore b/.gitignore index 48a3e0d..cc23781 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ __pycache__/ dist/ -*.db -*.json +*.sqlite diff --git a/pyproject.toml b/pyproject.toml index 822bdf4..dc84956 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,6 +7,7 @@ name = "pogo-scaled-estimators" version = "1.0a1" dependencies = [ "requests", + "requests-cache", "rich", ] requires-python = ">=3.12" diff --git a/src/pogo_scaled_estimators/calculator.py b/src/pogo_scaled_estimators/calculator.py index 1438692..8218c25 100644 --- a/src/pogo_scaled_estimators/calculator.py +++ b/src/pogo_scaled_estimators/calculator.py @@ -4,242 +4,70 @@ # license that can be found in the LICENSE file or at # https://opensource.org/licenses/MIT. -import contextlib -import functools -import json -import operator -import sqlite3 -import time -import urllib.parse -from pathlib import Path from typing import final -import requests -from rich.progress import Progress, TaskID +from rich.progress import Progress -from pogo_scaled_estimators.utilities import format_move_name, format_pokemon_name - -WEAKNESS = 1.6 -DOUBLE_WEAKNESS = WEAKNESS * WEAKNESS +from pogo_scaled_estimators.pokebattler_proxy import Moveset, PokebattlerProxy, Raid +from pogo_scaled_estimators.utilities import format_pokemon_name @final class Calculator: def __init__(self, attacker_types: list[str]) -> None: - self.db = sqlite3.connect("ase.db") - # db.set_trace_callback(print) - - with contextlib.suppress(sqlite3.OperationalError): - _ = self.db.execute( - "CREATE TABLE estimators(defender, raid_tier, attacker, level, quick_move, charged_move, estimator, party)" - ) - - self.raids = self._pokebattler_resource("raids") - self.pokemon = self._pokebattler_resource("pokemon") - self.resists = self._pokebattler_resource("resists") - self.moves = self._pokebattler_resource("moves") - self.attacker_types = attacker_types - - self.progress = Progress() - self._refresh_task: TaskID | None = None - - def _pokebattler_resource(self, name: str) -> dict: - p = Path(f"./{name}.json") - if not p.exists(): - response = requests.get(f"https://fight.pokebattler.com/{name}") - with p.open(mode="wb") as fp: - _ = fp.write(response.content) - - with p.open() as fp: - return json.load(fp) + self._pokebattler_proxy = PokebattlerProxy() + self._progress = Progress() def calculate(self, level: int = 40, party: int = 1) -> None: - raid_bosses = self._raid_bosses() - charged_moves = [ - move["moveId"] for move in self.moves["move"] if "type" in move and move["type"] in self.attacker_types - ] - res = self.db.execute( - f"""SELECT DISTINCT(attacker) FROM estimators WHERE charged_move IN ("{'","'.join(charged_moves)}")""" - ) - attackers = {row[0]: {"RAID_LEVEL_3": [], "RAID_LEVEL_5": [], "RAID_LEVEL_MEGA": []} for row in res.fetchall()} - movesets = {} + raid_bosses = self._pokebattler_proxy.raid_bosses(self.attacker_types) + attackers = { + attacker: {"RAID_LEVEL_3": [], "RAID_LEVEL_5": [], "RAID_LEVEL_MEGA": []} + for attacker in self._pokebattler_proxy.with_charged_moves(self.attacker_types) + } - defenders = functools.reduce(operator.iconcat, raid_bosses.values(), []) - res = self.db.execute( - f""" - SELECT e.raid_tier, e.defender, e.attacker, e.estimator / m.min_estimator, e.quick_move, e.charged_move - FROM estimators e - INNER JOIN ( - SELECT defender, MIN(estimator) AS min_estimator - FROM estimators - WHERE party = ? AND level = 40 - AND attacker IN ("{'","'.join(attackers.keys())}") - AND defender IN ("{'","'.join(defenders)}") - GROUP BY defender - ) AS m ON e.defender = m.defender - INNER JOIN ( - SELECT defender, attacker, MIN(estimator) as min_estimator - FROM estimators - WHERE party = ? AND level = ? AND charged_move IN ("{'","'.join(charged_moves)}") - GROUP BY defender, attacker - ) AS ms ON e.defender = ms.defender AND e.attacker = ms.attacker AND e.estimator = ms.min_estimator - WHERE e.attacker IN ("{'","'.join(attackers.keys())}") - """, - (party, party, level), - ) - for raid_tier, _, attacker, estimator, fast_move, charged_move in res.fetchall(): - if raid_tier == "RAID_LEVEL_MEGA_5": - simplified_raid_tier = "RAID_LEVEL_MEGA" - elif raid_tier == "RAID_LEVEL_ULTRA_BEAST": - simplified_raid_tier = "RAID_LEVEL_5" - else: - simplified_raid_tier = raid_tier - attackers[attacker][simplified_raid_tier].append(estimator) - movesets[attacker] = (fast_move, charged_move) + with self._progress: + total_defenders = sum(len(defenders) for defenders in raid_bosses.values()) + task = self._progress.add_task("Working...", total=(total_defenders * 30)) + + for raid_tier, defenders in raid_bosses.items(): + if raid_tier == "RAID_LEVEL_MEGA_5": + simplified_raid_tier = "RAID_LEVEL_MEGA" + elif raid_tier == "RAID_LEVEL_ULTRA_BEAST": + simplified_raid_tier = "RAID_LEVEL_5" + else: + simplified_raid_tier = raid_tier + for defender in defenders: + self._progress.update(task, description=f"vs {format_pokemon_name(defender)}...") + raid = Raid(raid_tier, defender, level, party) + results = { + attacker: movesets + for attacker, movesets in self._pokebattler_proxy.simulate(raid).items() + if attacker in attackers + } + best_movesets = { + attacker: min(movesets, key=lambda moveset: moveset.estimator) + for attacker, movesets in results.items() + } + best_estimator = min(best_movesets.values(), key=lambda moveset: moveset.estimator).estimator + for attacker, moveset in best_movesets.items(): + self._progress.update(task, advance=1) + attackers[attacker][simplified_raid_tier].append(moveset.scale(best_estimator)) ase = {} - for attacker, estimators in attackers.items(): - if not estimators["RAID_LEVEL_3"] or not estimators["RAID_LEVEL_5"] or not estimators["RAID_LEVEL_MEGA"]: + for attacker, raid_tier_results in attackers.items(): + if ( + not raid_tier_results["RAID_LEVEL_3"] + or not raid_tier_results["RAID_LEVEL_5"] + or not raid_tier_results["RAID_LEVEL_MEGA"] + ): continue ase = ( - 0.15 * sum(estimators["RAID_LEVEL_3"]) / len(estimators["RAID_LEVEL_3"]) - + 0.50 * sum(estimators["RAID_LEVEL_5"]) / len(estimators["RAID_LEVEL_5"]) - + 0.35 * sum(estimators["RAID_LEVEL_MEGA"]) / len(estimators["RAID_LEVEL_MEGA"]) + 0.15 * self._average_estimator(raid_tier_results["RAID_LEVEL_3"]) + + 0.50 * self._average_estimator(raid_tier_results["RAID_LEVEL_5"]) + + 0.35 * self._average_estimator(raid_tier_results["RAID_LEVEL_MEGA"]) ) - fast_move, charged_move = movesets[attacker] - print(f"{attacker},{level},{ase},{fast_move},{charged_move}") + print(f"{attacker},{level},{ase}") - def _raid_bosses(self) -> dict: - raid_tiers = [] - raid_bosses = {} - - for raid_level in ["3", "5", "MEGA", "MEGA_5", "ULTRA_BEAST"]: - tier = f"RAID_LEVEL_{raid_level}" - raid_tiers.extend( - [ - tier, - f"{tier}_LEGACY", - f"{tier}_FUTURE", - ] - ) - raid_bosses[tier] = [] - - for tier in filter(lambda tier: tier["tier"] in raid_tiers, self.raids["tiers"]): - for boss in (raid["pokemon"] for raid in tier["raids"]): - if boss.endswith("_FORM"): - continue - boss_pokemon = next(filter(lambda mon: mon["pokemonId"] == boss, self.pokemon["pokemon"])) - if ("candyToEvolve" in boss_pokemon or boss in ["SEADRA", "SEALEO"]) and boss not in [ - "KELDEO", - "LUMINEON", - "MANAPHY", - "PHIONE", - "STUNFISK", - "TERRAKION", - ]: - continue - boss_types = ( - boss_pokemon["type"], - boss_pokemon.get("type2", "POKEMON_TYPE_NONE"), - ) - if any(self._is_weak(attacker_type, boss_types) for attacker_type in self.attacker_types): - raid_bosses[tier["info"]["guessTier"]].append(boss) - - return raid_bosses - - def _is_weak(self, attacker_type: str, defender_types: tuple[str, str]) -> bool: - pokemon_types = list(self.resists.keys()) - defender_type_indices = ( - pokemon_types.index(defender_types[0]), - pokemon_types.index(defender_types[1]), - ) - attack_resist = ( - self.resists[attacker_type][defender_type_indices[0]] - * self.resists[attacker_type][defender_type_indices[1]] - ) - - # Check for double weakness. - if attack_resist >= DOUBLE_WEAKNESS: - return True - - # Checkout for single weakness if not double weak to anything else. - any_double_weaknesses = any( - r[defender_type_indices[0]] * r[defender_type_indices[1]] >= DOUBLE_WEAKNESS for r in self.resists.values() - ) - if not any_double_weaknesses and attack_resist >= WEAKNESS: - return True - - return False - - def load_estimators(self, tier: str, defender: str, level: int, party: int) -> None: - base_url = "https://fight.pokebattler.com" - query_string = { - "sort": "ESTIMATOR", - "weatherCondition": "NO_WEATHER", - "dodgeStrategy": "DODGE_REACTION_TIME", - "aggregation": "AVERAGE", - "includeLegendary": "true", - "includeShadow": "true", - "includeMegas": "true", - "primalAssistants": "", - "numParty": str(party), - } - if tier != "RAID_LEVEL_3": - query_string["friendshipLevel"] = "FRIENDSHIP_LEVEL_4" - url = f"{base_url}/raids/defenders/{defender}/levels/{tier}/attackers/levels/{level}/strategies/CINEMATIC_ATTACK_WHEN_POSSIBLE/DEFENSE_RANDOM_MC?{urllib.parse.urlencode(query_string, doseq=True)}" - response = requests.get(url) - for attacker in response.json()["attackers"][0]["randomMove"]["defenders"]: - for attacker_moves in attacker["byMove"]: - res = self.db.execute( - "SELECT estimator FROM estimators WHERE defender=? AND attacker=? AND level=? AND quick_move=? AND charged_move=? AND party=?", - ( - defender, - attacker["pokemonId"], - level, - attacker_moves["move1"], - attacker_moves["move2"], - party, - ), - ) - estimator = res.fetchone() - if estimator is not None: - self.progress.console.log( - f'{format_pokemon_name(attacker["pokemonId"])} ({format_move_name(attacker_moves["move1"])}/{format_move_name(attacker_moves["move2"])}): {estimator[0]:.2f} (cached)' - ) - continue - - _ = self.db.execute( - "INSERT INTO estimators(defender, raid_tier, attacker, level, quick_move, charged_move, estimator, party) VALUES(?, ?, ?, ?, ?, ?, ?, ?)", - ( - defender, - tier, - attacker["pokemonId"], - level, - attacker_moves["move1"], - attacker_moves["move2"], - attacker_moves["result"]["estimator"], - party, - ), - ) - self.progress.console.log( - f'{format_pokemon_name(attacker["pokemonId"])} ({format_move_name(attacker_moves["move1"])}/{format_move_name(attacker_moves["move2"])}): {attacker_moves["result"]["estimator"]:.2f}' - ) - - if self._refresh_task is not None: - self.progress.update(self._refresh_task, advance=1) - self.db.commit() - - def refresh(self, level: int = 40, party: int = 1) -> None: - with self.progress: - total_defenders = sum(len(defenders) for defenders in self._raid_bosses().values()) - self._refresh_task = self.progress.add_task("Working...", total=(total_defenders * 30)) - for tier, defenders in self._raid_bosses().items(): - for defender in defenders: - self.progress.update(self._refresh_task, description=f"vs {format_pokemon_name(defender)}...") - try: - self.load_estimators(tier, defender, level, party) - except json.decoder.JSONDecodeError: - time.sleep(30) - self.load_estimators(tier, defender, level, party) - time.sleep(1) + def _average_estimator(self, movesets: list[Moveset]) -> float: + return sum(moveset.estimator for moveset in movesets) / len(movesets) diff --git a/src/pogo_scaled_estimators/cli.py b/src/pogo_scaled_estimators/cli.py index 309e7ad..628bc36 100644 --- a/src/pogo_scaled_estimators/cli.py +++ b/src/pogo_scaled_estimators/cli.py @@ -15,15 +15,10 @@ def main_cli(): _ = parser.add_argument("type", nargs="+", help="an attacker type") _ = parser.add_argument("--level", type=int, default=40) _ = parser.add_argument("--party", type=int, default=1) - _ = parser.add_argument("--refresh", action="store_true", default=False) args = parser.parse_args() - calculator = pogo_scaled_estimators.calculator.Calculator(args.type) - if args.refresh: - calculator.refresh(level=args.level, party=args.party) - else: - calculator.calculate(level=args.level, party=args.party) + calculator.calculate(level=args.level, party=args.party) if __name__ == "__main__": diff --git a/src/pogo_scaled_estimators/pokebattler_proxy.py b/src/pogo_scaled_estimators/pokebattler_proxy.py new file mode 100644 index 0000000..794db03 --- /dev/null +++ b/src/pogo_scaled_estimators/pokebattler_proxy.py @@ -0,0 +1,153 @@ +import urllib.parse +from dataclasses import dataclass +from functools import cached_property +from typing import cast, final + +import requests_cache + +BASE_URL = "https://fight.pokebattler.com" +WEAKNESS = 1.6 +DOUBLE_WEAKNESS = WEAKNESS * WEAKNESS + + +@dataclass(frozen=True) +class Raid: + tier: str + defender: str + level: int = 40 + party: int = 1 + + +@dataclass +class Moveset: + fast_move: str + charged_move: str + estimator: float + + def scale(self, factor: float): + return Moveset(self.fast_move, self.charged_move, self.estimator / factor) + + +@final +class PokebattlerProxy: + def __init__(self): + self._cached_session = requests_cache.CachedSession("pokebatter_cache", cache_control=True) + self._pokemon: dict | None = None + self._raids: dict | None = None + self._resists: dict | None = None + + @property + def cached_session(self): + return self._cached_session + + @cached_property + def moves(self) -> dict: + return self.cached_session.get(f"{BASE_URL}/moves").json()["move"] + + @cached_property + def pokemon(self) -> dict: + return self.cached_session.get(f"{BASE_URL}/pokemon").json()["pokemon"] + + @cached_property + def raids(self) -> dict: + return self.cached_session.get(f"{BASE_URL}/raids").json() + + @cached_property + def resists(self) -> dict: + return self.cached_session.get(f"{BASE_URL}/resists").json() + + def simulate(self, raid: Raid) -> dict[str, list[Moveset]]: + query_string = { + "sort": "ESTIMATOR", + "weatherCondition": "NO_WEATHER", + "dodgeStrategy": "DODGE_REACTION_TIME", + "aggregation": "AVERAGE", + "includeLegendary": "true", + "includeShadow": "true", + "includeMegas": "true", + "primalAssistants": "", + "numParty": str(raid.party), + } + if raid.tier != "RAID_LEVEL_3": + query_string["friendshipLevel"] = "FRIENDSHIP_LEVEL_4" + url = f"{BASE_URL}/raids/defenders/{raid.defender}/levels/{raid.tier}/attackers/levels/{raid.level}/strategies/CINEMATIC_ATTACK_WHEN_POSSIBLE/DEFENSE_RANDOM_MC?{urllib.parse.urlencode(query_string, doseq=True)}" + response = self._cached_session.get(url) + results: dict[str, list[Moveset]] = {} + for attacker in response.json()["attackers"][0]["randomMove"]["defenders"]: + results[attacker["pokemonId"]] = [ + Moveset( + attacker_moves["move1"], attacker_moves["move2"], cast(float, attacker_moves["result"]["estimator"]) + ) + for attacker_moves in attacker["byMove"] + ] + return results + + def raid_bosses(self, attacker_types: list[str]) -> dict: + raid_tiers = [] + raid_bosses = {} + + for raid_level in ["3", "5", "MEGA", "MEGA_5", "ULTRA_BEAST"]: + tier = f"RAID_LEVEL_{raid_level}" + raid_tiers.extend( + [ + tier, + f"{tier}_LEGACY", + f"{tier}_FUTURE", + ] + ) + raid_bosses[tier] = [] + + for tier in filter(lambda tier: tier["tier"] in raid_tiers, self.raids["tiers"]): + for boss in (raid["pokemon"] for raid in tier["raids"]): + if boss.endswith("_FORM"): + continue + boss_pokemon = next(filter(lambda mon: mon["pokemonId"] == boss, self.pokemon)) + if ("candyToEvolve" in boss_pokemon or boss in ["SEADRA", "SEALEO"]) and boss not in [ + "KELDEO", + "LUMINEON", + "MANAPHY", + "PHIONE", + "STUNFISK", + "TERRAKION", + ]: + continue + boss_types = ( + boss_pokemon["type"], + boss_pokemon.get("type2", "POKEMON_TYPE_NONE"), + ) + if any(self._is_weak(attacker_type, boss_types) for attacker_type in attacker_types): + raid_bosses[tier["info"]["guessTier"]].append(boss) + + return raid_bosses + + def _is_weak(self, attacker_type: str, defender_types: tuple[str, str]) -> bool: + pokemon_types = list(self.resists.keys()) + defender_type_indices = ( + pokemon_types.index(defender_types[0]), + pokemon_types.index(defender_types[1]), + ) + attack_resist = ( + self.resists[attacker_type][defender_type_indices[0]] + * self.resists[attacker_type][defender_type_indices[1]] + ) + + # Check for double weakness. + if attack_resist >= DOUBLE_WEAKNESS: + return True + + # Checkout for single weakness if not double weak to anything else. + any_double_weaknesses = any( + r[defender_type_indices[0]] * r[defender_type_indices[1]] >= DOUBLE_WEAKNESS for r in self.resists.values() + ) + if not any_double_weaknesses and attack_resist >= WEAKNESS: + return True + + return False + + def with_charged_moves(self, attacker_types: list[str]) -> list[str]: + charged_moves = [move["moveId"] for move in self.moves if "type" in move and move["type"] in attacker_types] + return [ + mon["pokemonId"] + for mon in self.pokemon + if any(moveset["cinematicMove"] in charged_moves for moveset in mon["movesets"]) + ]