Compare commits

..

No commits in common. "c67a69b3d02a0a23d94d2470ea3cbe15f70eb877" and "5ff176e1e2ef22070a2e1e2201589490d4229067" have entirely different histories.

6 changed files with 233 additions and 254 deletions

3
.gitignore vendored
View File

@ -1,3 +1,4 @@
__pycache__/
dist/
*.sqlite
*.db
*.json

View File

@ -7,7 +7,6 @@ name = "pogo-scaled-estimators"
version = "1.0a1"
dependencies = [
"requests",
"requests-cache",
"rich",
]
requires-python = ">=3.12"

View File

@ -4,82 +4,242 @@
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
import contextlib
import functools
import json
import operator
import sqlite3
import time
import urllib.parse
from pathlib import Path
from typing import final
from rich.progress import Progress
import requests
from rich.progress import Progress, TaskID
from pogo_scaled_estimators.pokebattler_proxy import Moveset, PokebattlerProxy, Raid
from pogo_scaled_estimators.utilities import format_move_name, format_pokemon_name
WEAKNESS = 1.6
DOUBLE_WEAKNESS = WEAKNESS * WEAKNESS
@final
class Calculator:
def __init__(self, attacker_types: list[str]) -> None:
self.db = sqlite3.connect("ase.db")
# db.set_trace_callback(print)
with contextlib.suppress(sqlite3.OperationalError):
_ = self.db.execute(
"CREATE TABLE estimators(defender, raid_tier, attacker, level, quick_move, charged_move, estimator, party)"
)
self.raids = self._pokebattler_resource("raids")
self.pokemon = self._pokebattler_resource("pokemon")
self.resists = self._pokebattler_resource("resists")
self.moves = self._pokebattler_resource("moves")
self.attacker_types = attacker_types
self._pokebattler_proxy = PokebattlerProxy()
self._progress = Progress()
self.progress = Progress()
self._refresh_task: TaskID | None = None
def _pokebattler_resource(self, name: str) -> dict:
p = Path(f"./{name}.json")
if not p.exists():
response = requests.get(f"https://fight.pokebattler.com/{name}")
with p.open(mode="wb") as fp:
_ = fp.write(response.content)
with p.open() as fp:
return json.load(fp)
def calculate(self, level: int = 40, party: int = 1) -> None:
raid_bosses = self._pokebattler_proxy.raid_bosses(self.attacker_types)
attackers = {
attacker: {"RAID_LEVEL_3": [], "RAID_LEVEL_5": [], "RAID_LEVEL_MEGA": []}
for attacker in self._pokebattler_proxy.with_charged_moves(self.attacker_types)
}
raid_bosses = self._raid_bosses()
charged_moves = [
move["moveId"] for move in self.moves["move"] if "type" in move and move["type"] in self.attacker_types
]
res = self.db.execute(
f"""SELECT DISTINCT(attacker) FROM estimators WHERE charged_move IN ("{'","'.join(charged_moves)}")"""
)
attackers = {row[0]: {"RAID_LEVEL_3": [], "RAID_LEVEL_5": [], "RAID_LEVEL_MEGA": []} for row in res.fetchall()}
movesets = {}
with self._progress:
total_defenders = sum(len(defenders) for defenders in raid_bosses.values())
task = self._progress.add_task("Simulating raids...", total=total_defenders)
for raid_tier, defenders in raid_bosses.items():
defenders = functools.reduce(operator.iconcat, raid_bosses.values(), [])
res = self.db.execute(
f"""
SELECT e.raid_tier, e.defender, e.attacker, e.estimator / m.min_estimator, e.quick_move, e.charged_move
FROM estimators e
INNER JOIN (
SELECT defender, MIN(estimator) AS min_estimator
FROM estimators
WHERE party = ? AND level = 40
AND attacker IN ("{'","'.join(attackers.keys())}")
AND defender IN ("{'","'.join(defenders)}")
GROUP BY defender
) AS m ON e.defender = m.defender
INNER JOIN (
SELECT defender, attacker, MIN(estimator) as min_estimator
FROM estimators
WHERE party = ? AND level = ? AND charged_move IN ("{'","'.join(charged_moves)}")
GROUP BY defender, attacker
) AS ms ON e.defender = ms.defender AND e.attacker = ms.attacker AND e.estimator = ms.min_estimator
WHERE e.attacker IN ("{'","'.join(attackers.keys())}")
""",
(party, party, level),
)
for raid_tier, _, attacker, estimator, fast_move, charged_move in res.fetchall():
if raid_tier == "RAID_LEVEL_MEGA_5":
simplified_raid_tier = "RAID_LEVEL_MEGA"
elif raid_tier == "RAID_LEVEL_ULTRA_BEAST":
simplified_raid_tier = "RAID_LEVEL_5"
else:
simplified_raid_tier = raid_tier
for defender in defenders:
defender_type = self._pokebattler_proxy.pokemon_type(defender)
self._progress.console.log(f"vs {format_pokemon_name(defender, defender_type)}...")
self._progress.update(task, advance=1)
raid = Raid(raid_tier, defender, level, party)
results = {
attacker: [moveset for moveset in movesets if self._pokebattler_proxy.move_type(moveset.charged_move) in self.attacker_types]
for attacker, movesets in self._pokebattler_proxy.simulate(raid).items()
if attacker in attackers
}
if not results:
continue
best_movesets = {
attacker: min(movesets, key=lambda moveset: moveset.estimator)
for attacker, movesets in results.items()
if movesets
}
best_estimator = min(best_movesets.values(), key=lambda moveset: moveset.estimator).estimator
for attacker, moveset in best_movesets.items():
attackers[attacker][simplified_raid_tier].append(moveset.scale(best_estimator))
attackers[attacker][simplified_raid_tier].append(estimator)
movesets[attacker] = (fast_move, charged_move)
ase_by_attacker: list[tuple[str, str, str, float]] = []
for attacker, raid_tier_results in attackers.items():
if (
not raid_tier_results["RAID_LEVEL_3"]
or not raid_tier_results["RAID_LEVEL_5"]
or not raid_tier_results["RAID_LEVEL_MEGA"]
):
ase = {}
for attacker, estimators in attackers.items():
if not estimators["RAID_LEVEL_3"] or not estimators["RAID_LEVEL_5"] or not estimators["RAID_LEVEL_MEGA"]:
continue
fast_move = raid_tier_results["RAID_LEVEL_5"][0].fast_move
charged_move = raid_tier_results["RAID_LEVEL_5"][0].charged_move
ase = (
0.15 * self._average_estimator(raid_tier_results["RAID_LEVEL_3"])
+ 0.50 * self._average_estimator(raid_tier_results["RAID_LEVEL_5"])
+ 0.35 * self._average_estimator(raid_tier_results["RAID_LEVEL_MEGA"])
0.15 * sum(estimators["RAID_LEVEL_3"]) / len(estimators["RAID_LEVEL_3"])
+ 0.50 * sum(estimators["RAID_LEVEL_5"]) / len(estimators["RAID_LEVEL_5"])
+ 0.35 * sum(estimators["RAID_LEVEL_MEGA"]) / len(estimators["RAID_LEVEL_MEGA"])
)
ase_by_attacker.append((attacker, fast_move, charged_move, ase))
ase_by_attacker.sort(key=lambda item: item[3])
for attacker, fast_move, charged_move, ase in ase_by_attacker:
attacker_type = self._pokebattler_proxy.pokemon_type(attacker)
fast_move_type = self._pokebattler_proxy.move_type(fast_move)
charged_move_type = self._pokebattler_proxy.move_type(charged_move)
self._progress.console.print(f"[bold]{format_pokemon_name(attacker, attacker_type)}[/bold] ({format_move_name(fast_move, fast_move_type)}/{format_move_name(charged_move, charged_move_type)}): {ase:.2f}")
fast_move, charged_move = movesets[attacker]
print(f"{attacker},{level},{ase},{fast_move},{charged_move}")
def _average_estimator(self, movesets: list[Moveset]) -> float:
return sum(moveset.estimator for moveset in movesets) / len(movesets)
def _raid_bosses(self) -> dict:
raid_tiers = []
raid_bosses = {}
for raid_level in ["3", "5", "MEGA", "MEGA_5", "ULTRA_BEAST"]:
tier = f"RAID_LEVEL_{raid_level}"
raid_tiers.extend(
[
tier,
f"{tier}_LEGACY",
f"{tier}_FUTURE",
]
)
raid_bosses[tier] = []
for tier in filter(lambda tier: tier["tier"] in raid_tiers, self.raids["tiers"]):
for boss in (raid["pokemon"] for raid in tier["raids"]):
if boss.endswith("_FORM"):
continue
boss_pokemon = next(filter(lambda mon: mon["pokemonId"] == boss, self.pokemon["pokemon"]))
if ("candyToEvolve" in boss_pokemon or boss in ["SEADRA", "SEALEO"]) and boss not in [
"KELDEO",
"LUMINEON",
"MANAPHY",
"PHIONE",
"STUNFISK",
"TERRAKION",
]:
continue
boss_types = (
boss_pokemon["type"],
boss_pokemon.get("type2", "POKEMON_TYPE_NONE"),
)
if any(self._is_weak(attacker_type, boss_types) for attacker_type in self.attacker_types):
raid_bosses[tier["info"]["guessTier"]].append(boss)
return raid_bosses
def _is_weak(self, attacker_type: str, defender_types: tuple[str, str]) -> bool:
pokemon_types = list(self.resists.keys())
defender_type_indices = (
pokemon_types.index(defender_types[0]),
pokemon_types.index(defender_types[1]),
)
attack_resist = (
self.resists[attacker_type][defender_type_indices[0]]
* self.resists[attacker_type][defender_type_indices[1]]
)
# Check for double weakness.
if attack_resist >= DOUBLE_WEAKNESS:
return True
# Checkout for single weakness if not double weak to anything else.
any_double_weaknesses = any(
r[defender_type_indices[0]] * r[defender_type_indices[1]] >= DOUBLE_WEAKNESS for r in self.resists.values()
)
if not any_double_weaknesses and attack_resist >= WEAKNESS:
return True
return False
def load_estimators(self, tier: str, defender: str, level: int, party: int) -> None:
base_url = "https://fight.pokebattler.com"
query_string = {
"sort": "ESTIMATOR",
"weatherCondition": "NO_WEATHER",
"dodgeStrategy": "DODGE_REACTION_TIME",
"aggregation": "AVERAGE",
"includeLegendary": "true",
"includeShadow": "true",
"includeMegas": "true",
"primalAssistants": "",
"numParty": str(party),
}
if tier != "RAID_LEVEL_3":
query_string["friendshipLevel"] = "FRIENDSHIP_LEVEL_4"
url = f"{base_url}/raids/defenders/{defender}/levels/{tier}/attackers/levels/{level}/strategies/CINEMATIC_ATTACK_WHEN_POSSIBLE/DEFENSE_RANDOM_MC?{urllib.parse.urlencode(query_string, doseq=True)}"
response = requests.get(url)
for attacker in response.json()["attackers"][0]["randomMove"]["defenders"]:
for attacker_moves in attacker["byMove"]:
res = self.db.execute(
"SELECT estimator FROM estimators WHERE defender=? AND attacker=? AND level=? AND quick_move=? AND charged_move=? AND party=?",
(
defender,
attacker["pokemonId"],
level,
attacker_moves["move1"],
attacker_moves["move2"],
party,
),
)
estimator = res.fetchone()
if estimator is not None:
self.progress.console.log(
f'{format_pokemon_name(attacker["pokemonId"])} ({format_move_name(attacker_moves["move1"])}/{format_move_name(attacker_moves["move2"])}): {estimator[0]:.2f} (cached)'
)
continue
_ = self.db.execute(
"INSERT INTO estimators(defender, raid_tier, attacker, level, quick_move, charged_move, estimator, party) VALUES(?, ?, ?, ?, ?, ?, ?, ?)",
(
defender,
tier,
attacker["pokemonId"],
level,
attacker_moves["move1"],
attacker_moves["move2"],
attacker_moves["result"]["estimator"],
party,
),
)
self.progress.console.log(
f'{format_pokemon_name(attacker["pokemonId"])} ({format_move_name(attacker_moves["move1"])}/{format_move_name(attacker_moves["move2"])}): {attacker_moves["result"]["estimator"]:.2f}'
)
if self._refresh_task is not None:
self.progress.update(self._refresh_task, advance=1)
self.db.commit()
def refresh(self, level: int = 40, party: int = 1) -> None:
with self.progress:
total_defenders = sum(len(defenders) for defenders in self._raid_bosses().values())
self._refresh_task = self.progress.add_task("Working...", total=(total_defenders * 30))
for tier, defenders in self._raid_bosses().items():
for defender in defenders:
self.progress.update(self._refresh_task, description=f"vs {format_pokemon_name(defender)}...")
try:
self.load_estimators(tier, defender, level, party)
except json.decoder.JSONDecodeError:
time.sleep(30)
self.load_estimators(tier, defender, level, party)
time.sleep(1)

