"""Archipelago client for Lingo 2.

This module sits between two connections:

* the Archipelago server, handled by ``Lingo2ClientContext`` (a
  ``CommonContext`` subclass), and
* the running game, reached over a websocket on ``localhost:PORT`` and
  handled by ``Lingo2GameContext`` / ``pipe_loop``.

``Lingo2Manager`` ties the two contexts together and owns the state that both
sides update (the keyboard levels and the set of checked worldports).
"""
import asyncio
import os
import pkgutil
import subprocess
from typing import Any

import websockets

import Utils
import settings
from BaseClasses import ItemClassification
from CommonClient import CommonContext, server_loop, gui_enabled, logger, get_base_parser, handle_url_arg
from NetUtils import Endpoint, decode, encode
from Utils import async_start

from . import Lingo2World
from .tracker import Tracker

ALL_LETTERS = "abcdefghijklmnopqrstuvwxyz"
MESSAGE_MAX_SIZE = 16*1024*1024
PORT = 43182

# Seconds to wait before retrying after the game refused the connection.
# Without a pause the retry loop spins and floods the log while the game is
# still starting (or is not running at all).
RECONNECT_DELAY = 1.0

# Maps each letter to (data storage field number, slot index inside that
# field). Field 1 is stored under the "keyboard1" key and field 2 under
# "keyboard2"; each slot occupies two bits of the stored integer.
KEY_STORAGE_MAPPING = {
    "a": (1, 0), "b": (1, 1), "c": (1, 2), "d": (1, 3), "e": (1, 4), "f": (1, 5), "g": (1, 6), "h": (1, 7),
    "i": (1, 8), "j": (1, 9), "k": (1, 10), "l": (1, 11), "m": (1, 12),
    "n": (2, 0), "o": (2, 1), "p": (2, 2), "q": (2, 3), "r": (2, 4), "s": (2, 5), "t": (2, 6), "u": (2, 7),
    "v": (2, 8), "w": (2, 9), "x": (2, 10), "y": (2, 11), "z": (2, 12),
}

# Inverse lookup: (field, slot index) -> letter.
REVERSE_KEY_STORAGE_MAPPING = {t: k for k, t in KEY_STORAGE_MAPPING.items()}


class Lingo2Manager:
    """Shared state and glue between the game context and the client context.

    Constructing a manager registers it on both contexts (as ``.manager``),
    creates the tracker and resets the keyboard/worldport state.
    """

    game_ctx: "Lingo2GameContext"
    client_ctx: "Lingo2ClientContext"
    tracker: Tracker

    # Highest known level for every letter in ALL_LETTERS.
    keyboard: dict[str, int]
    # IDs of the worldports known to have been checked.
    worldports: set[int]

    def __init__(self, game_ctx: "Lingo2GameContext", client_ctx: "Lingo2ClientContext"):
        self.game_ctx = game_ctx
        self.game_ctx.manager = self
        self.client_ctx = client_ctx
        self.client_ctx.manager = self
        self.tracker = Tracker(self)
        self.keyboard = {}
        self.worldports = set()

        self.reset()

    def reset(self):
        """Set every letter back to level 0 and forget all worldports."""
        for k in ALL_LETTERS:
            self.keyboard[k] = 0

        self.worldports = set()

    def update_keyboard(self, new_keyboard: dict[str, int]) -> dict[str, int]:
        """Merge ``new_keyboard`` into the stored keyboard, keeping the maximum.

        Only entries that are strictly higher than the stored level are
        applied. If anything changed, the tracker state is refreshed and the
        accessible locations are re-sent to the game.

        Returns:
            The subset of ``new_keyboard`` that actually raised a level.
        """
        ret: dict[str, int] = {}

        for k, v in new_keyboard.items():
            if v > self.keyboard.get(k, 0):
                self.keyboard[k] = v
                ret[k] = v

        if ret:
            self.tracker.refresh_state()
            self.game_ctx.send_accessible_locations()

        return ret

    def update_worldports(self, new_worldports: set[int]) -> set[int]:
        """Add ``new_worldports`` to the stored set.

        If any of them were previously unknown, the tracker state is refreshed
        and the accessible locations are re-sent to the game.

        Returns:
            The worldports from ``new_worldports`` that were not known before.
        """
        ret = new_worldports.difference(self.worldports)
        self.worldports.update(new_worldports)

        if ret:
            self.tracker.refresh_state()
            self.game_ctx.send_accessible_locations()

        return ret


