import asyncio import os import pkgutil import subprocess from typing import Any import websockets import Utils import settings from CommonClient import CommonContext, server_loop, gui_enabled, logger, get_base_parser, handle_url_arg from NetUtils import Endpoint, decode, encode from Utils import async_start PORT = 43182 MESSAGE_MAX_SIZE = 16*1024*1024 class Lingo2GameContext: server: Endpoint | None client: "Lingo2ClientContext" def __init__(self): self.server = None def send_connected(self): msg = { "cmd": "Connected", "user": self.client.username, "seed_name": self.client.seed_name, "version": self.client.server_version, "generator_version": self.client.generator_version, "team": self.client.team, "slot": self.client.slot, "checked_locations": self.client.checked_locations, "slot_data": self.client.slot_data, } async_start(self.send_msgs([msg]), name="game Connected") def send_item_sent_notification(self, item_name, receiver_name, item_flags): msg = { "cmd": "ItemSentNotif", "item_name": item_name, "receiver_name": receiver_name, "item_flags": item_flags, } async_start(self.send_msgs([msg]), name="item sent notif") def send_hint_received(self, item_name, location_name, receiver_name, item_flags, for_self): msg = { "cmd": "HintReceived", "item_name": item_name, "location_name": location_name, "receiver_name": receiver_name, "item_flags": item_flags, "self": int(for_self), } async_start(self.send_msgs([msg]), name="hint received notif") def send_item_received(self, items): msg = { "cmd": "ItemReceived", "items": items, } async_start(self.send_msgs([msg]), name="item received") def send_location_info(self, locations): msg = { "cmd": "LocationInfo", "locations": locations, } async_start(self.send_msgs([msg]), name="location info") def send_text_message(self, parts): msg = { "cmd": "TextMessage", "data": parts, } async_start(self.send_msgs([msg]), name="notif") async def send_msgs(self, msgs: list[Any]) -> None: """ `msgs` JSON serializable """ if not self.server or 
not self.server.socket.open or self.server.socket.closed: return await self.server.socket.send(encode(msgs)) class Lingo2ClientContext(CommonContext): game_ctx: Lingo2GameContext game = "Lingo 2" items_handling = 0b111 slot_data: dict[str, Any] | None def __init__(self, server_address: str | None = None, password: str | None = None): super().__init__(server_address, password) def make_gui(self): ui = super().make_gui() ui.base_title = "Archipelago Lingo 2 Client" return ui async def server_auth(self, password_requested: bool = False): self.auth = self.username await self.send_connect() def on_package(self, cmd: str, args: dict): if cmd == "RoomInfo": self.seed_name = args.get("seed_name", None) elif cmd == "Connected": self.slot_data = args.get("slot_data", None) if self.game_ctx.server is not None: self.game_ctx.send_connected() elif cmd == "ReceivedItems": if self.game_ctx.server is not None: cur_index = 0 items = [] for item in args["items"]: index = cur_index + args["index"] cur_index += 1 item_msg = {
# Build rules for the assign_ids executable.

# Protobuf is mandatory; configuration fails if it cannot be found.
find_package(Protobuf REQUIRED)

# The executable is built from a single source file.
add_executable(assign_ids main.cpp)

# Compile as C++20, and treat that standard as a hard requirement.
set_target_properties(assign_ids PROPERTIES
  CXX_STANDARD 20
  CXX_STANDARD_REQUIRED ON
)

# Headers are looked up relative to the top-level build directory and the
# tools/ directory of the source tree.
target_include_directories(assign_ids
  PUBLIC
    ${CMAKE_BINARY_DIR}
    ${CMAKE_SOURCE_DIR}/tools
)

# `protos` and `util` are presumably targets defined elsewhere in the
# project -- confirm against the parent CMake files.
target_link_libraries(assign_ids
  PUBLIC
    protos
    protobuf::libprotobuf
    util
)