Compare commits

..

3 Commits

Author SHA1 Message Date
Zoé Cassiopée Gauthier
c67a69b3d0 Remove attacker movesets that don't match the requested type 2024-04-05 22:11:56 -04:00
Zoé Cassiopée Gauthier
61e0520886 Pretty colors everywhere 2024-04-05 21:52:59 -04:00
Zoé Cassiopée Gauthier
46a3df2aee Cache raid simulations results instead of storing results in local database 2024-04-05 21:21:19 -04:00
6 changed files with 256 additions and 235 deletions

3
.gitignore vendored
View File

@ -1,4 +1,3 @@
__pycache__/
dist/
*.db
*.json
*.sqlite

View File

@ -7,6 +7,7 @@ name = "pogo-scaled-estimators"
version = "1.0a1"
dependencies = [
"requests",
"requests-cache",
"rich",
]
requires-python = ">=3.12"

View File

@ -4,242 +4,82 @@
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
import contextlib
import functools
import json
import operator
import sqlite3
import time
import urllib.parse
from pathlib import Path
from typing import final
import requests
from rich.progress import Progress, TaskID
from rich.progress import Progress
from pogo_scaled_estimators.pokebattler_proxy import Moveset, PokebattlerProxy, Raid
from pogo_scaled_estimators.utilities import format_move_name, format_pokemon_name
WEAKNESS = 1.6
DOUBLE_WEAKNESS = WEAKNESS * WEAKNESS
@final
class Calculator:
def __init__(self, attacker_types: list[str]) -> None:
self.db = sqlite3.connect("ase.db")
# db.set_trace_callback(print)
with contextlib.suppress(sqlite3.OperationalError):
_ = self.db.execute(
"CREATE TABLE estimators(defender, raid_tier, attacker, level, quick_move, charged_move, estimator, party)"
)
self.raids = self._pokebattler_resource("raids")
self.pokemon = self._pokebattler_resource("pokemon")
self.resists = self._pokebattler_resource("resists")
self.moves = self._pokebattler_resource("moves")
self.attacker_types = attacker_types
self.progress = Progress()
self._refresh_task: TaskID | None = None
def _pokebattler_resource(self, name: str) -> dict:
p = Path(f"./{name}.json")
if not p.exists():
response = requests.get(f"https://fight.pokebattler.com/{name}")
with p.open(mode="wb") as fp:
_ = fp.write(response.content)
with p.open() as fp:
return json.load(fp)
self._pokebattler_proxy = PokebattlerProxy()
self._progress = Progress()
def calculate(self, level: int = 40, party: int = 1) -> None:
raid_bosses = self._raid_bosses()
charged_moves = [
move["moveId"] for move in self.moves["move"] if "type" in move and move["type"] in self.attacker_types
]
res = self.db.execute(
f"""SELECT DISTINCT(attacker) FROM estimators WHERE charged_move IN ("{'","'.join(charged_moves)}")"""
)
attackers = {row[0]: {"RAID_LEVEL_3": [], "RAID_LEVEL_5": [], "RAID_LEVEL_MEGA": []} for row in res.fetchall()}
movesets = {}
defenders = functools.reduce(operator.iconcat, raid_bosses.values(), [])
res = self.db.execute(
f"""
SELECT e.raid_tier, e.defender, e.attacker, e.estimator / m.min_estimator, e.quick_move, e.charged_move
FROM estimators e
INNER JOIN (
SELECT defender, MIN(estimator) AS min_estimator
FROM estimators
WHERE party = ? AND level = 40
AND attacker IN ("{'","'.join(attackers.keys())}")
AND defender IN ("{'","'.join(defenders)}")
GROUP BY defender
) AS m ON e.defender = m.defender
INNER JOIN (
SELECT defender, attacker, MIN(estimator) as min_estimator
FROM estimators
WHERE party = ? AND level = ? AND charged_move IN ("{'","'.join(charged_moves)}")
GROUP BY defender, attacker
) AS ms ON e.defender = ms.defender AND e.attacker = ms.attacker AND e.estimator = ms.min_estimator
WHERE e.attacker IN ("{'","'.join(attackers.keys())}")
""",
(party, party, level),
)
for raid_tier, _, attacker, estimator, fast_move, charged_move in res.fetchall():
if raid_tier == "RAID_LEVEL_MEGA_5":
simplified_raid_tier = "RAID_LEVEL_MEGA"
elif raid_tier == "RAID_LEVEL_ULTRA_BEAST":
simplified_raid_tier = "RAID_LEVEL_5"
else:
simplified_raid_tier = raid_tier
attackers[attacker][simplified_raid_tier].append(estimator)
movesets[attacker] = (fast_move, charged_move)
ase = {}
for attacker, estimators in attackers.items():
if not estimators["RAID_LEVEL_3"] or not estimators["RAID_LEVEL_5"] or not estimators["RAID_LEVEL_MEGA"]:
continue
ase = (
0.15 * sum(estimators["RAID_LEVEL_3"]) / len(estimators["RAID_LEVEL_3"])
+ 0.50 * sum(estimators["RAID_LEVEL_5"]) / len(estimators["RAID_LEVEL_5"])
+ 0.35 * sum(estimators["RAID_LEVEL_MEGA"]) / len(estimators["RAID_LEVEL_MEGA"])
)
fast_move, charged_move = movesets[attacker]
print(f"{attacker},{level},{ase},{fast_move},{charged_move}")
def _raid_bosses(self) -> dict:
raid_tiers = []
raid_bosses = {}
for raid_level in ["3", "5", "MEGA", "MEGA_5", "ULTRA_BEAST"]:
tier = f"RAID_LEVEL_{raid_level}"
raid_tiers.extend(
[
tier,
f"{tier}_LEGACY",
f"{tier}_FUTURE",
]
)
raid_bosses[tier] = []
for tier in filter(lambda tier: tier["tier"] in raid_tiers, self.raids["tiers"]):
for boss in (raid["pokemon"] for raid in tier["raids"]):
if boss.endswith("_FORM"):
continue
boss_pokemon = next(filter(lambda mon: mon["pokemonId"] == boss, self.pokemon["pokemon"]))
if ("candyToEvolve" in boss_pokemon or boss in ["SEADRA", "SEALEO"]) and boss not in [
"KELDEO",
"LUMINEON",
"MANAPHY",
"PHIONE",
"STUNFISK",
"TERRAKION",
]:
continue
boss_types = (
boss_pokemon["type"],
boss_pokemon.get("type2", "POKEMON_TYPE_NONE"),
)
if any(self._is_weak(attacker_type, boss_types) for attacker_type in self.attacker_types):
raid_bosses[tier["info"]["guessTier"]].append(boss)
return raid_bosses
def _is_weak(self, attacker_type: str, defender_types: tuple[str, str]) -> bool:
pokemon_types = list(self.resists.keys())
defender_type_indices = (
pokemon_types.index(defender_types[0]),
pokemon_types.index(defender_types[1]),
)
attack_resist = (
self.resists[attacker_type][defender_type_indices[0]]
* self.resists[attacker_type][defender_type_indices[1]]
)
# Check for double weakness.
if attack_resist >= DOUBLE_WEAKNESS:
return True
# Checkout for single weakness if not double weak to anything else.
any_double_weaknesses = any(
r[defender_type_indices[0]] * r[defender_type_indices[1]] >= DOUBLE_WEAKNESS for r in self.resists.values()
)
if not any_double_weaknesses and attack_resist >= WEAKNESS:
return True
return False
def load_estimators(self, tier: str, defender: str, level: int, party: int) -> None:
base_url = "https://fight.pokebattler.com"
query_string = {
"sort": "ESTIMATOR",
"weatherCondition": "NO_WEATHER",
"dodgeStrategy": "DODGE_REACTION_TIME",
"aggregation": "AVERAGE",
"includeLegendary": "true",
"includeShadow": "true",
"includeMegas": "true",
"primalAssistants": "",
"numParty": str(party),
raid_bosses = self._pokebattler_proxy.raid_bosses(self.attacker_types)
attackers = {
attacker: {"RAID_LEVEL_3": [], "RAID_LEVEL_5": [], "RAID_LEVEL_MEGA": []}
for attacker in self._pokebattler_proxy.with_charged_moves(self.attacker_types)
}
if tier != "RAID_LEVEL_3":
query_string["friendshipLevel"] = "FRIENDSHIP_LEVEL_4"
url = f"{base_url}/raids/defenders/{defender}/levels/{tier}/attackers/levels/{level}/strategies/CINEMATIC_ATTACK_WHEN_POSSIBLE/DEFENSE_RANDOM_MC?{urllib.parse.urlencode(query_string, doseq=True)}"
response = requests.get(url)
for attacker in response.json()["attackers"][0]["randomMove"]["defenders"]:
for attacker_moves in attacker["byMove"]:
res = self.db.execute(
"SELECT estimator FROM estimators WHERE defender=? AND attacker=? AND level=? AND quick_move=? AND charged_move=? AND party=?",
(
defender,
attacker["pokemonId"],
level,
attacker_moves["move1"],
attacker_moves["move2"],
party,
),
)
estimator = res.fetchone()
if estimator is not None:
self.progress.console.log(
f'{format_pokemon_name(attacker["pokemonId"])} ({format_move_name(attacker_moves["move1"])}/{format_move_name(attacker_moves["move2"])}): {estimator[0]:.2f} (cached)'
)
continue
_ = self.db.execute(
"INSERT INTO estimators(defender, raid_tier, attacker, level, quick_move, charged_move, estimator, party) VALUES(?, ?, ?, ?, ?, ?, ?, ?)",
(
defender,
tier,
attacker["pokemonId"],
level,
attacker_moves["move1"],
attacker_moves["move2"],
attacker_moves["result"]["estimator"],
party,
),
)
self.progress.console.log(
f'{format_pokemon_name(attacker["pokemonId"])} ({format_move_name(attacker_moves["move1"])}/{format_move_name(attacker_moves["move2"])}): {attacker_moves["result"]["estimator"]:.2f}'
)
with self._progress:
total_defenders = sum(len(defenders) for defenders in raid_bosses.values())
task = self._progress.add_task("Simulating raids...", total=total_defenders)
if self._refresh_task is not None:
self.progress.update(self._refresh_task, advance=1)
self.db.commit()
def refresh(self, level: int = 40, party: int = 1) -> None:
with self.progress:
total_defenders = sum(len(defenders) for defenders in self._raid_bosses().values())
self._refresh_task = self.progress.add_task("Working...", total=(total_defenders * 30))
for tier, defenders in self._raid_bosses().items():
for raid_tier, defenders in raid_bosses.items():
if raid_tier == "RAID_LEVEL_MEGA_5":
simplified_raid_tier = "RAID_LEVEL_MEGA"
elif raid_tier == "RAID_LEVEL_ULTRA_BEAST":
simplified_raid_tier = "RAID_LEVEL_5"
else:
simplified_raid_tier = raid_tier
for defender in defenders:
self.progress.update(self._refresh_task, description=f"vs {format_pokemon_name(defender)}...")
try:
self.load_estimators(tier, defender, level, party)
except json.decoder.JSONDecodeError:
time.sleep(30)
self.load_estimators(tier, defender, level, party)
time.sleep(1)
defender_type = self._pokebattler_proxy.pokemon_type(defender)
self._progress.console.log(f"vs {format_pokemon_name(defender, defender_type)}...")
self._progress.update(task, advance=1)
raid = Raid(raid_tier, defender, level, party)
results = {
attacker: [moveset for moveset in movesets if self._pokebattler_proxy.move_type(moveset.charged_move) in self.attacker_types]
for attacker, movesets in self._pokebattler_proxy.simulate(raid).items()
if attacker in attackers
}
if not results:
continue
best_movesets = {
attacker: min(movesets, key=lambda moveset: moveset.estimator)
for attacker, movesets in results.items()
if movesets
}
best_estimator = min(best_movesets.values(), key=lambda moveset: moveset.estimator).estimator
for attacker, moveset in best_movesets.items():
attackers[attacker][simplified_raid_tier].append(moveset.scale(best_estimator))
ase_by_attacker: list[tuple[str, str, str, float]] = []
for attacker, raid_tier_results in attackers.items():
if (
not raid_tier_results["RAID_LEVEL_3"]
or not raid_tier_results["RAID_LEVEL_5"]
or not raid_tier_results["RAID_LEVEL_MEGA"]
):
continue
fast_move = raid_tier_results["RAID_LEVEL_5"][0].fast_move
charged_move = raid_tier_results["RAID_LEVEL_5"][0].charged_move
ase = (
0.15 * self._average_estimator(raid_tier_results["RAID_LEVEL_3"])
+ 0.50 * self._average_estimator(raid_tier_results["RAID_LEVEL_5"])
+ 0.35 * self._average_estimator(raid_tier_results["RAID_LEVEL_MEGA"])
)
ase_by_attacker.append((attacker, fast_move, charged_move, ase))
ase_by_attacker.sort(key=lambda item: item[3])
for attacker, fast_move, charged_move, ase in ase_by_attacker:
attacker_type = self._pokebattler_proxy.pokemon_type(attacker)
fast_move_type = self._pokebattler_proxy.move_type(fast_move)
charged_move_type = self._pokebattler_proxy.move_type(charged_move)
self._progress.console.print(f"[bold]{format_pokemon_name(attacker, attacker_type)}[/bold] ({format_move_name(fast_move, fast_move_type)}/{format_move_name(charged_move, charged_move_type)}): {ase:.2f}")
def _average_estimator(self, movesets: list[Moveset]) -> float:
return sum(moveset.estimator for moveset in movesets) / len(movesets)

View File

@ -15,15 +15,10 @@ def main_cli():
_ = parser.add_argument("type", nargs="+", help="an attacker type")
_ = parser.add_argument("--level", type=int, default=40)
_ = parser.add_argument("--party", type=int, default=1)
_ = parser.add_argument("--refresh", action="store_true", default=False)
args = parser.parse_args()
calculator = pogo_scaled_estimators.calculator.Calculator(args.type)
if args.refresh:
calculator.refresh(level=args.level, party=args.party)
else:
calculator.calculate(level=args.level, party=args.party)
calculator.calculate(level=args.level, party=args.party)
if __name__ == "__main__":

View File

@ -0,0 +1,159 @@
import urllib.parse
from dataclasses import dataclass
from functools import cached_property
from typing import cast, final
import requests_cache
BASE_URL = "https://fight.pokebattler.com"
WEAKNESS = 1.6
DOUBLE_WEAKNESS = WEAKNESS * WEAKNESS
@dataclass(frozen=True)
class Raid:
tier: str
defender: str
level: int = 40
party: int = 1
@dataclass
class Moveset:
fast_move: str
charged_move: str
estimator: float
def scale(self, factor: float):
return Moveset(self.fast_move, self.charged_move, self.estimator / factor)
@final
class PokebattlerProxy:
def __init__(self):
self._cached_session = requests_cache.CachedSession("pokebatter_cache", cache_control=True)
self._pokemon: dict | None = None
self._raids: dict | None = None
self._resists: dict | None = None
@property
def cached_session(self):
return self._cached_session
@cached_property
def moves(self) -> dict:
return self.cached_session.get(f"{BASE_URL}/moves").json()["move"]
@cached_property
def pokemon(self) -> dict:
return self.cached_session.get(f"{BASE_URL}/pokemon").json()["pokemon"]
@cached_property
def raids(self) -> dict:
return self.cached_session.get(f"{BASE_URL}/raids").json()
@cached_property
def resists(self) -> dict:
return self.cached_session.get(f"{BASE_URL}/resists").json()
def simulate(self, raid: Raid) -> dict[str, list[Moveset]]:
query_string = {
"sort": "ESTIMATOR",
"weatherCondition": "NO_WEATHER",
"dodgeStrategy": "DODGE_REACTION_TIME",
"aggregation": "AVERAGE",
"includeLegendary": "true",
"includeShadow": "true",
"includeMegas": "true",
"primalAssistants": "",
"numParty": str(raid.party),
}
if raid.tier != "RAID_LEVEL_3":
query_string["friendshipLevel"] = "FRIENDSHIP_LEVEL_4"
url = f"{BASE_URL}/raids/defenders/{raid.defender}/levels/{raid.tier}/attackers/levels/{raid.level}/strategies/CINEMATIC_ATTACK_WHEN_POSSIBLE/DEFENSE_RANDOM_MC?{urllib.parse.urlencode(query_string, doseq=True)}"
response = self._cached_session.get(url)
results: dict[str, list[Moveset]] = {}
for attacker in response.json()["attackers"][0]["randomMove"]["defenders"]:
results[attacker["pokemonId"]] = [
Moveset(
attacker_moves["move1"], attacker_moves["move2"], cast(float, attacker_moves["result"]["estimator"])
)
for attacker_moves in attacker["byMove"]
]
return results
def raid_bosses(self, attacker_types: list[str]) -> dict:
raid_tiers = []
raid_bosses = {}
for raid_level in ["3", "5", "MEGA", "MEGA_5", "ULTRA_BEAST"]:
tier = f"RAID_LEVEL_{raid_level}"
raid_tiers.extend(
[
tier,
f"{tier}_LEGACY",
f"{tier}_FUTURE",
]
)
raid_bosses[tier] = []
for tier in filter(lambda tier: tier["tier"] in raid_tiers, self.raids["tiers"]):
for boss in (raid["pokemon"] for raid in tier["raids"]):
if boss.endswith("_FORM"):
continue
boss_pokemon = next(filter(lambda mon: mon["pokemonId"] == boss, self.pokemon))
if ("candyToEvolve" in boss_pokemon or boss in ["SEADRA", "SEALEO"]) and boss not in [
"KELDEO",
"LUMINEON",
"MANAPHY",
"PHIONE",
"STUNFISK",
"TERRAKION",
]:
continue
boss_types = (
boss_pokemon["type"],
boss_pokemon.get("type2", "POKEMON_TYPE_NONE"),
)
if any(self._is_weak(attacker_type, boss_types) for attacker_type in attacker_types):
raid_bosses[tier["info"]["guessTier"]].append(boss)
return raid_bosses
def _is_weak(self, attacker_type: str, defender_types: tuple[str, str]) -> bool:
pokemon_types = list(self.resists.keys())
defender_type_indices = (
pokemon_types.index(defender_types[0]),
pokemon_types.index(defender_types[1]),
)
attack_resist = (
self.resists[attacker_type][defender_type_indices[0]]
* self.resists[attacker_type][defender_type_indices[1]]
)
# Check for double weakness.
if attack_resist >= DOUBLE_WEAKNESS:
return True
# Checkout for single weakness if not double weak to anything else.
any_double_weaknesses = any(
r[defender_type_indices[0]] * r[defender_type_indices[1]] >= DOUBLE_WEAKNESS for r in self.resists.values()
)
if not any_double_weaknesses and attack_resist >= WEAKNESS:
return True
return False
def with_charged_moves(self, attacker_types: list[str]) -> list[str]:
charged_moves = [move["moveId"] for move in self.moves if "moveId" in move and "type" in move and move["type"] in attacker_types]
return [
mon["pokemonId"]
for mon in self.pokemon
if any(moveset["cinematicMove"] in charged_moves for moveset in mon["movesets"])
]
def pokemon_type(self, name: str) -> str:
return next(filter(lambda mon: mon["pokemonId"] == name, self.pokemon))["type"]
def move_type(self, name: str) -> str:
return next(filter(lambda move: move["moveId"] == name, self.moves))["type"]

View File

@ -1,7 +1,28 @@
MINIMUM_SPECIAL_NAME_PARTS = 2
def format_pokemon_name(name):
POKEMON_TYPE_COLORS = {
"POKEMON_TYPE_BUG": "green_yellow",
"POKEMON_TYPE_DARK": "bright_black",
"POKEMON_TYPE_DRAGON": "dodger_blue2",
"POKEMON_TYPE_ELECTRIC": "yellow1",
"POKEMON_TYPE_FAIRY": "orchid1",
"POKEMON_TYPE_FIGHTING": "red3",
"POKEMON_TYPE_FIRE": "orange1",
"POKEMON_TYPE_FLYING": "light_sky_blue1",
"POKEMON_TYPE_GHOST": "slate_blue3",
"POKEMON_TYPE_GRASS": "green3",
"POKEMON_TYPE_GROUND": "orange4",
"POKEMON_TYPE_ICE": "pale_turquoise1",
"POKEMON_TYPE_NORMAL": "grey53",
"POKEMON_TYPE_POISON": "dark_magenta",
"POKEMON_TYPE_PSYCHIC": "hot_pink",
"POKEMON_TYPE_ROCK": "gold3",
"POKEMON_TYPE_STEEL": "steel_blue",
"POKEMON_TYPE_WATER": "cornflower_blue",
}
def format_pokemon_name(name: str, pokemon_type: str | None = None):
parts = [part.capitalize() for part in name.split("_")]
if parts[-1] == "Mega" or parts[-1] == "Primal":
parts = [parts[-1]] + parts[:-1]
@ -9,11 +30,17 @@ def format_pokemon_name(name):
parts = [parts[-2]] + parts[:-2] + [parts[-1]]
if len(parts) > MINIMUM_SPECIAL_NAME_PARTS and parts[-2] == "Shadow":
parts = [parts[-2]] + parts[:-2]
return " ".join(parts)
formatted_name = " ".join(parts)
if pokemon_type:
return f"[{POKEMON_TYPE_COLORS[pokemon_type]}]{formatted_name}[/]"
return formatted_name
def format_move_name(name):
def format_move_name(name, move_type: str | None = None):
parts = [part.capitalize() for part in name.split("_")]
if parts[-1] == "Fast":
parts = parts[:-1]
return " ".join(parts)
formatted_name = " ".join(parts)
if move_type:
return f"[{POKEMON_TYPE_COLORS[move_type]}]{formatted_name}[/]"
return formatted_name