class Lingo2GameContext:
    """Outgoing half of the connection to the game.

    Every ``send_*`` method builds one command dictionary and schedules it for
    delivery; all of them are silent no-ops while ``server`` is ``None``.
    """

    # Endpoint wrapping the game's websocket; None while disconnected.
    server: Endpoint | None
    # Assigned by Lingo2Manager.__init__.
    manager: Lingo2Manager

    def __init__(self):
        self.server = None

    def _post(self, msg: dict[str, Any], name: str) -> None:
        """Schedule ``msg`` to be sent to the game as the task ``name``.

        Does nothing when no game is connected.
        """
        if self.server is None:
            return

        async_start(self.send_msgs([msg]), name=name)

    def send_connected(self):
        """Forward the client's connection details and slot data to the game."""
        if self.server is None:
            return

        client_ctx = self.manager.client_ctx
        msg = {
            "cmd": "Connected",
            "user": client_ctx.username,
            "seed_name": client_ctx.seed_name,
            "version": client_ctx.server_version,
            "generator_version": client_ctx.generator_version,
            "team": client_ctx.team,
            "slot": client_ctx.slot,
            "checked_locations": client_ctx.checked_locations,
            "slot_data": client_ctx.slot_data,
        }

        self._post(msg, "game Connected")

    def send_item_sent_notification(self, item_name, receiver_name, item_flags):
        """Tell the game that an item was sent to ``receiver_name``."""
        msg = {
            "cmd": "ItemSentNotif",
            "item_name": item_name,
            "receiver_name": receiver_name,
            "item_flags": item_flags,
        }

        self._post(msg, "item sent notif")

    def send_hint_received(self, item_name, location_name, receiver_name, item_flags, for_self):
        """Tell the game about a hint; ``for_self`` is transmitted as 0 or 1."""
        msg = {
            "cmd": "HintReceived",
            "item_name": item_name,
            "location_name": location_name,
            "receiver_name": receiver_name,
            "item_flags": item_flags,
            "self": int(for_self),
        }

        self._post(msg, "hint received notif")

    def send_item_received(self, items):
        """Forward a list of received-item descriptions to the game."""
        msg = {
            "cmd": "ItemReceived",
            "items": items,
        }

        self._post(msg, "item received")

    def send_location_info(self, locations):
        """Forward a list of scouted-location descriptions to the game."""
        msg = {
            "cmd": "LocationInfo",
            "locations": locations,
        }

        self._post(msg, "location info")

    def send_text_message(self, parts):
        """Forward a chat/log message, already split into typed parts."""
        msg = {
            "cmd": "TextMessage",
            "data": parts,
        }

        self._post(msg, "notif")

    def send_accessible_locations(self):
        """Send the tracker's accessible locations (and worldports, if any)."""
        if self.server is None:
            return

        tracker = self.manager.tracker
        msg = {
            "cmd": "AccessibleLocations",
            "locations": list(tracker.accessible_locations),
        }

        # The "worldports" key is omitted entirely when there is nothing to list.
        if len(tracker.accessible_worldports) > 0:
            msg["worldports"] = list(tracker.accessible_worldports)

        self._post(msg, "accessible locations")

    def send_update_locations(self, locations):
        """Forward newly checked locations to the game."""
        msg = {
            "cmd": "UpdateLocations",
            "locations": locations,
        }

        self._post(msg, "update locations")

    def send_update_keyboard(self, updates):
        """Forward keyboard level changes to the game."""
        msg = {
            "cmd": "UpdateKeyboard",
            "updates": updates,
        }

        self._post(msg, "update keyboard")

    def send_update_worldports(self, worldports):
        """Forward newly checked worldports to the game."""
        msg = {
            "cmd": "UpdateWorldports",
            "worldports": worldports,
        }

        self._post(msg, "update worldports")

    async def send_msgs(self, msgs: list[Any]) -> None:
        """ `msgs` JSON serializable """
        # The connection may have gone away between scheduling and running.
        if not self.server or not self.server.socket.open or self.server.socket.closed:
            return
        await self.server.socket.send(encode(msgs))


