Cache raid simulations results instead of storing results in local database

This commit is contained in:
Zoé Cassiopée Gauthier 2024-04-05 21:21:19 -04:00
parent 5ff176e1e2
commit 46a3df2aee
5 changed files with 205 additions and 229 deletions

3
.gitignore vendored
View File

@ -1,4 +1,3 @@
__pycache__/
dist/
*.db
*.json
*.sqlite

View File

@ -7,6 +7,7 @@ name = "pogo-scaled-estimators"
version = "1.0a1"
dependencies = [
"requests",
"requests-cache",
"rich",
]
requires-python = ">=3.12"

View File

@ -4,242 +4,70 @@
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
import contextlib
import functools
import json
import operator
import sqlite3
import time
import urllib.parse
from pathlib import Path
from typing import final
import requests
from rich.progress import Progress, TaskID
from rich.progress import Progress
from pogo_scaled_estimators.utilities import format_move_name, format_pokemon_name
WEAKNESS = 1.6
DOUBLE_WEAKNESS = WEAKNESS * WEAKNESS
from pogo_scaled_estimators.pokebattler_proxy import Moveset, PokebattlerProxy, Raid
from pogo_scaled_estimators.utilities import format_pokemon_name
@final
class Calculator:
def __init__(self, attacker_types: list[str]) -> None:
self.db = sqlite3.connect("ase.db")
# db.set_trace_callback(print)
with contextlib.suppress(sqlite3.OperationalError):
_ = self.db.execute(
"CREATE TABLE estimators(defender, raid_tier, attacker, level, quick_move, charged_move, estimator, party)"
)
self.raids = self._pokebattler_resource("raids")
self.pokemon = self._pokebattler_resource("pokemon")
self.resists = self._pokebattler_resource("resists")
self.moves = self._pokebattler_resource("moves")
self.attacker_types = attacker_types
self.progress = Progress()
self._refresh_task: TaskID | None = None
def _pokebattler_resource(self, name: str) -> dict:
p = Path(f"./{name}.json")
if not p.exists():
response = requests.get(f"https://fight.pokebattler.com/{name}")
with p.open(mode="wb") as fp:
_ = fp.write(response.content)
with p.open() as fp:
return json.load(fp)
self._pokebattler_proxy = PokebattlerProxy()
self._progress = Progress()
def calculate(self, level: int = 40, party: int = 1) -> None:
raid_bosses = self._raid_bosses()
charged_moves = [
move["moveId"] for move in self.moves["move"] if "type" in move and move["type"] in self.attacker_types
]
res = self.db.execute(
f"""SELECT DISTINCT(attacker) FROM estimators WHERE charged_move IN ("{'","'.join(charged_moves)}")"""
)
attackers = {row[0]: {"RAID_LEVEL_3": [], "RAID_LEVEL_5": [], "RAID_LEVEL_MEGA": []} for row in res.fetchall()}
movesets = {}
raid_bosses = self._pokebattler_proxy.raid_bosses(self.attacker_types)
attackers = {
attacker: {"RAID_LEVEL_3": [], "RAID_LEVEL_5": [], "RAID_LEVEL_MEGA": []}
for attacker in self._pokebattler_proxy.with_charged_moves(self.attacker_types)
}
defenders = functools.reduce(operator.iconcat, raid_bosses.values(), [])
res = self.db.execute(
f"""
SELECT e.raid_tier, e.defender, e.attacker, e.estimator / m.min_estimator, e.quick_move, e.charged_move
FROM estimators e
INNER JOIN (
SELECT defender, MIN(estimator) AS min_estimator
FROM estimators
WHERE party = ? AND level = 40
AND attacker IN ("{'","'.join(attackers.keys())}")
AND defender IN ("{'","'.join(defenders)}")
GROUP BY defender
) AS m ON e.defender = m.defender
INNER JOIN (
SELECT defender, attacker, MIN(estimator) as min_estimator
FROM estimators
WHERE party = ? AND level = ? AND charged_move IN ("{'","'.join(charged_moves)}")
GROUP BY defender, attacker
) AS ms ON e.defender = ms.defender AND e.attacker = ms.attacker AND e.estimator = ms.min_estimator
WHERE e.attacker IN ("{'","'.join(attackers.keys())}")
""",
(party, party, level),
)
for raid_tier, _, attacker, estimator, fast_move, charged_move in res.fetchall():
if raid_tier == "RAID_LEVEL_MEGA_5":
simplified_raid_tier = "RAID_LEVEL_MEGA"
elif raid_tier == "RAID_LEVEL_ULTRA_BEAST":
simplified_raid_tier = "RAID_LEVEL_5"
else:
simplified_raid_tier = raid_tier
attackers[attacker][simplified_raid_tier].append(estimator)
movesets[attacker] = (fast_move, charged_move)
with self._progress:
total_defenders = sum(len(defenders) for defenders in raid_bosses.values())
task = self._progress.add_task("Working...", total=(total_defenders * 30))
for raid_tier, defenders in raid_bosses.items():
if raid_tier == "RAID_LEVEL_MEGA_5":
simplified_raid_tier = "RAID_LEVEL_MEGA"
elif raid_tier == "RAID_LEVEL_ULTRA_BEAST":
simplified_raid_tier = "RAID_LEVEL_5"
else:
simplified_raid_tier = raid_tier
for defender in defenders:
self._progress.update(task, description=f"vs {format_pokemon_name(defender)}...")
raid = Raid(raid_tier, defender, level, party)
results = {
attacker: movesets
for attacker, movesets in self._pokebattler_proxy.simulate(raid).items()
if attacker in attackers
}
best_movesets = {
attacker: min(movesets, key=lambda moveset: moveset.estimator)
for attacker, movesets in results.items()
}
best_estimator = min(best_movesets.values(), key=lambda moveset: moveset.estimator).estimator
for attacker, moveset in best_movesets.items():
self._progress.update(task, advance=1)
attackers[attacker][simplified_raid_tier].append(moveset.scale(best_estimator))
ase = {}
for attacker, estimators in attackers.items():
if not estimators["RAID_LEVEL_3"] or not estimators["RAID_LEVEL_5"] or not estimators["RAID_LEVEL_MEGA"]:
for attacker, raid_tier_results in attackers.items():
if (
not raid_tier_results["RAID_LEVEL_3"]
or not raid_tier_results["RAID_LEVEL_5"]
or not raid_tier_results["RAID_LEVEL_MEGA"]
):
continue
ase = (
0.15 * sum(estimators["RAID_LEVEL_3"]) / len(estimators["RAID_LEVEL_3"])
+ 0.50 * sum(estimators["RAID_LEVEL_5"]) / len(estimators["RAID_LEVEL_5"])
+ 0.35 * sum(estimators["RAID_LEVEL_MEGA"]) / len(estimators["RAID_LEVEL_MEGA"])
0.15 * self._average_estimator(raid_tier_results["RAID_LEVEL_3"])
+ 0.50 * self._average_estimator(raid_tier_results["RAID_LEVEL_5"])
+ 0.35 * self._average_estimator(raid_tier_results["RAID_LEVEL_MEGA"])
)
fast_move, charged_move = movesets[attacker]
print(f"{attacker},{level},{ase},{fast_move},{charged_move}")
print(f"{attacker},{level},{ase}")
def _raid_bosses(self) -> dict:
raid_tiers = []
raid_bosses = {}
for raid_level in ["3", "5", "MEGA", "MEGA_5", "ULTRA_BEAST"]:
tier = f"RAID_LEVEL_{raid_level}"
raid_tiers.extend(
[
tier,
f"{tier}_LEGACY",
f"{tier}_FUTURE",
]
)
raid_bosses[tier] = []
for tier in filter(lambda tier: tier["tier"] in raid_tiers, self.raids["tiers"]):
for boss in (raid["pokemon"] for raid in tier["raids"]):
if boss.endswith("_FORM"):
continue
boss_pokemon = next(filter(lambda mon: mon["pokemonId"] == boss, self.pokemon["pokemon"]))
if ("candyToEvolve" in boss_pokemon or boss in ["SEADRA", "SEALEO"]) and boss not in [
"KELDEO",
"LUMINEON",
"MANAPHY",
"PHIONE",
"STUNFISK",
"TERRAKION",
]:
continue
boss_types = (
boss_pokemon["type"],
boss_pokemon.get("type2", "POKEMON_TYPE_NONE"),
)
if any(self._is_weak(attacker_type, boss_types) for attacker_type in self.attacker_types):
raid_bosses[tier["info"]["guessTier"]].append(boss)
return raid_bosses
def _is_weak(self, attacker_type: str, defender_types: tuple[str, str]) -> bool:
pokemon_types = list(self.resists.keys())
defender_type_indices = (
pokemon_types.index(defender_types[0]),
pokemon_types.index(defender_types[1]),
)
attack_resist = (
self.resists[attacker_type][defender_type_indices[0]]
* self.resists[attacker_type][defender_type_indices[1]]
)
# Check for double weakness.
if attack_resist >= DOUBLE_WEAKNESS:
return True
# Checkout for single weakness if not double weak to anything else.
any_double_weaknesses = any(
r[defender_type_indices[0]] * r[defender_type_indices[1]] >= DOUBLE_WEAKNESS for r in self.resists.values()
)
if not any_double_weaknesses and attack_resist >= WEAKNESS:
return True
return False
def load_estimators(self, tier: str, defender: str, level: int, party: int) -> None:
base_url = "https://fight.pokebattler.com"
query_string = {
"sort": "ESTIMATOR",
"weatherCondition": "NO_WEATHER",
"dodgeStrategy": "DODGE_REACTION_TIME",
"aggregation": "AVERAGE",
"includeLegendary": "true",
"includeShadow": "true",
"includeMegas": "true",
"primalAssistants": "",
"numParty": str(party),
}
if tier != "RAID_LEVEL_3":
query_string["friendshipLevel"] = "FRIENDSHIP_LEVEL_4"
url = f"{base_url}/raids/defenders/{defender}/levels/{tier}/attackers/levels/{level}/strategies/CINEMATIC_ATTACK_WHEN_POSSIBLE/DEFENSE_RANDOM_MC?{urllib.parse.urlencode(query_string, doseq=True)}"
response = requests.get(url)
for attacker in response.json()["attackers"][0]["randomMove"]["defenders"]:
for attacker_moves in attacker["byMove"]:
res = self.db.execute(
"SELECT estimator FROM estimators WHERE defender=? AND attacker=? AND level=? AND quick_move=? AND charged_move=? AND party=?",
(
defender,
attacker["pokemonId"],
level,
attacker_moves["move1"],
attacker_moves["move2"],
party,
),
)
estimator = res.fetchone()
if estimator is not None:
self.progress.console.log(
f'{format_pokemon_name(attacker["pokemonId"])} ({format_move_name(attacker_moves["move1"])}/{format_move_name(attacker_moves["move2"])}): {estimator[0]:.2f} (cached)'
)
continue
_ = self.db.execute(
"INSERT INTO estimators(defender, raid_tier, attacker, level, quick_move, charged_move, estimator, party) VALUES(?, ?, ?, ?, ?, ?, ?, ?)",
(
defender,
tier,
attacker["pokemonId"],
level,
attacker_moves["move1"],
attacker_moves["move2"],
attacker_moves["result"]["estimator"],
party,
),
)
self.progress.console.log(
f'{format_pokemon_name(attacker["pokemonId"])} ({format_move_name(attacker_moves["move1"])}/{format_move_name(attacker_moves["move2"])}): {attacker_moves["result"]["estimator"]:.2f}'
)
if self._refresh_task is not None:
self.progress.update(self._refresh_task, advance=1)
self.db.commit()
def refresh(self, level: int = 40, party: int = 1) -> None:
with self.progress:
total_defenders = sum(len(defenders) for defenders in self._raid_bosses().values())
self._refresh_task = self.progress.add_task("Working...", total=(total_defenders * 30))
for tier, defenders in self._raid_bosses().items():
for defender in defenders:
self.progress.update(self._refresh_task, description=f"vs {format_pokemon_name(defender)}...")
try:
self.load_estimators(tier, defender, level, party)
except json.decoder.JSONDecodeError:
time.sleep(30)
self.load_estimators(tier, defender, level, party)
time.sleep(1)
def _average_estimator(self, movesets: list[Moveset]) -> float:
return sum(moveset.estimator for moveset in movesets) / len(movesets)

