"""Archipelago client for Lingo 2.

Bridges two connections: the Archipelago server connection provided by
`CommonContext`, and a local websocket connection to the running game.
Selected server packets are translated into simpler messages for the game,
and selected game commands are forwarded to the server.
"""
import asyncio
import os
import pkgutil
import subprocess
from typing import Any

import websockets

import Utils
import settings
from CommonClient import CommonContext, server_loop, gui_enabled, logger, get_base_parser, handle_url_arg
from NetUtils import Endpoint, decode, encode
from Utils import async_start

# Local port on which the game is expected to accept websocket connections.
PORT = 43182
MESSAGE_MAX_SIZE = 16*1024*1024
# Seconds to wait before retrying after the game refused the connection.
RECONNECT_DELAY = 1.0


class Lingo2GameContext:
    """Game-facing side of the bridge: owns the websocket to the game and the `send_*` helpers."""

    # Endpoint wrapping the websocket to the game; None while no game is connected.
    server: Endpoint | None
    # Back-reference to the Archipelago client context; assigned by `client_main`.
    client: "Lingo2ClientContext"

    def __init__(self):
        self.server = None

    def send_connected(self):
        """Tell the game about the current Archipelago session (user, seed, slot, slot data, ...)."""
        msg = {
            "cmd": "Connected",
            "user": self.client.username,
            "seed_name": self.client.seed_name,
            "version": self.client.server_version,
            "generator_version": self.client.generator_version,
            "team": self.client.team,
            "slot": self.client.slot,
            "checked_locations": self.client.checked_locations,
            "slot_data": self.client.slot_data,
        }

        async_start(self.send_msgs([msg]), name="game Connected")

    def send_item_sent_notification(self, item_name, receiver_name, item_flags):
        """Notify the game that an item was sent to `receiver_name`."""
        msg = {
            "cmd": "ItemSentNotif",
            "item_name": item_name,
            "receiver_name": receiver_name,
            "item_flags": item_flags,
        }

        async_start(self.send_msgs([msg]), name="item sent notif")

    def send_hint_received(self, item_name, location_name, receiver_name, item_flags, for_self):
        """Notify the game of a hint; `for_self` is sent as 0/1 under the "self" key."""
        msg = {
            "cmd": "HintReceived",
            "item_name": item_name,
            "location_name": location_name,
            "receiver_name": receiver_name,
            "item_flags": item_flags,
            "self": int(for_self),
        }

        async_start(self.send_msgs([msg]), name="hint received notif")

    def send_item_received(self, items):
        """Forward a list of received-item descriptions to the game."""
        msg = {
            "cmd": "ItemReceived",
            "items": items,
        }

        async_start(self.send_msgs([msg]), name="item received")

    def send_location_info(self, locations):
        """Forward a list of location descriptions to the game."""
        msg = {
            "cmd": "LocationInfo",
            "locations": locations,
        }

        async_start(self.send_msgs([msg]), name="location info")

    def send_text_message(self, parts):
        """Forward a text message, given as a list of typed parts, to the game."""
        msg = {
            "cmd": "TextMessage",
            "data": parts,
        }

        async_start(self.send_msgs([msg]), name="notif")

    async def send_msgs(self, msgs: list[Any]) -> None:
        """Encode `msgs` (must be JSON serializable) and send them to the game.

        Silently does nothing when no game is connected or its socket is not open.
        """
        if not self.server or not self.server.socket.open or self.server.socket.closed:
            return
        await self.server.socket.send(encode(msgs))