class Lingo2ClientContext(CommonContext):
    """Archipelago server connection for Lingo 2.

    Server packets are translated in ``on_package`` and relayed to the game
    through ``self.manager.game_ctx``.
    """

    # Assigned by Lingo2Manager.__init__.
    manager: Lingo2Manager

    game = "Lingo 2"
    items_handling = 0b111

    # Set from the "Connected" packet; not assigned before that.
    slot_data: dict[str, Any] | None

    def __init__(self, server_address: str | None = None, password: str | None = None):
        super().__init__(server_address, password)

    def make_gui(self):
        """Return the base GUI class with a Lingo 2 specific window title."""
        ui = super().make_gui()
        ui.base_title = "Archipelago Lingo 2 Client"
        return ui

    async def server_auth(self, password_requested: bool = False):
        """Authenticate using the already configured username."""
        self.auth = self.username
        await self.send_connect()

    def on_package(self, cmd: str, args: dict):
        """Handle one packet from the Archipelago server.

        Args:
            cmd: The packet's command name.
            args: The decoded packet.
        """
        if cmd == "RoomInfo":
            self.seed_name = args.get("seed_name", None)
        elif cmd == "Connected":
            self.slot_data = args.get("slot_data", None)

            self.manager.reset()

            self.manager.game_ctx.send_connected()

            self.manager.tracker.setup_slot(self.slot_data)
            self.manager.tracker.set_checked_locations(self.checked_locations)
            self.manager.game_ctx.send_accessible_locations()

            # Subscribe to the keyboard keys and make sure they exist. The
            # "default" operation leaves an existing value untouched, and
            # want_reply yields a SetReply carrying the current value.
            self.set_notify(self.get_datastorage_key("keyboard1"), self.get_datastorage_key("keyboard2"))
            msg_batch = [{
                "cmd": "Set",
                "key": self.get_datastorage_key("keyboard1"),
                "default": 0,
                "want_reply": True,
                "operations": [{"operation": "default", "value": 0}]
            }, {
                "cmd": "Set",
                "key": self.get_datastorage_key("keyboard2"),
                "default": 0,
                "want_reply": True,
                "operations": [{"operation": "default", "value": 0}]
            }]

            if self.slot_data["shuffle_worldports"]:
                self.set_notify(self.get_datastorage_key("worldports"))
                msg_batch.append({
                    "cmd": "Set",
                    "key": self.get_datastorage_key("worldports"),
                    "default": [],
                    "want_reply": True,
                    "operations": [{"operation": "default", "value": []}]
                })

            async_start(self.send_msgs(msg_batch), name="default keys")
        elif cmd == "RoomUpdate":
            self.manager.tracker.set_checked_locations(self.checked_locations)

            # RoomUpdate only carries the fields that changed, so
            # "checked_locations" is not guaranteed to be present.
            if "checked_locations" in args:
                self.manager.game_ctx.send_update_locations(args["checked_locations"])
        elif cmd == "ReceivedItems":
            self.manager.tracker.set_collected_items(self.items_received)

            items = []
            # args["index"] is the index of the first item in this packet.
            for index, item in enumerate(args["items"], start=args["index"]):
                item_msg = {
                    "id": item.item,
                    "index": index,
                    "flags": item.flags,
                    "text": self.item_names.lookup_in_slot(item.item, self.slot),
                }

                # The sender is only reported for items from other players.
                if item.player != self.slot:
                    item_msg["sender"] = self.player_names.get(item.player)

                items.append(item_msg)

            self.manager.game_ctx.send_item_received(items)

            # Only progression items trigger a re-send of accessible locations.
            if any(ItemClassification.progression in ItemClassification(item.flags) for item in args["items"]):
                self.manager.game_ctx.send_accessible_locations()
        elif cmd == "PrintJSON":
            if "receiving" in args and "item" in args and args["item"].player == self.slot:
                item_name = self.item_names.lookup_in_slot(args["item"].item, args["receiving"])
                location_name = self.location_names.lookup_in_slot(args["item"].location, args["item"].player)
                receiver_name = self.player_names.get(args["receiving"])

                if args["type"] == "Hint" and not args.get("found", False):
                    self.manager.game_ctx.send_hint_received(item_name, location_name, receiver_name,
                                                             args["item"].flags,
                                                             int(args["receiving"]) == self.slot)
                elif args["receiving"] != self.slot:
                    self.manager.game_ctx.send_item_sent_notification(item_name, receiver_name, args["item"].flags)

            parts = []
            for message_part in args["data"]:
                # .get() so that a part without a "type" is treated as plain
                # text (or skipped when it has no "text" either) rather than
                # raising KeyError.
                part_type = message_part.get("type")

                if part_type == "player_id":
                    parts.append({
                        "type": "player",
                        "text": self.player_names.get(int(message_part["text"])),
                        "self": int(int(message_part["text"]) == self.slot),
                    })
                elif part_type == "item_id":
                    parts.append({
                        "type": "item",
                        "text": self.item_names.lookup_in_slot(int(message_part["text"]), message_part["player"]),
                        "flags": message_part["flags"],
                    })
                elif part_type == "location_id":
                    parts.append({
                        "type": "location",
                        "text": self.location_names.lookup_in_slot(int(message_part["text"]),
                                                                   message_part["player"])
                    })
                elif "text" in message_part:
                    parts.append({"type": "text", "text": message_part["text"]})

            self.manager.game_ctx.send_text_message(parts)
        elif cmd == "LocationInfo":
            locations = []
            for location in args["locations"]:
                locations.append({
                    "id": location.location,
                    "item": self.item_names.lookup_in_slot(location.item, location.player),
                    "player": self.player_names.get(location.player),
                    "flags": location.flags,
                    "self": int(location.player) == self.slot,
                })

            self.manager.game_ctx.send_location_info(locations)
        elif cmd == "SetReply":
            if args["key"] == self.get_datastorage_key("keyboard1"):
                self.handle_keyboard_update(1, args)
            elif args["key"] == self.get_datastorage_key("keyboard2"):
                self.handle_keyboard_update(2, args)
            elif args["key"] == self.get_datastorage_key("worldports"):
                updates = self.manager.update_worldports(set(args["value"]))
                if updates:
                    self.manager.game_ctx.send_update_worldports(updates)

    def get_datastorage_key(self, name: str):
        """Return the slot-specific data storage key for ``name``."""
        return f"Lingo2_{self.slot}_{name}"

    async def update_keyboard(self, updates: dict[str, int]):
        """Write keyboard level changes to the server's data storage.

        Each letter contributes two bits at the position given by
        ``KEY_STORAGE_MAPPING``: bit 0 for any level of 1 or more, bit 1 only
        for level 2 exactly. The bits are OR-ed into the stored value, so
        levels are never lowered.

        Args:
            updates: Letter -> new level. Entries with level 0 are ignored.
        """
        kb1 = 0
        kb2 = 0

        for k, v in updates.items():
            if v == 0:
                continue

            effect = 0
            if v >= 1:
                effect |= 1
            if v == 2:
                effect |= 2

            field, slot_index = KEY_STORAGE_MAPPING[k]
            shifted = effect << (slot_index * 2)
            if field == 1:
                kb1 |= shifted
            else:
                kb2 |= shifted

        msgs = []

        if kb1 != 0:
            msgs.append({
                "cmd": "Set",
                "key": self.get_datastorage_key("keyboard1"),
                "want_reply": True,
                "operations": [{
                    "operation": "or",
                    "value": kb1
                }]
            })

        if kb2 != 0:
            msgs.append({
                "cmd": "Set",
                "key": self.get_datastorage_key("keyboard2"),
                "want_reply": True,
                "operations": [{
                    "operation": "or",
                    "value": kb2
                }]
            })

        if msgs:
            await self.send_msgs(msgs)

    def handle_keyboard_update(self, field: int, args: dict[str, Any]):
        """Decode a keyboard ``SetReply`` and apply it to the shared state.

        Args:
            field: Which keyboard field the reply is for (1 or 2).
            args: The ``SetReply`` packet; ``args["value"]`` is the bit field.
        """
        keys = {}
        value = args["value"]

        for i in range(0, 13):
            letter = REVERSE_KEY_STORAGE_MAPPING[(field, i)]

            # Low bit of the pair means level 1; the high bit upgrades it to 2.
            if (value & (1 << (i * 2))) != 0:
                keys[letter] = 1
            if (value & (1 << (i * 2 + 1))) != 0:
                keys[letter] = 2

        updates = self.manager.update_keyboard(keys)
        if updates:
            self.manager.game_ctx.send_update_keyboard(updates)

    async def update_worldports(self, updates: set[int]):
        """Add ``updates`` to the worldports stored in data storage."""
        await self.send_msgs([{
            "cmd": "Set",
            "key": self.get_datastorage_key("worldports"),
            "want_reply": True,
            "operations": [{
                "operation": "update",
                "value": updates
            }]
        }])


