about summary refs log tree commit diff stats
path: root/apworld/context.py
diff options
context:
space:
mode:
Diffstat (limited to 'apworld/context.py')
-rw-r--r--apworld/context.py450
1 files changed, 338 insertions, 112 deletions
diff --git a/apworld/context.py b/apworld/context.py index 0a058e5..4a85868 100644 --- a/apworld/context.py +++ b/apworld/context.py
@@ -15,35 +15,99 @@ from Utils import async_start
15from . import Lingo2World 15from . import Lingo2World
16from .tracker import Tracker 16from .tracker import Tracker
17 17
18PORT = 43182 18ALL_LETTERS = "abcdefghijklmnopqrstuvwxyz"
19MESSAGE_MAX_SIZE = 16*1024*1024 19MESSAGE_MAX_SIZE = 16*1024*1024
20PORT = 43182
21
22KEY_STORAGE_MAPPING = {
23 "a": (1, 0), "b": (1, 1), "c": (1, 2), "d": (1, 3), "e": (1, 4), "f": (1, 5), "g": (1, 6), "h": (1, 7), "i": (1, 8),
24 "j": (1, 9), "k": (1, 10), "l": (1, 11), "m": (1, 12), "n": (2, 0), "o": (2, 1), "p": (2, 2), "q": (2, 3),
25 "r": (2, 4), "s": (2, 5), "t": (2, 6), "u": (2, 7), "v": (2, 8), "w": (2, 9), "x": (2, 10), "y": (2, 11),
26 "z": (2, 12),
27}
28
29REVERSE_KEY_STORAGE_MAPPING = {t: k for k, t in KEY_STORAGE_MAPPING.items()}
30
31
32class Lingo2Manager:
33 game_ctx: "Lingo2GameContext"
34 client_ctx: "Lingo2ClientContext"
35 tracker: Tracker
36
37 keyboard: dict[str, int]
38 worldports: set[int]
39
40 def __init__(self, game_ctx: "Lingo2GameContext", client_ctx: "Lingo2ClientContext"):
41 self.game_ctx = game_ctx
42 self.game_ctx.manager = self
43 self.client_ctx = client_ctx
44 self.client_ctx.manager = self
45 self.tracker = Tracker(self)
46 self.keyboard = {}
47 self.worldports = set()
48
49 self.reset()
50
51 def reset(self):
52 for k in ALL_LETTERS:
53 self.keyboard[k] = 0
54
55 self.worldports = set()
56
57 def update_keyboard(self, new_keyboard: dict[str, int]) -> dict[str, int]:
58 ret: dict[str, int] = {}
59
60 for k, v in new_keyboard.items():
61 if v > self.keyboard.get(k, 0):
62 self.keyboard[k] = v
63 ret[k] = v
64
65 if len(ret) > 0:
66 self.tracker.refresh_state()
67 self.game_ctx.send_accessible_locations()
68
69 return ret
70
71 def update_worldports(self, new_worldports: set[int]) -> set[int]:
72 ret = new_worldports.difference(self.worldports)
73 self.worldports.update(new_worldports)
74
75 if len(ret) > 0:
76 self.tracker.refresh_state()
77 self.game_ctx.send_accessible_locations()
78
79 return ret
20 80
21 81
22class Lingo2GameContext: 82class Lingo2GameContext:
23 server: Endpoint | None 83 server: Endpoint | None
24 client: "Lingo2ClientContext" 84 manager: Lingo2Manager
25 tracker: Tracker
26 85
27 def __init__(self): 86 def __init__(self):
28 self.server = None 87 self.server = None
29 self.tracker = Tracker()
30 88
31 def send_connected(self): 89 def send_connected(self):
90 if self.server is None:
91 return
92
32 msg = { 93 msg = {
33 "cmd": "Connected", 94 "cmd": "Connected",
34 "user": self.client.username, 95 "user": self.manager.client_ctx.username,
35 "seed_name": self.client.seed_name, 96 "seed_name": self.manager.client_ctx.seed_name,
36 "version": self.client.server_version, 97 "version": self.manager.client_ctx.server_version,
37 "generator_version": self.client.generator_version, 98 "generator_version": self.manager.client_ctx.generator_version,
38 "team": self.client.team, 99 "team": self.manager.client_ctx.team,
39 "slot": self.client.slot, 100 "slot": self.manager.client_ctx.slot,
40 "checked_locations": self.client.checked_locations, 101 "checked_locations": self.manager.client_ctx.checked_locations,
41 "slot_data": self.client.slot_data, 102 "slot_data": self.manager.client_ctx.slot_data,
42 } 103 }
43 104
44 async_start(self.send_msgs([msg]), name="game Connected") 105 async_start(self.send_msgs([msg]), name="game Connected")
45 106
46 def send_item_sent_notification(self, item_name, receiver_name, item_flags): 107 def send_item_sent_notification(self, item_name, receiver_name, item_flags):
108 if self.server is None:
109 return
110
47 msg = { 111 msg = {
48 "cmd": "ItemSentNotif", 112 "cmd": "ItemSentNotif",
49 "item_name": item_name, 113 "item_name": item_name,
@@ -54,6 +118,9 @@ class Lingo2GameContext:
54 async_start(self.send_msgs([msg]), name="item sent notif") 118 async_start(self.send_msgs([msg]), name="item sent notif")
55 119
56 def send_hint_received(self, item_name, location_name, receiver_name, item_flags, for_self): 120 def send_hint_received(self, item_name, location_name, receiver_name, item_flags, for_self):
121 if self.server is None:
122 return
123
57 msg = { 124 msg = {
58 "cmd": "HintReceived", 125 "cmd": "HintReceived",
59 "item_name": item_name, 126 "item_name": item_name,
@@ -66,6 +133,9 @@ class Lingo2GameContext:
66 async_start(self.send_msgs([msg]), name="hint received notif") 133 async_start(self.send_msgs([msg]), name="hint received notif")
67 134
68 def send_item_received(self, items): 135 def send_item_received(self, items):
136 if self.server is None:
137 return
138
69 msg = { 139 msg = {
70 "cmd": "ItemReceived", 140 "cmd": "ItemReceived",
71 "items": items, 141 "items": items,
@@ -74,6 +144,9 @@ class Lingo2GameContext:
74 async_start(self.send_msgs([msg]), name="item received") 144 async_start(self.send_msgs([msg]), name="item received")
75 145
76 def send_location_info(self, locations): 146 def send_location_info(self, locations):
147 if self.server is None:
148 return
149
77 msg = { 150 msg = {
78 "cmd": "LocationInfo", 151 "cmd": "LocationInfo",
79 "locations": locations, 152 "locations": locations,
@@ -82,6 +155,9 @@ class Lingo2GameContext:
82 async_start(self.send_msgs([msg]), name="location info") 155 async_start(self.send_msgs([msg]), name="location info")
83 156
84 def send_text_message(self, parts): 157 def send_text_message(self, parts):
158 if self.server is None:
159 return
160
85 msg = { 161 msg = {
86 "cmd": "TextMessage", 162 "cmd": "TextMessage",
87 "data": parts, 163 "data": parts,
@@ -90,14 +166,23 @@ class Lingo2GameContext:
90 async_start(self.send_msgs([msg]), name="notif") 166 async_start(self.send_msgs([msg]), name="notif")
91 167
92 def send_accessible_locations(self): 168 def send_accessible_locations(self):
169 if self.server is None:
170 return
171
93 msg = { 172 msg = {
94 "cmd": "AccessibleLocations", 173 "cmd": "AccessibleLocations",
95 "locations": list(self.tracker.accessible_locations), 174 "locations": list(self.manager.tracker.accessible_locations),
96 } 175 }
97 176
177 if len(self.manager.tracker.accessible_worldports) > 0:
178 msg["worldports"] = list(self.manager.tracker.accessible_worldports)
179
98 async_start(self.send_msgs([msg]), name="accessible locations") 180 async_start(self.send_msgs([msg]), name="accessible locations")
99 181
100 def send_update_locations(self, locations): 182 def send_update_locations(self, locations):
183 if self.server is None:
184 return
185
101 msg = { 186 msg = {
102 "cmd": "UpdateLocations", 187 "cmd": "UpdateLocations",
103 "locations": locations, 188 "locations": locations,
@@ -105,6 +190,28 @@ class Lingo2GameContext:
105 190
106 async_start(self.send_msgs([msg]), name="update locations") 191 async_start(self.send_msgs([msg]), name="update locations")
107 192
193 def send_update_keyboard(self, updates):
194 if self.server is None:
195 return
196
197 msg = {
198 "cmd": "UpdateKeyboard",
199 "updates": updates,
200 }
201
202 async_start(self.send_msgs([msg]), name="update keyboard")
203
204 def send_update_worldports(self, worldports):
205 if self.server is None:
206 return
207
208 msg = {
209 "cmd": "UpdateWorldports",
210 "worldports": worldports,
211 }
212
213 async_start(self.send_msgs([msg]), name="update worldports")
214
108 async def send_msgs(self, msgs: list[Any]) -> None: 215 async def send_msgs(self, msgs: list[Any]) -> None:
109 """ `msgs` JSON serializable """ 216 """ `msgs` JSON serializable """
110 if not self.server or not self.server.socket.open or self.server.socket.closed: 217 if not self.server or not self.server.socket.open or self.server.socket.closed:
@@ -113,7 +220,7 @@ class Lingo2GameContext:
113 220
114 221
115class Lingo2ClientContext(CommonContext): 222class Lingo2ClientContext(CommonContext):
116 game_ctx: Lingo2GameContext 223 manager: Lingo2Manager
117 224
118 game = "Lingo 2" 225 game = "Lingo 2"
119 items_handling = 0b111 226 items_handling = 0b111
@@ -138,118 +245,226 @@ class Lingo2ClientContext(CommonContext):
138 elif cmd == "Connected": 245 elif cmd == "Connected":
139 self.slot_data = args.get("slot_data", None) 246 self.slot_data = args.get("slot_data", None)
140 247
141 if self.game_ctx.server is not None: 248 self.manager.reset()
142 self.game_ctx.send_connected() 249
143 250 self.manager.game_ctx.send_connected()
144 self.game_ctx.tracker.setup_slot(self.slot_data) 251
252 self.manager.tracker.setup_slot(self.slot_data)
253 self.manager.tracker.set_checked_locations(self.checked_locations)
254 self.manager.game_ctx.send_accessible_locations()
255
256 self.set_notify(self.get_datastorage_key("keyboard1"), self.get_datastorage_key("keyboard2"))
257 msg_batch = [{
258 "cmd": "Set",
259 "key": self.get_datastorage_key("keyboard1"),
260 "default": 0,
261 "want_reply": True,
262 "operations": [{"operation": "default", "value": 0}]
263 }, {
264 "cmd": "Set",
265 "key": self.get_datastorage_key("keyboard2"),
266 "default": 0,
267 "want_reply": True,
268 "operations": [{"operation": "default", "value": 0}]
269 }]
270
271 if self.slot_data["shuffle_worldports"]:
272 self.set_notify(self.get_datastorage_key("worldports"))
273 msg_batch.append({
274 "cmd": "Set",
275 "key": self.get_datastorage_key("worldports"),
276 "default": [],
277 "want_reply": True,
278 "operations": [{"operation": "default", "value": []}]
279 })
280
281 async_start(self.send_msgs(msg_batch), name="default keys")
145 elif cmd == "RoomUpdate": 282 elif cmd == "RoomUpdate":
146 if self.game_ctx.server is not None: 283 self.manager.tracker.set_checked_locations(self.checked_locations)
147 self.game_ctx.send_update_locations(args["checked_locations"]) 284 self.manager.game_ctx.send_update_locations(args["checked_locations"])
148 elif cmd == "ReceivedItems": 285 elif cmd == "ReceivedItems":
149 self.game_ctx.tracker.set_collected_items(self.items_received) 286 self.manager.tracker.set_collected_items(self.items_received)
150 287
151 if self.game_ctx.server is not None: 288 cur_index = 0
152 cur_index = 0 289 items = []
153 items = []
154 290
155 for item in args["items"]: 291 for item in args["items"]:
156 index = cur_index + args["index"] 292 index = cur_index + args["index"]
157 cur_index += 1 293 cur_index += 1
158 294
159 item_msg = { 295 item_msg = {
160 "id": item.item, 296 "id": item.item,
161 "index": index, 297 "index": index,
162 "flags": item.flags, 298 "flags": item.flags,
163 "text": self.item_names.lookup_in_slot(item.item, self.slot), 299 "text": self.item_names.lookup_in_slot(item.item, self.slot),
164 } 300 }
165 301
166 if item.player != self.slot: 302 if item.player != self.slot:
167 item_msg["sender"] = self.player_names.get(item.player) 303 item_msg["sender"] = self.player_names.get(item.player)
168 304
169 items.append(item_msg) 305 items.append(item_msg)
170 306
171 self.game_ctx.send_item_received(items) 307 self.manager.game_ctx.send_item_received(items)
172 308
173 if any(ItemClassification.progression in ItemClassification(item.flags) for item in args["items"]): 309 if any(ItemClassification.progression in ItemClassification(item.flags) for item in args["items"]):
174 self.game_ctx.send_accessible_locations() 310 self.manager.game_ctx.send_accessible_locations()
175 elif cmd == "PrintJSON": 311 elif cmd == "PrintJSON":
176 if self.game_ctx.server is not None: 312 if "receiving" in args and "item" in args and args["item"].player == self.slot:
177 if "receiving" in args and "item" in args and args["item"].player == self.slot: 313 item_name = self.item_names.lookup_in_slot(args["item"].item, args["receiving"])
178 item_name = self.item_names.lookup_in_slot(args["item"].item, args["receiving"]) 314 location_name = self.location_names.lookup_in_slot(args["item"].location, args["item"].player)
179 location_name = self.location_names.lookup_in_slot(args["item"].location, args["item"].player) 315 receiver_name = self.player_names.get(args["receiving"])
180 receiver_name = self.player_names.get(args["receiving"]) 316
181 317 if args["type"] == "Hint" and not args.get("found", False):
182 if args["type"] == "Hint" and not args.get("found", False): 318 self.manager.game_ctx.send_hint_received(item_name, location_name, receiver_name, args["item"].flags,
183 self.game_ctx.send_hint_received(item_name, location_name, receiver_name, args["item"].flags, 319 int(args["receiving"]) == self.slot)
184 int(args["receiving"]) == self.slot) 320 elif args["receiving"] != self.slot:
185 elif args["receiving"] != self.slot: 321 self.manager.game_ctx.send_item_sent_notification(item_name, receiver_name, args["item"].flags)
186 self.game_ctx.send_item_sent_notification(item_name, receiver_name, args["item"].flags) 322
187 323 parts = []
188 parts = [] 324 for message_part in args["data"]:
189 for message_part in args["data"]: 325 if "type" not in message_part and "text" in message_part:
190 if "type" not in message_part and "text" in message_part: 326 parts.append({"type": "text", "text": message_part["text"]})
191 parts.append({"type": "text", "text": message_part["text"]}) 327 elif message_part["type"] == "player_id":
192 elif message_part["type"] == "player_id": 328 parts.append({
193 parts.append({ 329 "type": "player",
194 "type": "player", 330 "text": self.player_names.get(int(message_part["text"])),
195 "text": self.player_names.get(int(message_part["text"])), 331 "self": int(int(message_part["text"]) == self.slot),
196 "self": int(int(message_part["text"]) == self.slot),
197 })
198 elif message_part["type"] == "item_id":
199 parts.append({
200 "type": "item",
201 "text": self.item_names.lookup_in_slot(int(message_part["text"]), message_part["player"]),
202 "flags": message_part["flags"],
203 })
204 elif message_part["type"] == "location_id":
205 parts.append({
206 "type": "location",
207 "text": self.location_names.lookup_in_slot(int(message_part["text"]),
208 message_part["player"])
209 })
210 elif "text" in message_part:
211 parts.append({"type": "text", "text": message_part["text"]})
212
213 self.game_ctx.send_text_message(parts)
214 elif cmd == "LocationInfo":
215 if self.game_ctx.server is not None:
216 locations = []
217
218 for location in args["locations"]:
219 locations.append({
220 "id": location.location,
221 "item": self.item_names.lookup_in_slot(location.item, location.player),
222 "player": self.player_names.get(location.player),
223 "flags": location.flags,
224 "self": int(location.player) == self.slot,
225 }) 332 })
333 elif message_part["type"] == "item_id":
334 parts.append({
335 "type": "item",
336 "text": self.item_names.lookup_in_slot(int(message_part["text"]), message_part["player"]),
337 "flags": message_part["flags"],
338 })
339 elif message_part["type"] == "location_id":
340 parts.append({
341 "type": "location",
342 "text": self.location_names.lookup_in_slot(int(message_part["text"]),
343 message_part["player"])
344 })
345 elif "text" in message_part:
346 parts.append({"type": "text", "text": message_part["text"]})
226 347
227 self.game_ctx.send_location_info(locations) 348 self.manager.game_ctx.send_text_message(parts)
228 349 elif cmd == "LocationInfo":
229 if cmd in ["Connected", "RoomUpdate"]: 350 locations = []
230 self.game_ctx.tracker.set_checked_locations(self.checked_locations) 351
231 352 for location in args["locations"]:
232 353 locations.append({
233async def pipe_loop(ctx: Lingo2GameContext): 354 "id": location.location,
234 while not ctx.client.exit_event.is_set(): 355 "item": self.item_names.lookup_in_slot(location.item, location.player),
356 "player": self.player_names.get(location.player),
357 "flags": location.flags,
358 "self": int(location.player) == self.slot,
359 })
360
361 self.manager.game_ctx.send_location_info(locations)
362 elif cmd == "SetReply":
363 if args["key"] == self.get_datastorage_key("keyboard1"):
364 self.handle_keyboard_update(1, args)
365 elif args["key"] == self.get_datastorage_key("keyboard2"):
366 self.handle_keyboard_update(2, args)
367 elif args["key"] == self.get_datastorage_key("worldports"):
368 updates = self.manager.update_worldports(set(args["value"]))
369 if len(updates) > 0:
370 self.manager.game_ctx.send_update_worldports(updates)
371
372 def get_datastorage_key(self, name: str):
373 return f"Lingo2_{self.slot}_{name}"
374
375 async def update_keyboard(self, updates: dict[str, int]):
376 kb1 = 0
377 kb2 = 0
378
379 for k, v in updates.items():
380 if v == 0:
381 continue
382
383 effect = 0
384 if v >= 1:
385 effect |= 1
386 if v == 2:
387 effect |= 2
388
389 pos = KEY_STORAGE_MAPPING[k]
390 if pos[0] == 1:
391 kb1 |= (effect << pos[1] * 2)
392 else:
393 kb2 |= (effect << pos[1] * 2)
394
395 msgs = []
396
397 if kb1 != 0:
398 msgs.append({
399 "cmd": "Set",
400 "key": self.get_datastorage_key("keyboard1"),
401 "want_reply": True,
402 "operations": [{
403 "operation": "or",
404 "value": kb1
405 }]
406 })
407
408 if kb2 != 0:
409 msgs.append({
410 "cmd": "Set",
411 "key": self.get_datastorage_key("keyboard2"),
412 "want_reply": True,
413 "operations": [{
414 "operation": "or",
415 "value": kb2
416 }]
417 })
418
419 if len(msgs) > 0:
420 await self.send_msgs(msgs)
421
422 def handle_keyboard_update(self, field: int, args: dict[str, Any]):
423 keys = {}
424 value = args["value"]
425
426 for i in range(0, 13):
427 if (value & (1 << (i * 2))) != 0:
428 keys[REVERSE_KEY_STORAGE_MAPPING[(field, i)]] = 1
429 if (value & (1 << (i * 2 + 1))) != 0:
430 keys[REVERSE_KEY_STORAGE_MAPPING[(field, i)]] = 2
431
432 updates = self.manager.update_keyboard(keys)
433 if len(updates) > 0:
434 self.manager.game_ctx.send_update_keyboard(updates)
435
436 async def update_worldports(self, updates: set[int]):
437 await self.send_msgs([{
438 "cmd": "Set",
439 "key": self.get_datastorage_key("worldports"),
440 "want_reply": True,
441 "operations": [{
442 "operation": "update",
443 "value": updates
444 }]
445 }])
446
447
448async def pipe_loop(manager: Lingo2Manager):
449 while not manager.client_ctx.exit_event.is_set():
235 try: 450 try:
236 socket = await websockets.connect("ws://localhost", port=PORT, ping_timeout=None, ping_interval=None, 451 socket = await websockets.connect("ws://localhost", port=PORT, ping_timeout=None, ping_interval=None,
237 max_size=MESSAGE_MAX_SIZE) 452 max_size=MESSAGE_MAX_SIZE)
238 ctx.server = Endpoint(socket) 453 manager.game_ctx.server = Endpoint(socket)
239 logger.info("Connected to Lingo 2!") 454 logger.info("Connected to Lingo 2!")
240 if ctx.client.auth is not None: 455 if manager.client_ctx.auth is not None:
241 ctx.send_connected() 456 manager.game_ctx.send_connected()
242 ctx.send_accessible_locations() 457 manager.game_ctx.send_accessible_locations()
243 async for data in ctx.server.socket: 458 async for data in manager.game_ctx.server.socket:
244 for msg in decode(data): 459 for msg in decode(data):
245 await process_game_cmd(ctx, msg) 460 await process_game_cmd(manager, msg)
246 except ConnectionRefusedError: 461 except ConnectionRefusedError:
247 logger.info("Could not connect to Lingo 2.") 462 logger.info("Could not connect to Lingo 2.")
248 finally: 463 finally:
249 ctx.server = None 464 manager.game_ctx.server = None
250 465
251 466
252async def process_game_cmd(ctx: Lingo2GameContext, args: dict): 467async def process_game_cmd(manager: Lingo2Manager, args: dict):
253 cmd = args["cmd"] 468 cmd = args["cmd"]
254 469
255 if cmd == "Connect": 470 if cmd == "Connect":
@@ -262,13 +477,26 @@ async def process_game_cmd(ctx: Lingo2GameContext, args: dict):
262 else: 477 else:
263 server_address = f"{player}:None@{server}" 478 server_address = f"{player}:None@{server}"
264 479
265 async_start(ctx.client.connect(server_address), name="client connect") 480 async_start(manager.client_ctx.connect(server_address), name="client connect")
266 elif cmd == "Disconnect": 481 elif cmd == "Disconnect":
267 async_start(ctx.client.disconnect(), name="client disconnect") 482 async_start(manager.client_ctx.disconnect(), name="client disconnect")
268 elif cmd in ["Sync", "LocationChecks", "Say", "StatusUpdate", "LocationScouts"]: 483 elif cmd in ["Sync", "LocationChecks", "Say", "StatusUpdate", "LocationScouts"]:
269 async_start(ctx.client.send_msgs([args]), name="client forward") 484 async_start(manager.client_ctx.send_msgs([args]), name="client forward")
485 elif cmd == "UpdateKeyboard":
486 updates = manager.update_keyboard(args["keyboard"])
487 if len(updates) > 0:
488 async_start(manager.client_ctx.update_keyboard(updates), name="client update keyboard")
489 elif cmd == "CheckWorldport":
490 port_id = args["port_id"]
491 worldports = {port_id}
492 if str(port_id) in manager.client_ctx.slot_data["port_pairings"]:
493 worldports.add(manager.client_ctx.slot_data["port_pairings"][str(port_id)])
494
495 updates = manager.update_worldports(worldports)
496 if len(updates) > 0:
497 async_start(manager.client_ctx.update_worldports(updates), name="client update worldports")
270 elif cmd == "Quit": 498 elif cmd == "Quit":
271 ctx.client.exit_event.set() 499 manager.client_ctx.exit_event.set()
272 500
273 501
274async def run_game(): 502async def run_game():
@@ -318,9 +546,7 @@ def client_main(*launch_args: str) -> None:
318 546
319 client_ctx = Lingo2ClientContext(args.connect, args.password) 547 client_ctx = Lingo2ClientContext(args.connect, args.password)
320 game_ctx = Lingo2GameContext() 548 game_ctx = Lingo2GameContext()
321 549 manager = Lingo2Manager(game_ctx, client_ctx)
322 client_ctx.game_ctx = game_ctx
323 game_ctx.client = client_ctx
324 550
325 client_ctx.server_task = asyncio.create_task(server_loop(client_ctx), name="ServerLoop") 551 client_ctx.server_task = asyncio.create_task(server_loop(client_ctx), name="ServerLoop")
326 552
@@ -328,7 +554,7 @@ def client_main(*launch_args: str) -> None:
328 client_ctx.run_gui() 554 client_ctx.run_gui()
329 client_ctx.run_cli() 555 client_ctx.run_cli()
330 556
331 pipe_task = asyncio.create_task(pipe_loop(game_ctx), name="GameWatcher") 557 pipe_task = asyncio.create_task(pipe_loop(manager), name="GameWatcher")
332 558
333 try: 559 try:
334 await pipe_task 560 await pipe_task