Skills Async and Concurrent Processing


When a Skill has to wait on several time-consuming operations at once (such as network requests or file I/O), asynchronous programming can significantly reduce the total waiting time.


This article introduces the application scenarios and usage of Python asyncio in Skill scripts.


Synchronous vs Asynchronous: When to Use Async

| Scenario | Recommended Approach | Reason |
| --- | --- | --- |
| Single file processing, single API call | Synchronous | Async benefits are minimal, code is simpler |
| Issuing multiple HTTP requests simultaneously | Asynchronous | Concurrent waiting, total time ≈ longest single request time |
| Reading multiple files simultaneously | Async or thread pool | Task switching during I/O waiting |
| CPU-intensive computation (e.g., image processing) | Multiprocessing | asyncio does not bypass the GIL |

Asynchronous does not mean parallel. Python's asyncio is a single-threaded event loop, suitable for I/O-intensive tasks (waiting for network, disk). CPU-intensive tasks should use ProcessPoolExecutor.
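
As a rough illustration of the last point, the sketch below hands CPU-bound work to a ProcessPoolExecutor from inside an event loop via loop.run_in_executor. The count_primes function and the input sizes are made up for this example and merely stand in for something like image processing.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit: int) -> int:
    """CPU-bound work (hypothetical): count primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


async def count_primes_parallel(limits: list[int]) -> list[int]:
    """Run each computation in a separate worker process and await all results."""
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        futures = [loop.run_in_executor(pool, count_primes, limit) for limit in limits]
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    # Each limit is processed in its own process, so the GIL is not a bottleneck
    print(asyncio.run(count_primes_parallel([50_000, 60_000, 70_000])))
```

Worker processes may import the main module, which is why the entry point stays under the `if __name__ == "__main__":` guard.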


Asynchronously Sending Multiple HTTP Requests


Suppose a Skill needs to call 3 API endpoints simultaneously. The synchronous approach would take the sum of all three requests, while the asynchronous approach takes approximately the time of the slowest single request.


Example

# File path: scripts/async_requests.py
# Asynchronously call multiple APIs concurrently and aggregate results

import asyncio
import json
import time

import aiohttp  # pip install aiohttp --break-system-packages

API_KEY = "your_api_key"
BASE_URL = "https://api.example.com/v1"


async def fetch_one(session: aiohttp.ClientSession,
                    endpoint: str, payload: dict) -> dict:
    """Asynchronously make a single POST request"""
    url = BASE_URL + endpoint
    headers = {"Authorization": f"Bearer {API_KEY}"}
    try:
        async with session.post(url, json=payload, headers=headers,
                                timeout=aiohttp.ClientTimeout(total=30)) as resp:
            resp.raise_for_status()
            return {"endpoint": endpoint, "status": "ok", "data": await resp.json()}
    except Exception as e:
        # Report the failure as data so the caller still gets one entry per request
        return {"endpoint": endpoint, "status": "error", "message": str(e)}


async def fetch_all(requests: list) -> list:
    """
    Concurrently send all requests and wait for all to complete

    Parameters:
        requests: list, each item is a (endpoint, payload) tuple
    """
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_one(session, endpoint, payload)
            for endpoint, payload in requests
        ]
        # gather runs all tasks concurrently; return_exceptions=True keeps one failure from affecting the others
        return await asyncio.gather(*tasks, return_exceptions=True)


def main():
    # Define the list of requests to be called concurrently
    requests = [
        ("/summarize", {"text": "tutorial is a technical learning platform", "lang": "zh"}),
        ("/keywords", {"text": "TUTORIAL provides various programming tutorials", "top_n": 5}),
        ("/sentiment", {"text": "The tutorials on this platform are of high quality"}),
    ]
    t0 = time.perf_counter()
    results = asyncio.run(fetch_all(requests))
    elapsed = time.perf_counter() - t0
    print(f"Concurrently completed {len(results)} requests, total time {elapsed:.2f} seconds")
    print(json.dumps(results, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()

Concurrently completed 3 requests, total time 1.23 seconds (sequential synchronous calls would take approximately 3.5 seconds, so concurrent execution saves approximately 65% of the time)
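
The "sum versus slowest" claim can be checked without a network. The following is a minimal sketch that replaces the HTTP calls with asyncio.sleep; fake_request and the three latencies are hypothetical stand-ins, not part of the script above.

```python
import asyncio
import time


async def fake_request(delay: float) -> float:
    """Stand-in for a network call: simply waits `delay` seconds."""
    await asyncio.sleep(delay)
    return delay


async def run_sequential(delays: list[float]) -> float:
    """Await each call in turn and return the elapsed wall-clock time."""
    t0 = time.perf_counter()
    for d in delays:
        await fake_request(d)
    return time.perf_counter() - t0


async def run_concurrent(delays: list[float]) -> float:
    """Start all calls together and return the elapsed wall-clock time."""
    t0 = time.perf_counter()
    await asyncio.gather(*(fake_request(d) for d in delays))
    return time.perf_counter() - t0


if __name__ == "__main__":
    delays = [0.1, 0.2, 0.3]  # hypothetical latencies in seconds
    print(f"sequential: {asyncio.run(run_sequential(delays)):.2f} s")
    print(f"concurrent: {asyncio.run(run_concurrent(delays)):.2f} s")
```

The sequential run should land near the sum of the delays and the concurrent run near the largest one, with exact figures depending on the machine.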


Asynchronously Reading Multiple Files


For scenarios that require reading multiple files at once, aiofiles provides an asynchronous version of file I/O.


Example

# File path: scripts/async_file_reader.py

import asyncio
import os

import aiofiles  # pip install aiofiles --break-system-packages


async def read_file(file_path: str) -> dict:
    """Asynchronously read a single file"""
    try:
        async with aiofiles.open(file_path, encoding="utf-8") as f:
            content = await f.read()
            return {
                "file": os.path.basename(file_path),
                "chars": len(content),
                # Count newline characters to get the number of lines
                "lines": content.count("\n"),
                "status": "ok"
            }
    except Exception as e:
        return {"file": file_path, "status": "error", "message": str(e)}


async def read_all_files(file_paths: list) -> list:
    """Concurrently read all files"""
    tasks = [read_file(fp) for fp in file_paths]
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    import glob
    import json

    # Read all .txt files in the upload directory
    files = glob.glob("/mnt/user-data/uploads/*.txt")
    if not files:
        print("No .txt files found")
    else:
        results = asyncio.run(read_all_files(files))
        print(json.dumps(results, ensure_ascii=False, indent=2))
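
The comparison table also lists a thread pool as an option for file reads. One possible sketch, assuming Python 3.9 or later, uses asyncio.to_thread with the standard blocking open instead of aiofiles. The read_stats and demo helpers and the temporary file names are invented for this illustration.

```python
import asyncio
import os
import tempfile


def read_stats(file_path: str) -> dict:
    """Blocking read with the standard library; meant to run in a worker thread."""
    with open(file_path, encoding="utf-8") as f:
        content = f.read()
    return {
        "file": os.path.basename(file_path),
        "chars": len(content),
        "lines": content.count("\n"),
    }


async def read_all_in_threads(file_paths: list[str]) -> list[dict]:
    """Hand each blocking read to the default thread pool and await them together."""
    return await asyncio.gather(
        *(asyncio.to_thread(read_stats, fp) for fp in file_paths)
    )


def demo() -> list[dict]:
    """Write two small temporary files, then read them back concurrently."""
    with tempfile.TemporaryDirectory() as tmp:
        paths = []
        for name, text in [("a.txt", "one\ntwo\n"), ("b.txt", "hello")]:
            path = os.path.join(tmp, name)
            with open(path, "w", encoding="utf-8") as f:
                f.write(text)
            paths.append(path)
        return asyncio.run(read_all_in_threads(paths))


if __name__ == "__main__":
    print(demo())
```

This variant needs no third-party package, which may matter when a Skill runs in an environment where installing dependencies is restricted.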

Timeout Control and Task Cancellation


In asynchronous tasks, timeout control is particularly important. asyncio.wait_for sets a maximum waiting time for a single task; when the limit expires, it cancels the task and raises asyncio.TimeoutError.


Example

# File path: scripts/async_timeout.py

import asyncio


async def slow_task(name: str, delay: float) -> str:
    """Simulate a time-consuming task"""
    await asyncio.sleep(delay)
    return f"{name} Completed"


async def run_with_timeout():
    tasks = {
        "Fast task": slow_task("Fast task", 0.5),
        "Slow task": slow_task("Slow task", 5.0),
    }
    results = {}
    for name, coro in tasks.items():
        try:
            # Each task waits at most 2 seconds
            result = await asyncio.wait_for(coro, timeout=2.0)
            # Record the outcome under the task's name
            results[name] = {"status": "ok", "result": result}
        except asyncio.TimeoutError:
            results[name] = {"status": "timeout", "message": "Not completed after 2 seconds"}
    return results


if __name__ == "__main__":
    import json

    output = asyncio.run(run_with_timeout())
    print(json.dumps(output, ensure_ascii=False, indent=2))
{
  "Fast task": {
    "status": "ok",
    "result": "Fast task Completed"
  },
  "Slow task": {
    "status": "timeout",
    "message": "Not completed after 2 seconds"
  }
}
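
The example above awaits the tasks one after another, so the second task only starts once the first has finished. A possible variant, sketched below, wraps each task in its own wait_for and starts them all through asyncio.gather so the timeouts run side by side. The with_timeout and run_all helpers are hypothetical names introduced for this sketch.

```python
import asyncio


async def slow_task(name: str, delay: float) -> str:
    """Simulate a time-consuming task."""
    await asyncio.sleep(delay)
    return f"{name} Completed"


async def with_timeout(name: str, delay: float, timeout: float) -> dict:
    """Run one task under its own timeout and report the outcome as a dict."""
    try:
        result = await asyncio.wait_for(slow_task(name, delay), timeout=timeout)
        return {"status": "ok", "result": result}
    except asyncio.TimeoutError:
        # wait_for has already cancelled the inner task at this point
        return {"status": "timeout", "message": f"Not completed after {timeout} seconds"}


async def run_all(specs: dict[str, float], timeout: float) -> dict:
    """Start every task at once; `specs` maps a task name to its simulated delay."""
    names = list(specs)
    outcomes = await asyncio.gather(
        *(with_timeout(name, specs[name], timeout) for name in names)
    )
    return dict(zip(names, outcomes))


if __name__ == "__main__":
    import json

    output = asyncio.run(run_all({"Fast task": 0.5, "Slow task": 5.0}, timeout=2.0))
    print(json.dumps(output, ensure_ascii=False, indent=2))
```

With these inputs the whole run is bounded by the 2-second timeout rather than by the fast task's duration plus the timeout.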

Documenting Async Script Usage in SKILL.md


Concurrent API Calls


When users provide multiple texts that need simultaneous analysis, run the asynchronous script to improve speed:

python scripts/async_requests.py

This script will concurrently issue all requests, with total time approximately equal to the slowest single request, rather than the sum of all request times.