async def pipe_loop(manager: Lingo2Manager):
    """Keep a websocket connection to the game open and process its commands.

    Runs until the client's exit event is set. When the game refuses the
    connection, the next attempt is delayed by ``RECONNECT_DELAY`` seconds.
    Exceptions other than ``ConnectionRefusedError`` propagate to the caller.
    """
    while not manager.client_ctx.exit_event.is_set():
        try:
            socket = await websockets.connect("ws://localhost", port=PORT, ping_timeout=None, ping_interval=None,
                                              max_size=MESSAGE_MAX_SIZE)
            manager.game_ctx.server = Endpoint(socket)
            logger.info("Connected to Lingo 2!")

            # If the client is already authenticated, bring the freshly
            # connected game up to date right away.
            if manager.client_ctx.auth is not None:
                manager.game_ctx.send_connected()
                manager.game_ctx.send_accessible_locations()

            async for data in manager.game_ctx.server.socket:
                for msg in decode(data):
                    await process_game_cmd(manager, msg)
        except ConnectionRefusedError:
            logger.info("Could not connect to Lingo 2.")
            # Back off instead of immediately retrying in a tight loop.
            await asyncio.sleep(RECONNECT_DELAY)
        finally:
            manager.game_ctx.server = None


async def process_game_cmd(manager: Lingo2Manager, args: dict):
    """Handle one command received from the game.

    Args:
        manager: The shared manager.
        args: The decoded command; ``args["cmd"]`` selects the action.
            Unrecognized commands are ignored.
    """
    cmd = args["cmd"]

    if cmd == "Connect":
        server = args.get("server")
        player = args.get("player")
        password = args.get("password")

        if password != "":
            server_address = f"{player}:{password}@{server}"
        else:
            server_address = f"{player}:None@{server}"

        async_start(manager.client_ctx.connect(server_address), name="client connect")
    elif cmd == "Disconnect":
        async_start(manager.client_ctx.disconnect(), name="client disconnect")
    elif cmd in ["Sync", "LocationChecks", "Say", "StatusUpdate", "LocationScouts"]:
        # These are passed through to the Archipelago server unchanged.
        async_start(manager.client_ctx.send_msgs([args]), name="client forward")
    elif cmd == "UpdateKeyboard":
        updates = manager.update_keyboard(args["keyboard"])
        if updates:
            async_start(manager.client_ctx.update_keyboard(updates), name="client update keyboard")
    elif cmd == "CheckWorldport":
        port_id = args["port_id"]
        worldports = {port_id}

        # A port that has a pairing in the slot data marks its partner as
        # checked too. The pairing table is keyed by the ID as a string.
        port_pairings = manager.client_ctx.slot_data["port_pairings"]
        if str(port_id) in port_pairings:
            worldports.add(port_pairings[str(port_id)])

        updates = manager.update_worldports(worldports)
        if updates:
            async_start(manager.client_ctx.update_worldports(updates), name="client update worldports")
    elif cmd == "Quit":
        manager.client_ctx.exit_event.set()