class Lingo2ClientContext(CommonContext):
    """Archipelago-facing side of the bridge; relays relevant server packets to the game."""

    # Game-facing context; assigned by `client_main` right after construction.
    game_ctx: Lingo2GameContext
    game = "Lingo 2"
    items_handling = 0b111

    # Slot data from the most recent "Connected" packet; None until one has been received.
    slot_data: dict[str, Any] | None

    def __init__(self, server_address: str | None = None, password: str | None = None):
        super().__init__(server_address, password)
        # Give the attribute a value up front: `send_connected` reads it and would otherwise
        # raise AttributeError if it ran before the first "Connected" packet was handled.
        self.slot_data = None

    def make_gui(self):
        ui = super().make_gui()
        ui.base_title = "Archipelago Lingo 2 Client"
        return ui

    async def server_auth(self, password_requested: bool = False):
        """Authenticate as the stored username and send the connect request."""
        self.auth = self.username
        await self.send_connect()

    def on_package(self, cmd: str, args: dict):
        """Handle a server packet, relaying it to the game when one is connected.

        "RoomInfo" and "Connected" also update local state (`seed_name`, `slot_data`)
        regardless of whether a game is attached.
        """
        if cmd == "RoomInfo":
            self.seed_name = args.get("seed_name", None)
        elif cmd == "Connected":
            self.slot_data = args.get("slot_data", None)

            if self.game_ctx.server is not None:
                self.game_ctx.send_connected()
        elif cmd == "ReceivedItems":
            if self.game_ctx.server is not None:
                self._forward_received_items(args)
        elif cmd == "PrintJSON":
            if self.game_ctx.server is not None:
                self._forward_print_json(args)
        elif cmd == "LocationInfo":
            if self.game_ctx.server is not None:
                self._forward_location_info(args)

    def _forward_received_items(self, args: dict) -> None:
        """Translate a "ReceivedItems" packet into one "ItemReceived" game message."""
        items = []
        # Each item's index is its position in the packet offset by the packet's base index.
        for index, item in enumerate(args["items"], start=args["index"]):
            item_msg = {
                "id": item.item,
                "index": index,
                "flags": item.flags,
                "text": self.item_names.lookup_in_slot(item.item, self.slot),
            }

            # Only name the sender when the item came from another slot.
            if item.player != self.slot:
                item_msg["sender"] = self.player_names.get(item.player)

            items.append(item_msg)

        self.game_ctx.send_item_received(items)

    def _forward_print_json(self, args: dict) -> None:
        """Translate a "PrintJSON" packet into notifications plus a "TextMessage" for the game."""
        # Item events originating from this slot additionally produce a dedicated notification.
        if "receiving" in args and "item" in args and args["item"].player == self.slot:
            item = args["item"]
            receiving = args["receiving"]

            item_name = self.item_names.lookup_in_slot(item.item, receiving)
            location_name = self.location_names.lookup_in_slot(item.location, item.player)
            receiver_name = self.player_names.get(receiving)

            if args.get("type") == "Hint" and not args.get("found", False):
                self.game_ctx.send_hint_received(item_name, location_name, receiver_name, item.flags,
                                                 int(receiving) == self.slot)
            elif receiving != self.slot:
                self.game_ctx.send_item_sent_notification(item_name, receiver_name, item.flags)

        parts = []
        for message_part in args["data"]:
            converted = self._convert_message_part(message_part)
            if converted is not None:
                parts.append(converted)

        self.game_ctx.send_text_message(parts)

    def _convert_message_part(self, message_part: dict) -> dict | None:
        """Resolve one PrintJSON part to the game's format; return None if it carries nothing usable."""
        # `.get` so that a part without a "type" key falls through to the plain-text case
        # (or is skipped when it has no "text" either) instead of raising KeyError.
        part_type = message_part.get("type")

        if part_type == "player_id":
            player = int(message_part["text"])
            return {
                "type": "player",
                "text": self.player_names.get(player),
                "self": int(player == self.slot),
            }

        if part_type == "item_id":
            return {
                "type": "item",
                "text": self.item_names.lookup_in_slot(int(message_part["text"]), message_part["player"]),
                "flags": message_part["flags"],
            }

        if part_type == "location_id":
            return {
                "type": "location",
                "text": self.location_names.lookup_in_slot(int(message_part["text"]), message_part["player"]),
            }

        # Untyped parts and any other typed part are passed through as plain text.
        if "text" in message_part:
            return {"type": "text", "text": message_part["text"]}

        return None

    def _forward_location_info(self, args: dict) -> None:
        """Translate a "LocationInfo" packet into one "LocationInfo" game message."""
        locations = [
            {
                "id": location.location,
                "item": self.item_names.lookup_in_slot(location.item, location.player),
                "player": self.player_names.get(location.player),
                "flags": location.flags,
                # NOTE(review): this sends a bool, whereas the other messages send "self" as
                # 0/1 - kept as-is to leave the wire format unchanged; confirm what the game expects.
                "self": int(location.player) == self.slot,
            }
            for location in args["locations"]
        ]

        self.game_ctx.send_location_info(locations)