View File

@ -15,9 +15,14 @@ def main_cli():
_ = parser.add_argument("type", nargs="+", help="an attacker type")
_ = parser.add_argument("--level", type=int, default=40)
_ = parser.add_argument("--party", type=int, default=1)
_ = parser.add_argument("--refresh", action="store_true", default=False)
args = parser.parse_args()
calculator = pogo_scaled_estimators.calculator.Calculator(args.type)
if args.refresh:
calculator.refresh(level=args.level, party=args.party)
else:
calculator.calculate(level=args.level, party=args.party)

View File

@ -1,159 +0,0 @@
import urllib.parse
from dataclasses import dataclass
from functools import cached_property
from typing import cast, final
import requests_cache
BASE_URL = "https://fight.pokebattler.com"
WEAKNESS = 1.6
DOUBLE_WEAKNESS = WEAKNESS * WEAKNESS
@dataclass(frozen=True)
class Raid:
tier: str
defender: str
level: int = 40
party: int = 1
@dataclass
class Moveset:
fast_move: str
charged_move: str
estimator: float
def scale(self, factor: float):
return Moveset(self.fast_move, self.charged_move, self.estimator / factor)
@final
class PokebattlerProxy:
def __init__(self):
self._cached_session = requests_cache.CachedSession("pokebatter_cache", cache_control=True)
self._pokemon: dict | None = None
self._raids: dict | None = None
self._resists: dict | None = None
@property
def cached_session(self):
return self._cached_session
@cached_property
def moves(self) -> dict:
return self.cached_session.get(f"{BASE_URL}/moves").json()["move"]
@cached_property
def pokemon(self) -> dict:
return self.cached_session.get(f"{BASE_URL}/pokemon").json()["pokemon"]
@cached_property
def raids(self) -> dict:
return self.cached_session.get(f"{BASE_URL}/raids").json()
@cached_property
def resists(self) -> dict:
return self.cached_session.get(f"{BASE_URL}/resists").json()
def simulate(self, raid: Raid) -> dict[str, list[Moveset]]:
query_string = {
"sort": "ESTIMATOR",
"weatherCondition": "NO_WEATHER",
"dodgeStrategy": "DODGE_REACTION_TIME",
"aggregation": "AVERAGE",
"includeLegendary": "true",
"includeShadow": "true",
"includeMegas": "true",
"primalAssistants": "",
"numParty": str(raid.party),
}
if raid.tier != "RAID_LEVEL_3":
query_string["friendshipLevel"] = "FRIENDSHIP_LEVEL_4"
url = f"{BASE_URL}/raids/defenders/{raid.defender}/levels/{raid.tier}/attackers/levels/{raid.level}/strategies/CINEMATIC_ATTACK_WHEN_POSSIBLE/DEFENSE_RANDOM_MC?{urllib.parse.urlencode(query_string, doseq=True)}"
response = self._cached_session.get(url)
results: dict[str, list[Moveset]] = {}
for attacker in response.json()["attackers"][0]["randomMove"]["defenders"]:
results[attacker["pokemonId"]] = [
Moveset(
attacker_moves["move1"], attacker_moves["move2"], cast(float, attacker_moves["result"]["estimator"])
)
for attacker_moves in attacker["byMove"]
]
return results
def raid_bosses(self, attacker_types: list[str]) -> dict:
raid_tiers = []
raid_bosses = {}
for raid_level in ["3", "5", "MEGA", "MEGA_5", "ULTRA_BEAST"]:
tier = f"RAID_LEVEL_{raid_level}"
raid_tiers.extend(
[
tier,
f"{tier}_LEGACY",
f"{tier}_FUTURE",
]
)
raid_bosses[tier] = []
for tier in filter(lambda tier: tier["tier"] in raid_tiers, self.raids["tiers"]):
for boss in (raid["pokemon"] for raid in tier["raids"]):
if boss.endswith("_FORM"):
continue
boss_pokemon = next(filter(lambda mon: mon["pokemonId"] == boss, self.pokemon))
if ("candyToEvolve" in boss_pokemon or boss in ["SEADRA", "SEALEO"]) and boss not in [
"KELDEO",
"LUMINEON",
"MANAPHY",
"PHIONE",
"STUNFISK",
"TERRAKION",
]:
continue
boss_types = (
boss_pokemon["type"],
boss_pokemon.get("type2", "POKEMON_TYPE_NONE"),
)
if any(self._is_weak(attacker_type, boss_types) for attacker_type in attacker_types):
raid_bosses[tier["info"]["guessTier"]].append(boss)
return raid_bosses
def _is_weak(self, attacker_type: str, defender_types: tuple[str, str]) -> bool:
pokemon_types = list(self.resists.keys())
defender_type_indices = (
pokemon_types.index(defender_types[0]),
pokemon_types.index(defender_types[1]),
)
attack_resist = (
self.resists[attacker_type][defender_type_indices[0]]
* self.resists[attacker_type][defender_type_indices[1]]
)
# Check for double weakness.
if attack_resist >= DOUBLE_WEAKNESS:
return True
# Checkout for single weakness if not double weak to anything else.
any_double_weaknesses = any(
r[defender_type_indices[0]] * r[defender_type_indices[1]] >= DOUBLE_WEAKNESS for r in self.resists.values()
)
if not any_double_weaknesses and attack_resist >= WEAKNESS:
return True
return False
def with_charged_moves(self, attacker_types: list[str]) -> list[str]:
charged_moves = [move["moveId"] for move in self.moves if "moveId" in move and "type" in move and move["type"] in attacker_types]
return [
mon["pokemonId"]
for mon in self.pokemon
if any(moveset["cinematicMove"] in charged_moves for moveset in mon["movesets"])
]
def pokemon_type(self, name: str) -> str:
return next(filter(lambda mon: mon["pokemonId"] == name, self.pokemon))["type"]
def move_type(self, name: str) -> str:
return next(filter(lambda move: move["moveId"] == name, self.moves))["type"]