async def run_game():
    """Launch the game executable configured in the host settings.

    The game is started with a bootstrap scene: one extracted from the
    apworld when running from a packaged apworld, or the one in the source
    tree otherwise.
    """
    exe_file = settings.get_settings().lingo2_options.exe_file
    exe_dir = os.path.dirname(exe_file)

    # This ensures we can use Steam features without having to open the game
    # through steam.
    steam_appid_path = os.path.join(exe_dir, "steam_appid.txt")
    with open(steam_appid_path, "w") as said_handle:
        said_handle.write("2523310")

    if Lingo2World.zip_path is not None:
        # This is a packaged apworld.
        init_scene = pkgutil.get_data(__name__, "client/run_from_apworld.tscn")
        init_path = Utils.local_path("data", "lingo2_init.tscn")

        with open(init_path, "wb") as file_handle:
            file_handle.write(init_scene)

        subprocess.Popen(
            [
                exe_file,
                "--scene",
                init_path,
                "--",
                str(Lingo2World.zip_path.absolute()),
            ],
            cwd=exe_dir,
        )
    else:
        # The world is unzipped and being run in source.
        subprocess.Popen(
            [
                exe_file,
                "--scene",
                Utils.local_path("worlds", "lingo2", "client", "run_from_source.tscn"),
                "--",
                Utils.local_path("worlds", "lingo2", "client"),
            ],
            cwd=exe_dir,
        )