View File

@ -15,15 +15,10 @@ def main_cli():
_ = parser.add_argument("type", nargs="+", help="an attacker type")
_ = parser.add_argument("--level", type=int, default=40)
_ = parser.add_argument("--party", type=int, default=1)
_ = parser.add_argument("--refresh", action="store_true", default=False)
args = parser.parse_args()
calculator = pogo_scaled_estimators.calculator.Calculator(args.type)
if args.refresh:
calculator.refresh(level=args.level, party=args.party)
else:
calculator.calculate(level=args.level, party=args.party)
calculator.calculate(level=args.level, party=args.party)
if __name__ == "__main__":

View File

@ -0,0 +1,153 @@
import urllib.parse
from dataclasses import dataclass
from functools import cached_property
from typing import cast, final
import requests_cache
BASE_URL = "https://fight.pokebattler.com"
WEAKNESS = 1.6
DOUBLE_WEAKNESS = WEAKNESS * WEAKNESS
@dataclass(frozen=True)
class Raid:
tier: str
defender: str
level: int = 40
party: int = 1
@dataclass
class Moveset:
fast_move: str
charged_move: str
estimator: float
def scale(self, factor: float):
return Moveset(self.fast_move, self.charged_move, self.estimator / factor)
@final
class PokebattlerProxy:
def __init__(self):
self._cached_session = requests_cache.CachedSession("pokebatter_cache", cache_control=True)
self._pokemon: dict | None = None
self._raids: dict | None = None
self._resists: dict | None = None
@property
def cached_session(self):
return self._cached_session
@cached_property
def moves(self) -> dict:
return self.cached_session.get(f"{BASE_URL}/moves").json()["move"]
@cached_property
def pokemon(self) -> dict:
return self.cached_session.get(f"{BASE_URL}/pokemon").json()["pokemon"]
@cached_property
def raids(self) -> dict:
return self.cached_session.get(f"{BASE_URL}/raids").json()
@cached_property
def resists(self) -> dict:
return self.cached_session.get(f"{BASE_URL}/resists").json()
def simulate(self, raid: Raid) -> dict[str, list[Moveset]]:
query_string = {
"sort": "ESTIMATOR",
"weatherCondition": "NO_WEATHER",
"dodgeStrategy": "DODGE_REACTION_TIME",
"aggregation": "AVERAGE",
"includeLegendary": "true",
"includeShadow": "true",
"includeMegas": "true",
"primalAssistants": "",
"numParty": str(raid.party),
}
if raid.tier != "RAID_LEVEL_3":
query_string["friendshipLevel"] = "FRIENDSHIP_LEVEL_4"
url = f"{BASE_URL}/raids/defenders/{raid.defender}/levels/{raid.tier}/attackers/levels/{raid.level}/strategies/CINEMATIC_ATTACK_WHEN_POSSIBLE/DEFENSE_RANDOM_MC?{urllib.parse.urlencode(query_string, doseq=True)}"
response = self._cached_session.get(url)
results: dict[str, list[Moveset]] = {}
for attacker in response.json()["attackers"][0]["randomMove"]["defenders"]:
results[attacker["pokemonId"]] = [
Moveset(
attacker_moves["move1"], attacker_moves["move2"], cast(float, attacker_moves["result"]["estimator"])
)
for attacker_moves in attacker["byMove"]
]
return results
def raid_bosses(self, attacker_types: list[str]) -> dict:
raid_tiers = []
raid_bosses = {}
for raid_level in ["3", "5", "MEGA", "MEGA_5", "ULTRA_BEAST"]:
tier = f"RAID_LEVEL_{raid_level}"
raid_tiers.extend(
[
tier,
f"{tier}_LEGACY",
f"{tier}_FUTURE",
]
)
raid_bosses[tier] = []
for tier in filter(lambda tier: tier["tier"] in raid_tiers, self.raids["tiers"]):
for boss in (raid["pokemon"] for raid in tier["raids"]):
if boss.endswith("_FORM"):
continue
boss_pokemon = next(filter(lambda mon: mon["pokemonId"] == boss, self.pokemon))
if ("candyToEvolve" in boss_pokemon or boss in ["SEADRA", "SEALEO"]) and boss not in [
"KELDEO",
"LUMINEON",
"MANAPHY",
"PHIONE",
"STUNFISK",
"TERRAKION",
]:
continue
boss_types = (
boss_pokemon["type"],
boss_pokemon.get("type2", "POKEMON_TYPE_NONE"),
)
if any(self._is_weak(attacker_type, boss_types) for attacker_type in attacker_types):
raid_bosses[tier["info"]["guessTier"]].append(boss)
return raid_bosses
def _is_weak(self, attacker_type: str, defender_types: tuple[str, str]) -> bool:
pokemon_types = list(self.resists.keys())
defender_type_indices = (
pokemon_types.index(defender_types[0]),
pokemon_types.index(defender_types[1]),
)
attack_resist = (
self.resists[attacker_type][defender_type_indices[0]]
* self.resists[attacker_type][defender_type_indices[1]]
)
# Check for double weakness.
if attack_resist >= DOUBLE_WEAKNESS:
return True
# Checkout for single weakness if not double weak to anything else.
any_double_weaknesses = any(
r[defender_type_indices[0]] * r[defender_type_indices[1]] >= DOUBLE_WEAKNESS for r in self.resists.values()
)
if not any_double_weaknesses and attack_resist >= WEAKNESS:
return True
return False
def with_charged_moves(self, attacker_types: list[str]) -> list[str]:
charged_moves = [move["moveId"] for move in self.moves if "type" in move and move["type"] in attacker_types]
return [
mon["pokemonId"]
for mon in self.pokemon
if any(moveset["cinematicMove"] in charged_moves for moveset in mon["movesets"])
]