import asyncio import os import pkgutil import subprocess from typing import Any import websockets import Utils import settings from CommonClient import CommonContext, server_loop, gui_enabled, logger, get_base_parser, handle_url_arg from NetUtils import Endpoint, decode, encode from Utils import async_start PORT = 43182 MESSAGE_MAX_SIZE = 16*1024*1024 class Lingo2GameContext: server: Endpoint | None client: "Lingo2ClientContext" def __init__(self): self.server = None def send_connected(self): msg = { "cmd": "Connected", "user": self.client.username, "seed_name": self.client.seed_name, "version": self.client.server_version, "generator_version": self.client.generator_version, "team": self.client.team, "slot": self.client.slot, "checked_locations": self.client.checked_locations, "slot_data": self.client.slot_data, } async_start(self.send_msgs([msg]), name="game Connected") def send_item_sent_notification(self, item_name, receiver_name, item_flags): msg = { "cmd": "ItemSentNotif", "item_name": item_name, "receiver_name": receiver_name, "item_flags": item_flags, } async_start(self.send_msgs([msg]), name="item sent notif") def send_hint_received(self, item_name, location_name, receiver_name, item_flags, for_self): msg = { "cmd": "HintReceived", "item_name": item_name, "location_name": location_name, "receiver_name": receiver_name, "item_flags": item_flags, "self": int(for_self), } async_start(self.send_msgs([msg]), name="hint received notif") def send_item_received(self, items): msg = { "cmd": "ItemReceived", "items": items, } async_start(self.send_msgs([msg]), name="item received") def send_location_info(self, locations): msg = { "cmd": "LocationInfo", "locations": locations, } async_start(self.send_msgs([msg]), name="location info") def send_text_message(self, parts): msg = { "cmd": "TextMessage", "data": parts, } async_start(self.send_msgs([msg]), name="notif") async def send_msgs(self, msgs: list[Any]) -> None: """ `msgs` JSON serializable """ if not self.server or 
not self.server.socket.open or self.server.socket.closed: return await self.server.socket.send(encode(msgs)) class Lingo2ClientContext(CommonContext): game_ctx: Lingo2GameContext game = "Lingo 2" items_handling = 0b111 slot_data: dict[str, Any] | None def __init__(self, server_address: str | None = None, password: str | None = None): super().__init__(server_address, password) def make_gui(self): ui = super().make_gui() ui.base_title = "Archipelago Lingo 2 Client" return ui async def server_auth(self, password_requested: bool = False): self.auth = self.username await self.send_connect() def on_package(self, cmd: str, args: dict): if cmd == "RoomInfo": self.seed_name = args.get("seed_name", None) elif cmd == "Connected": self.slot_data = args.get("slot_data", None) if self.game_ctx.server is not None: self.game_ctx.send_connected() elif cmd == "ReceivedItems": if self.game_ctx.server is not None: cur_index = 0 items = [] for item in args["items"]: index = cur_index + args["index"] cur_index += 1 item_msg = {
# Build rules for the `assign_ids` executable.

# REQUIRED aborts configuration if Protobuf cannot be found.
find_package(Protobuf REQUIRED)

add_executable(assign_ids main.cpp)

# Compile as C++20 and refuse to fall back to an older standard.
set_target_properties(assign_ids PROPERTIES
  CXX_STANDARD 20
  CXX_STANDARD_REQUIRED ON
)

# Headers are resolved from the top-level build directory and from the
# project's tools/ source directory.
# NOTE(review): the build directory is presumably listed for generated
# headers -- confirm.
target_include_directories(assign_ids
  PUBLIC
    ${CMAKE_BINARY_DIR}
    ${CMAKE_SOURCE_DIR}/tools
)

# `protos` and `util` are not defined in this fragment; presumably they are
# targets declared elsewhere in the project.
target_link_libraries(assign_ids
  PUBLIC
    protos
    protobuf::libprotobuf
    util
)
al_path("worlds", "lingo2", "client"), ], cwd=os.path.dirname(exe_file), ) def client_main(*launch_args: str) -> None: async def main(args): async_start(run_game()) client_ctx = Lingo2ClientContext(args.connect, args.password) game_ctx = Lingo2GameContext() client_ctx.game_ctx = game_ctx game_ctx.client = client_ctx client_ctx.server_task = asyncio.create_task(server_loop(client_ctx), name="ServerLoop") if gui_enabled: client_ctx.run_gui() client_ctx.run_cli() pipe_task = asyncio.create_task(pipe_loop(game_ctx), name="GameWatcher") try: await pipe_task except Exception as e: logger.exception(e) await client_ctx.exit_event.wait() await client_ctx.shutdown() Utils.init_logging("Lingo2Client", exception_logger="Client") import colorama parser = get_base_parser(description="Lingo 2 Archipelago Client") parser.add_argument('--name', default=None, help="Slot Name to connect as.") parser.add_argument("url", nargs="?", help="Archipelago connection url") args = parser.parse_args(launch_args) args = handle_url_arg(args, parser=parser) colorama.just_fix_windows_console() asyncio.run(main(args)) colorama.deinit()