View File

@ -1,28 +1,7 @@
MINIMUM_SPECIAL_NAME_PARTS = 2
POKEMON_TYPE_COLORS = {
"POKEMON_TYPE_BUG": "green_yellow",
"POKEMON_TYPE_DARK": "bright_black",
"POKEMON_TYPE_DRAGON": "dodger_blue2",
"POKEMON_TYPE_ELECTRIC": "yellow1",
"POKEMON_TYPE_FAIRY": "orchid1",
"POKEMON_TYPE_FIGHTING": "red3",
"POKEMON_TYPE_FIRE": "orange1",
"POKEMON_TYPE_FLYING": "light_sky_blue1",
"POKEMON_TYPE_GHOST": "slate_blue3",
"POKEMON_TYPE_GRASS": "green3",
"POKEMON_TYPE_GROUND": "orange4",
"POKEMON_TYPE_ICE": "pale_turquoise1",
"POKEMON_TYPE_NORMAL": "grey53",
"POKEMON_TYPE_POISON": "dark_magenta",
"POKEMON_TYPE_PSYCHIC": "hot_pink",
"POKEMON_TYPE_ROCK": "gold3",
"POKEMON_TYPE_STEEL": "steel_blue",
"POKEMON_TYPE_WATER": "cornflower_blue",
}
def format_pokemon_name(name: str, pokemon_type: str | None = None):
def format_pokemon_name(name):
parts = [part.capitalize() for part in name.split("_")]
if parts[-1] == "Mega" or parts[-1] == "Primal":
parts = [parts[-1]] + parts[:-1]
@ -30,17 +9,11 @@ def format_pokemon_name(name: str, pokemon_type: str | None = None):
parts = [parts[-2]] + parts[:-2] + [parts[-1]]
if len(parts) > MINIMUM_SPECIAL_NAME_PARTS and parts[-2] == "Shadow":
parts = [parts[-2]] + parts[:-2]
formatted_name = " ".join(parts)
if pokemon_type:
return f"[{POKEMON_TYPE_COLORS[pokemon_type]}]{formatted_name}[/]"
return formatted_name
return " ".join(parts)
def format_move_name(name, move_type: str | None = None):
def format_move_name(name):
parts = [part.capitalize() for part in name.split("_")]
if parts[-1] == "Fast":
parts = parts[:-1]
formatted_name = " ".join(parts)
if move_type:
return f"[{POKEMON_TYPE_COLORS[move_type]}]{formatted_name}[/]"
return formatted_name
return " ".join(parts)