async def pipe_loop(ctx: Lingo2GameContext):
    """Keep a websocket connection to the game alive until the client's exit event is set.

    Connects to the game on localhost:PORT, processes every command the game sends,
    and reconnects when the connection ends. A refused connection is retried after
    `RECONNECT_DELAY` seconds.
    """
    while not ctx.client.exit_event.is_set():
        try:
            socket = await websockets.connect("ws://localhost", port=PORT, ping_timeout=None, ping_interval=None,
                                              max_size=MESSAGE_MAX_SIZE)
            ctx.server = Endpoint(socket)
            logger.info("Connected to Lingo 2!")
            # Only announce a session whose "Connected" packet has been processed; `auth` is
            # already set while still waiting for the server's reply, when slot data is absent.
            if ctx.client.auth is not None and ctx.client.slot_data is not None:
                ctx.send_connected()
            async for data in ctx.server.socket:
                for msg in decode(data):
                    await process_game_cmd(ctx, msg)
        except ConnectionRefusedError:
            logger.info("Could not connect to Lingo 2.")
            # Back off instead of retrying immediately, which would spin and flood the log
            # for as long as the game is not listening.
            await asyncio.sleep(RECONNECT_DELAY)
        finally:
            ctx.server = None


async def process_game_cmd(ctx: Lingo2GameContext, args: dict):
    """Handle one command received from the game.

    "Connect" and "Disconnect" drive the Archipelago connection; "Sync", "LocationChecks",
    "Say", "StatusUpdate" and "LocationScouts" are forwarded to the server unchanged.
    Any other command is ignored.
    """
    cmd = args["cmd"]

    if cmd == "Connect":
        server = args.get("server")
        player = args.get("player")
        password = args.get("password")

        # An empty password is written as the literal "None" in the address.
        if password != "":
            server_address = f"{player}:{password}@{server}"
        else:
            server_address = f"{player}:None@{server}"

        async_start(ctx.client.connect(server_address), name="client connect")
    elif cmd == "Disconnect":
        async_start(ctx.client.disconnect(), name="client disconnect")
    elif cmd in ["Sync", "LocationChecks", "Say", "StatusUpdate", "LocationScouts"]:
        async_start(ctx.client.send_msgs([args]), name="client forward")


async def run_game():
    """Launch the game executable configured in the host settings.

    The game is started on an init scene and given one extra argument after "--":
    the apworld zip path when running from a packaged apworld, otherwise the
    unzipped client directory.

    Raises:
        FileNotFoundError: if the init scene cannot be read from the packaged apworld.
    """
    exe_file = settings.get_settings().lingo2_options.exe_file

    from worlds import AutoWorldRegister
    world = AutoWorldRegister.world_types["Lingo 2"]

    if world.zip_path is not None:
        # This is a packaged apworld.
        init_scene = pkgutil.get_data(__name__, "client/run_from_apworld.tscn")
        if init_scene is None:
            # get_data returns None when the resource cannot be loaded; fail with a clear
            # message before touching the output file.
            raise FileNotFoundError("client/run_from_apworld.tscn could not be loaded from the Lingo 2 apworld")

        init_path = Utils.local_path("data", "lingo2_init.tscn")
        with open(init_path, "wb") as file_handle:
            file_handle.write(init_scene)

        scene_path = init_path
        game_arg = str(world.zip_path.absolute())
    else:
        # The world is unzipped and being run in source.
        scene_path = Utils.local_path("worlds", "lingo2", "client", "run_from_source.tscn")
        game_arg = Utils.local_path("worlds", "lingo2", "client")

    subprocess.Popen(
        [
            exe_file,
            "--scene",
            scene_path,
            "--",
            game_arg,
        ],
        cwd=os.path.dirname(exe_file),
    )


def client_main(*launch_args: str) -> None:
    """Entry point: parse `launch_args`, start the game and run the client until it exits."""

    async def main(args):
        async_start(run_game())

        client_ctx = Lingo2ClientContext(args.connect, args.password)
        game_ctx = Lingo2GameContext()

        # The two contexts reference each other.
        client_ctx.game_ctx = game_ctx
        game_ctx.client = client_ctx

        client_ctx.server_task = asyncio.create_task(server_loop(client_ctx), name="ServerLoop")

        if gui_enabled:
            client_ctx.run_gui()
        client_ctx.run_cli()

        pipe_task = asyncio.create_task(pipe_loop(game_ctx), name="GameWatcher")

        try:
            await pipe_task
        except Exception as e:
            logger.exception(e)

        await client_ctx.exit_event.wait()
        await client_ctx.shutdown()

    Utils.init_logging("Lingo2Client", exception_logger="Client")
    import colorama

    parser = get_base_parser(description="Lingo 2 Archipelago Client")
    parser.add_argument('--name', default=None, help="Slot Name to connect as.")
    parser.add_argument("url", nargs="?", help="Archipelago connection url")
    args = parser.parse_args(launch_args)

    args = handle_url_arg(args, parser=parser)

    colorama.just_fix_windows_console()
    asyncio.run(main(args))
    colorama.deinit()