def client_main(*launch_args: str) -> None:
    """Entry point: parse ``launch_args``, start the game and run the client."""

    async def main(args):
        async_start(run_game())

        client_ctx = Lingo2ClientContext(args.connect, args.password)
        game_ctx = Lingo2GameContext()
        manager = Lingo2Manager(game_ctx, client_ctx)

        client_ctx.server_task = asyncio.create_task(server_loop(client_ctx), name="ServerLoop")

        if gui_enabled:
            client_ctx.run_gui()
        client_ctx.run_cli()

        pipe_task = asyncio.create_task(pipe_loop(manager), name="GameWatcher")

        try:
            await pipe_task
        except Exception as e:
            logger.exception(e)

        await client_ctx.exit_event.wait()

        # run_gui() is only called when gui_enabled, so there may be no UI to
        # stop. getattr keeps this safe even if the attribute was never set.
        ui = getattr(client_ctx, "ui", None)
        if ui is not None:
            ui.stop()

        await client_ctx.shutdown()

    Utils.init_logging("Lingo2Client", exception_logger="Client")
    import colorama

    parser = get_base_parser(description="Lingo 2 Archipelago Client")
    parser.add_argument('--name', default=None, help="Slot Name to connect as.")
    parser.add_argument("url", nargs="?", help="Archipelago connection url")
    args = parser.parse_args(launch_args)

    args = handle_url_arg(args, parser=parser)

    colorama.just_fix_windows_console()
    asyncio.run(main(args))
    colorama.deinit()