Skills Async and Concurrent Processing |
\\n\\nWhen a Skill needs to wait for multiple time-consuming operations (such as network requests, file I/O) simultaneously, asynchronous programming can significantly reduce total waiting time.
\\n\\nThis article introduces the application scenarios and usage of Python asyncio in Skill scripts.
\\n\\n
Synchronous vs Asynchronous: When to Use Async
\\n\\n| Scenario | \\nRecommended Approach | \\nReason | \\n
|---|---|---|
| Single file processing, single API call | \\nSynchronous | \\nAsync benefits are minimal, code is simpler | \\n
| Issuing multiple HTTP requests simultaneously | \\nAsynchronous | \\nConcurrent waiting, total time β longest single request time | \\n
| Reading multiple files simultaneously | \\nAsync or thread pool | \\nTask switching during I/O waiting | \\n
| CPU-intensive computation (e.g., image processing) | \\nMultiprocessing | \\nasyncio does not bypass the GIL | \\n
\\n\\n\\nAsynchronous does not mean parallel. Python's asyncio is a single-threaded event loop, suitable for I/O-intensive tasks (waiting for network, disk). CPU-intensive tasks should use
\\nProcessPoolExecutor.
\\n\\n
Asynchronously Sending Multiple HTTP Requests
\\n\\nSuppose a Skill needs to call 3 API endpoints simultaneously. The synchronous approach would take the sum of all three requests, while the asynchronous approach takes approximately the time of the slowest single request.
\\n\\nExample
\\n\\n# File path: scripts/async_requests.py\\n\\n# Asynchronously call multiple APIs concurrently and aggregate results\\n\\nimport asyncio\\n\\nimport aiohttp # pip install aiohttp --break-system-packages\\n\\nimport json\\n\\nimport sys\\n\\nimport time\\n\\nAPI_KEY = "your_api_key"\\n\\nBASE_URL = "https://api.example.com/v1"\\n\\nasync def fetch_one(session: aiohttp.ClientSession,\\n endpoint: str, payload: dict) -> dict:\\n """Asynchronously make a single POST request"""\\n\\n url = BASE_URL + endpoint\\n\\n headers = {"Authorization": f"Bearer {API_KEY}"}\\n\\n try:\\n\\n async with session.post(url, json=payload,\\n headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp:\\n\\n resp.raise_for_status()\\n\\n return {"endpoint": endpoint, "status": "ok", "data": await resp.json()}\\n\\n except Exception as e:\\n\\n return {"endpoint": endpoint, "status": "error", "message": str(e)}\\n\\nasync def fetch_all(requests: list) -> list:\\n """\\n Concurrently send all requests and wait for all to complete\\n\\n Parameters:\\n requests: list, each item is a (endpoint, payload) tuple\\n """\\n\\n async with aiohttp.ClientSession() as session:\\n\\n tasks = [\\n\\n fetch_one(session, endpoint, payload)\\n\\n for endpoint, payload in requests\\n\\n ]\\n\\n # gather executes all tasks concurrently, return_exceptions=True prevents one failure from affecting others\\n\\n return await asyncio.gather(*tasks, return_exceptions=True)\\n\\ndef main():\\n\\n # Define the list of requests to be called concurrently\\n\\n requests = [\\n\\n ("/summarize", {"text": "tutorial is a technical learning platform", "lang": "zh"}),\\n\\n ("/keywords", {"text": "TUTORIAL Provides various programming tutorials", "top_n": 5}),\\n\\n ("/sentiment", {"text": "The tutorials on this platform are of high quality"}),\\n\\n ]\\n\\n t0 = time.perf_counter()\\n\\n results = asyncio.run(fetch_all(requests))\\n\\n elapsed = time.perf_counter() - t0\\n\\n print(f"Completed concurrently {len(results)} requests, total elapsed time {elapsed:.2f} Second")\\n\\n print(json.dumps(results, ensure_ascii=False, indent=2))\\n\\nif __name__ == "__main__":\\n\\n main()\\n\\n\\nConcurrently completed 3 requests, total time 1.23 seconds (synchronous sequential would take approximately 3.5 seconds, concurrent execution saves approximately 65% time)
\\n\\n\\n\\n
Asynchronously Reading Multiple Files
\\n\\nFor scenarios requiring simultaneous reading of multiple files, aiofiles provides an file I/O asynchronous version.
Example
\\n\\n# File path: scripts/async_file_reader.py\\n\\nimport asyncio\\n\\nimport aiofiles # pip install aiofiles --break-system-packages\\n\\nimport os\\n\\nasync def read_file(file_path: str) -> dict:\\n """Asynchronously read a single file"""\\n\\n try:\\n\\n async with aiofiles.open(file_path, encoding="utf-8") as f:\\n\\n content = await f.read()\\n\\n return {\\n\\n "file": os.path.basename(file_path),\\n\\n "chars": len(content),\\n\\n "lines": content.count("n"),\\n\\n "status": "ok"\\n\\n }\\n\\n except Exception as e:\\n\\n return {"file": file_path, "status": "error", "message": str(e)}\\n\\nasync def read_all_files(file_paths: list) -> list:\\n """Concurrently read all files"""\\n\\n tasks = [read_file(fp) for fp in file_paths]\\n\\n return await asyncio.gather(*tasks)\\n\\nif __name__ == "__main__":\\n\\n import glob, json\\n\\n # Read all .txt files in the upload directory\\n\\n files = glob.glob("/mnt/user-data/uploads/*.txt")\\n\\n if not files:\\n\\n print("Not found .txt Files")\\n\\n else:\\n\\n results = asyncio.run(read_all_files(files))\\n\\n print(json.dumps(results, ensure_ascii=False, indent=2))\\n\\n\\n\\n\\n
Timeout Control and Task Cancellation
\\n\\nIn asynchronous tasks, timeout control is particularly important. asyncio.wait_for can set a maximum waiting time for a single task.
Example
\\n\\n# File path: scripts/async_timeout.py\\n\\nimport asyncio\\n\\nasync def slow_task(name: str, delay: float) -> str:\\n """Simulate a time-consuming task"""\\n\\n await asyncio.sleep(delay)\\n\\n return f"{name} Completed"\\n\\nasync def run_with_timeout():\\n\\n tasks = {\\n\\n "Fast task": slow_task("Fast task", 0.5),\\n\\n "Slow task": slow_task("Slow task", 5.0),\\n\\n }\\n\\n results = {}\\n\\n for name, coro in tasks.items():\\n\\n try:\\n\\n # Each task waits at most 2 seconds\\n\\n result = await asyncio.wait_for(coro, timeout=2.0)\\n\\n results = {"status": "ok", "result": result}\\n\\n except asyncio.TimeoutError:\\n\\n results = {"status": "timeout", "message": "Not completed after 2 seconds"}\\n\\n return results\\n\\nif __name__ == "__main__":\\n\\n import json\\n\\n output = asyncio.run(run_with_timeout())\\n\\n print(json.dumps(output, ensure_ascii=False, indent=2))\\n\\n\\n{ "Fast task": {"status": "ok", "result": "Fast task Completed"}, "Slow task": {"status": "timeout", "message": "Not completed after 2 seconds"}}\\n\\n\\n\\n\\n
Documenting Async Script Usage in SKILL.md
\\n\\nConcurrent API Calls
\\n\\nWhen users provide multiple texts that need simultaneous analysis, run the asynchronous script to improve speed:
\\n\\npython scripts/async_requests.py\\n\\n\\nThis script will concurrently issue all requests, with total time approximately equal to the slowest single request, rather than the sum of all request times.
YouTip