From b0f474bee1c8e1111f7542bf4985136d9aedf340 Mon Sep 17 00:00:00 2001 From: Star Rauchenberger Date: Sat, 27 Sep 2025 17:14:40 -0400 Subject: Treat local letters as items for tracker Local letters are now synced with datastorage, so they transfer to other computers like regular items would, and the tracker also now waits until you collect local letters before showing what they give you in logic. --- apworld/context.py | 388 +++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 276 insertions(+), 112 deletions(-) (limited to 'apworld/context.py') diff --git a/apworld/context.py b/apworld/context.py index 0a058e5..bc3b1bf 100644 --- a/apworld/context.py +++ b/apworld/context.py @@ -15,35 +15,85 @@ from Utils import async_start from . import Lingo2World from .tracker import Tracker -PORT = 43182 +ALL_LETTERS = "abcdefghijklmnopqrstuvwxyz" MESSAGE_MAX_SIZE = 16*1024*1024 +PORT = 43182 + +KEY_STORAGE_MAPPING = { + "a": (1, 0), "b": (1, 1), "c": (1, 2), "d": (1, 3), "e": (1, 4), "f": (1, 5), "g": (1, 6), "h": (1, 7), "i": (1, 8), + "j": (1, 9), "k": (1, 10), "l": (1, 11), "m": (1, 12), "n": (2, 0), "o": (2, 1), "p": (2, 2), "q": (2, 3), + "r": (2, 4), "s": (2, 5), "t": (2, 6), "u": (2, 7), "v": (2, 8), "w": (2, 9), "x": (2, 10), "y": (2, 11), + "z": (2, 12), +} + +REVERSE_KEY_STORAGE_MAPPING = {t: k for k, t in KEY_STORAGE_MAPPING.items()} + + +class Lingo2Manager: + game_ctx: "Lingo2GameContext" + client_ctx: "Lingo2ClientContext" + tracker: Tracker + + keyboard: dict[str, int] + + def __init__(self, game_ctx: "Lingo2GameContext", client_ctx: "Lingo2ClientContext"): + self.game_ctx = game_ctx + self.game_ctx.manager = self + self.client_ctx = client_ctx + self.client_ctx.manager = self + self.tracker = Tracker(self) + self.keyboard = {} + + self.reset() + + def reset(self): + for k in ALL_LETTERS: + self.keyboard[k] = 0 + + def update_keyboard(self, new_keyboard: dict[str, int]) -> dict[str, int]: + ret: dict[str, int] = {} + + for k, v in new_keyboard.items(): + if v > self.keyboard.get(k, 0): + self.keyboard[k] = v + ret[k] = v + + if len(ret) > 0: + self.tracker.refresh_state() + self.game_ctx.send_accessible_locations() + + return ret class Lingo2GameContext: server: Endpoint | None - client: "Lingo2ClientContext" - tracker: Tracker + manager: Lingo2Manager def __init__(self): self.server = None - self.tracker = Tracker() def send_connected(self): + if self.server is None: + return + msg = { "cmd": "Connected", - "user": self.client.username, - "seed_name": self.client.seed_name, - "version": self.client.server_version, - "generator_version": self.client.generator_version, - "team": self.client.team, - "slot": self.client.slot, - "checked_locations": self.client.checked_locations, - "slot_data": self.client.slot_data, + "user": self.manager.client_ctx.username, + "seed_name": self.manager.client_ctx.seed_name, + "version": self.manager.client_ctx.server_version, + "generator_version": self.manager.client_ctx.generator_version, + "team": self.manager.client_ctx.team, + "slot": self.manager.client_ctx.slot, + "checked_locations": self.manager.client_ctx.checked_locations, + "slot_data": self.manager.client_ctx.slot_data, } async_start(self.send_msgs([msg]), name="game Connected") def send_item_sent_notification(self, item_name, receiver_name, item_flags): + if self.server is None: + return + msg = { "cmd": "ItemSentNotif", "item_name": item_name, @@ -54,6 +104,9 @@ class Lingo2GameContext: async_start(self.send_msgs([msg]), name="item sent notif") def send_hint_received(self, item_name, location_name, receiver_name, item_flags, for_self): + if self.server is None: + return + msg = { "cmd": "HintReceived", "item_name": item_name, @@ -66,6 +119,9 @@ class Lingo2GameContext: async_start(self.send_msgs([msg]), name="hint received notif") def send_item_received(self, items): + if self.server is None: + return + msg = { "cmd": "ItemReceived", "items": items, @@ -74,6 +130,9 @@ class Lingo2GameContext: async_start(self.send_msgs([msg]), name="item received") def send_location_info(self, locations): + if self.server is None: + return + msg = { "cmd": "LocationInfo", "locations": locations, @@ -82,6 +141,9 @@ class Lingo2GameContext: async_start(self.send_msgs([msg]), name="location info") def send_text_message(self, parts): + if self.server is None: + return + msg = { "cmd": "TextMessage", "data": parts, @@ -90,14 +152,20 @@ class Lingo2GameContext: async_start(self.send_msgs([msg]), name="notif") def send_accessible_locations(self): + if self.server is None: + return + msg = { "cmd": "AccessibleLocations", - "locations": list(self.tracker.accessible_locations), + "locations": list(self.manager.tracker.accessible_locations), } async_start(self.send_msgs([msg]), name="accessible locations") def send_update_locations(self, locations): + if self.server is None: + return + msg = { "cmd": "UpdateLocations", "locations": locations, @@ -105,6 +173,17 @@ class Lingo2GameContext: async_start(self.send_msgs([msg]), name="update locations") + def send_update_keyboard(self, updates): + if self.server is None: + return + + msg = { + "cmd": "UpdateKeyboard", + "updates": updates, + } + + async_start(self.send_msgs([msg]), name="update keyboard") + async def send_msgs(self, msgs: list[Any]) -> None: """ `msgs` JSON serializable """ if not self.server or not self.server.socket.open or self.server.socket.closed: @@ -113,7 +192,7 @@ class Lingo2GameContext: class Lingo2ClientContext(CommonContext): - game_ctx: Lingo2GameContext + manager: Lingo2Manager game = "Lingo 2" items_handling = 0b111 @@ -138,118 +217,201 @@ class Lingo2ClientContext(CommonContext): elif cmd == "Connected": self.slot_data = args.get("slot_data", None) - if self.game_ctx.server is not None: - self.game_ctx.send_connected() - - self.game_ctx.tracker.setup_slot(self.slot_data) + self.manager.reset() + + self.manager.game_ctx.send_connected() + + self.manager.tracker.setup_slot(self.slot_data) + self.manager.tracker.set_checked_locations(self.checked_locations) + self.manager.game_ctx.send_accessible_locations() + + self.set_notify(self.get_datastorage_key("keyboard1"), self.get_datastorage_key("keyboard2")) + async_start(self.send_msgs([{ + "cmd": "Set", + "key": self.get_datastorage_key("keyboard1"), + "default": 0, + "want_reply": True, + "operations": [{"operation": "default", "value": 0}] + }, { + "cmd": "Set", + "key": self.get_datastorage_key("keyboard2"), + "default": 0, + "want_reply": True, + "operations": [{"operation": "default", "value": 0}] + }]), name="default keys") elif cmd == "RoomUpdate": - if self.game_ctx.server is not None: - self.game_ctx.send_update_locations(args["checked_locations"]) + self.manager.tracker.set_checked_locations(self.checked_locations) + self.manager.game_ctx.send_update_locations(args["checked_locations"]) elif cmd == "ReceivedItems": - self.game_ctx.tracker.set_collected_items(self.items_received) + self.manager.tracker.set_collected_items(self.items_received) - if self.game_ctx.server is not None: - cur_index = 0 - items = [] + cur_index = 0 + items = [] - for item in args["items"]: - index = cur_index + args["index"] - cur_index += 1 + for item in args["items"]: + index = cur_index + args["index"] + cur_index += 1 - item_msg = { - "id": item.item, - "index": index, - "flags": item.flags, - "text": self.item_names.lookup_in_slot(item.item, self.slot), - } + item_msg = { + "id": item.item, + "index": index, + "flags": item.flags, + "text": self.item_names.lookup_in_slot(item.item, self.slot), + } - if item.player != self.slot: - item_msg["sender"] = self.player_names.get(item.player) + if item.player != self.slot: + item_msg["sender"] = self.player_names.get(item.player) - items.append(item_msg) + items.append(item_msg) - self.game_ctx.send_item_received(items) + self.manager.game_ctx.send_item_received(items) - if any(ItemClassification.progression in ItemClassification(item.flags) for item in args["items"]): - self.game_ctx.send_accessible_locations() + if any(ItemClassification.progression in ItemClassification(item.flags) for item in args["items"]): + self.manager.game_ctx.send_accessible_locations() elif cmd == "PrintJSON": - if self.game_ctx.server is not None: - if "receiving" in args and "item" in args and args["item"].player == self.slot: - item_name = self.item_names.lookup_in_slot(args["item"].item, args["receiving"]) - location_name = self.location_names.lookup_in_slot(args["item"].location, args["item"].player) - receiver_name = self.player_names.get(args["receiving"]) - - if args["type"] == "Hint" and not args.get("found", False): - self.game_ctx.send_hint_received(item_name, location_name, receiver_name, args["item"].flags, - int(args["receiving"]) == self.slot) - elif args["receiving"] != self.slot: - self.game_ctx.send_item_sent_notification(item_name, receiver_name, args["item"].flags) - - parts = [] - for message_part in args["data"]: - if "type" not in message_part and "text" in message_part: - parts.append({"type": "text", "text": message_part["text"]}) - elif message_part["type"] == "player_id": - parts.append({ - "type": "player", - "text": self.player_names.get(int(message_part["text"])), - "self": int(int(message_part["text"]) == self.slot), - }) - elif message_part["type"] == "item_id": - parts.append({ - "type": "item", - "text": self.item_names.lookup_in_slot(int(message_part["text"]), message_part["player"]), - "flags": message_part["flags"], - }) - elif message_part["type"] == "location_id": - parts.append({ - "type": "location", - "text": self.location_names.lookup_in_slot(int(message_part["text"]), - message_part["player"]) - }) - elif "text" in message_part: - parts.append({"type": "text", "text": message_part["text"]}) - - self.game_ctx.send_text_message(parts) - elif cmd == "LocationInfo": - if self.game_ctx.server is not None: - locations = [] - - for location in args["locations"]: - locations.append({ - "id": location.location, - "item": self.item_names.lookup_in_slot(location.item, location.player), - "player": self.player_names.get(location.player), - "flags": location.flags, - "self": int(location.player) == self.slot, + if "receiving" in args and "item" in args and args["item"].player == self.slot: + item_name = self.item_names.lookup_in_slot(args["item"].item, args["receiving"]) + location_name = self.location_names.lookup_in_slot(args["item"].location, args["item"].player) + receiver_name = self.player_names.get(args["receiving"]) + + if args["type"] == "Hint" and not args.get("found", False): + self.manager.game_ctx.send_hint_received(item_name, location_name, receiver_name, args["item"].flags, + int(args["receiving"]) == self.slot) + elif args["receiving"] != self.slot: + self.manager.game_ctx.send_item_sent_notification(item_name, receiver_name, args["item"].flags) + + parts = [] + for message_part in args["data"]: + if "type" not in message_part and "text" in message_part: + parts.append({"type": "text", "text": message_part["text"]}) + elif message_part["type"] == "player_id": + parts.append({ + "type": "player", + "text": self.player_names.get(int(message_part["text"])), + "self": int(int(message_part["text"]) == self.slot), }) + elif message_part["type"] == "item_id": + parts.append({ + "type": "item", + "text": self.item_names.lookup_in_slot(int(message_part["text"]), message_part["player"]), + "flags": message_part["flags"], + }) + elif message_part["type"] == "location_id": + parts.append({ + "type": "location", + "text": self.location_names.lookup_in_slot(int(message_part["text"]), + message_part["player"]) + }) + elif "text" in message_part: + parts.append({"type": "text", "text": message_part["text"]}) - self.game_ctx.send_location_info(locations) - - if cmd in ["Connected", "RoomUpdate"]: - self.game_ctx.tracker.set_checked_locations(self.checked_locations) - - -async def pipe_loop(ctx: Lingo2GameContext): - while not ctx.client.exit_event.is_set(): + self.manager.game_ctx.send_text_message(parts) + elif cmd == "LocationInfo": + locations = [] + + for location in args["locations"]: + locations.append({ + "id": location.location, + "item": self.item_names.lookup_in_slot(location.item, location.player), + "player": self.player_names.get(location.player), + "flags": location.flags, + "self": int(location.player) == self.slot, + }) + + self.manager.game_ctx.send_location_info(locations) + elif cmd == "SetReply": + if args["key"] == self.get_datastorage_key("keyboard1"): + self.handle_keyboard_update(1, args) + elif args["key"] == self.get_datastorage_key("keyboard2"): + self.handle_keyboard_update(2, args) + + def get_datastorage_key(self, name: str): + return f"Lingo2_{self.slot}_{name}" + + async def update_keyboard(self, updates: dict[str, int]): + kb1 = 0 + kb2 = 0 + + for k, v in updates.items(): + if v == 0: + continue + + effect = 0 + if v >= 1: + effect |= 1 + if v == 2: + effect |= 2 + + pos = KEY_STORAGE_MAPPING[k] + if pos[0] == 1: + kb1 |= (effect << pos[1] * 2) + else: + kb2 |= (effect << pos[1] * 2) + + msgs = [] + + if kb1 != 0: + msgs.append({ + "cmd": "Set", + "key": self.get_datastorage_key("keyboard1"), + "want_reply": True, + "operations": [{ + "operation": "or", + "value": kb1 + }] + }) + + if kb2 != 0: + msgs.append({ + "cmd": "Set", + "key": self.get_datastorage_key("keyboard2"), + "want_reply": True, + "operations": [{ + "operation": "or", + "value": kb2 + }] + }) + + if len(msgs) > 0: + print(updates) + print(msgs) + await self.send_msgs(msgs) + + def handle_keyboard_update(self, field: int, args: dict[str, Any]): + keys = {} + value = args["value"] + + for i in range(0, 13): + if (value & (1 << (i * 2))) != 0: + keys[REVERSE_KEY_STORAGE_MAPPING[(field, i)]] = 1 + if (value & (1 << (i * 2 + 1))) != 0: + keys[REVERSE_KEY_STORAGE_MAPPING[(field, i)]] = 2 + + updates = self.manager.update_keyboard(keys) + if len(updates) > 0: + self.manager.game_ctx.send_update_keyboard(updates) + + +async def pipe_loop(manager: Lingo2Manager): + while not manager.client_ctx.exit_event.is_set(): try: socket = await websockets.connect("ws://localhost", port=PORT, ping_timeout=None, ping_interval=None, max_size=MESSAGE_MAX_SIZE) - ctx.server = Endpoint(socket) + manager.game_ctx.server = Endpoint(socket) logger.info("Connected to Lingo 2!") - if ctx.client.auth is not None: - ctx.send_connected() - ctx.send_accessible_locations() - async for data in ctx.server.socket: + if manager.client_ctx.auth is not None: + manager.game_ctx.send_connected() + manager.game_ctx.send_accessible_locations() + async for data in manager.game_ctx.server.socket: for msg in decode(data): - await process_game_cmd(ctx, msg) + await process_game_cmd(manager, msg) except ConnectionRefusedError: logger.info("Could not connect to Lingo 2.") finally: - ctx.server = None + manager.game_ctx.server = None -async def process_game_cmd(ctx: Lingo2GameContext, args: dict): +async def process_game_cmd(manager: Lingo2Manager, args: dict): cmd = args["cmd"] if cmd == "Connect": @@ -262,13 +424,17 @@ async def process_game_cmd(ctx: Lingo2GameContext, args: dict): else: server_address = f"{player}:None@{server}" - async_start(ctx.client.connect(server_address), name="client connect") + async_start(manager.client_ctx.connect(server_address), name="client connect") elif cmd == "Disconnect": - async_start(ctx.client.disconnect(), name="client disconnect") + async_start(manager.client_ctx.disconnect(), name="client disconnect") elif cmd in ["Sync", "LocationChecks", "Say", "StatusUpdate", "LocationScouts"]: - async_start(ctx.client.send_msgs([args]), name="client forward") + async_start(manager.client_ctx.send_msgs([args]), name="client forward") + elif cmd == "UpdateKeyboard": + updates = manager.update_keyboard(args["keyboard"]) + if len(updates) > 0: + async_start(manager.client_ctx.update_keyboard(updates), name="client update keyboard") elif cmd == "Quit": - ctx.client.exit_event.set() + manager.client_ctx.exit_event.set() async def run_game(): @@ -318,9 +484,7 @@ def client_main(*launch_args: str) -> None: client_ctx = Lingo2ClientContext(args.connect, args.password) game_ctx = Lingo2GameContext() - - client_ctx.game_ctx = game_ctx - game_ctx.client = client_ctx + manager = Lingo2Manager(game_ctx, client_ctx) client_ctx.server_task = asyncio.create_task(server_loop(client_ctx), name="ServerLoop") @@ -328,7 +492,7 @@ def client_main(*launch_args: str) -> None: client_ctx.run_gui() client_ctx.run_cli() - pipe_task = asyncio.create_task(pipe_loop(game_ctx), name="GameWatcher") + pipe_task = asyncio.create_task(pipe_loop(manager), name="GameWatcher") try: await pipe_task -- cgit 1.4.1