# Skills Performance
When Skills process large files or are called frequently, performance issues can directly affect user experience.
This article introduces common techniques to improve processing speed at the script level.
* * *
## Common Sources of Performance Issues
| Source | Typical Symptoms | Optimization Direction |
| --- | --- | --- |
| Loading large files all at once | Reading a 100 MB CSV takes several seconds | Chunked reading (`chunksize`) |
| Repeatedly executing the same calculations | Recalculating statistics on every call | Cache results |
| Single-threaded serial processing | Processing 1000 files takes a long time | Parallel processing |
| Frequent disk I/O | Writing files line by line in a loop | Batch writing |
| Unnecessary dependency loading | `import` takes more than 2 seconds | Lazy import |
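
The "Frequent disk I/O" row can be illustrated with a minimal sketch. The function names `write_rows_one_by_one` and `write_rows_batched` and the sample rows are hypothetical, chosen only for illustration. The slow variant reopens the file for every row, while the batched variant opens it once and passes all rows to a single `writelines` call.

```python
import os
import tempfile


def write_rows_one_by_one(path: str, rows: list) -> None:
    """Slow pattern: reopen the file and append a single row per iteration."""
    for row in rows:
        with open(path, "a", encoding="utf-8") as f:
            f.write(row + "\n")


def write_rows_batched(path: str, rows: list) -> None:
    """Batched pattern: open the file once and write all rows in one call."""
    with open(path, "w", encoding="utf-8") as f:
        f.writelines(row + "\n" for row in rows)


def demo_batch_write() -> bool:
    """Write the same rows both ways and confirm the two files are identical."""
    rows = [f"{i},{i * 2}" for i in range(1000)]
    with tempfile.TemporaryDirectory() as tmp:
        slow_path = os.path.join(tmp, "slow.csv")
        fast_path = os.path.join(tmp, "fast.csv")
        write_rows_one_by_one(slow_path, rows)
        write_rows_batched(fast_path, rows)
        with open(slow_path, encoding="utf-8") as a, open(fast_path, encoding="utf-8") as b:
            return a.read() == b.read()
```

Both variants produce the same file. The batched version is likely to matter most when the loop would otherwise reopen or flush the file on every row.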
* * *
## Chunked Reading of Large Files
With the `chunksize` parameter, `pd.read_csv` returns an iterator of DataFrames, each holding at most `chunksize` rows, so a large file can be processed batch by batch without loading it into memory all at once.
## Example
```python
# File path: scripts/chunked_reader.py
import json
import sys

import pandas as pd


def process_large_csv(file_path: str, chunk_size: int = 10000) -> dict:
    """
    Chunked reading and processing of large CSV files

    Parameters:
        file_path: CSV file path
        chunk_size: Number of rows per chunk, default 10000
    Returns:
        Merged statistical results
    """
    total_rows = 0
    total_sum = 0.0
    chunk_count = 0

    # Read in chunks, without loading the entire file into memory at once
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        chunk_count += 1
        total_rows += len(chunk)

        # Perform statistics on each chunk, accumulate results
        if "score" in chunk.columns:
            # Sum only the "score" column (chunk.sum() would return a Series)
            total_sum += float(chunk["score"].sum())

        # Real-time progress reporting
        print(f" Processed batch {chunk_count}, accumulated {total_rows} rows", flush=True)

    avg = total_sum / total_rows if total_rows > 0 else 0

    return {
        "status": "success",
        "total_rows": total_rows,
        "chunks": chunk_count,
        "score_avg": round(avg, 2),
    }


if __name__ == "__main__":
    # Take the CSV path from the first command-line argument
    file_path = sys.argv[1] if len(sys.argv) > 1 else ""
    result = process_large_csv(file_path)
    print(json.dumps(result, ensure_ascii=False, indent=2))
```
Sample output:

```
 Processed batch 1, accumulated 10000 rows
 Processed batch 2, accumulated 20000 rows
 Processed batch 3, accumulated 28456 rows
{
  "status": "success",
  "total_rows": 28456,
  "chunks": 3,
  "score_avg": 82.37
}
```
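
The same chunking idea can be sketched without pandas, which may be useful when the dependency is not available. This variant swaps in the standard-library `csv` module with `itertools.islice`. The names `iter_row_chunks`, `average_score`, and `demo_chunked_average`, and the sample data, are hypothetical.

```python
import csv
import itertools
import os
import tempfile


def iter_row_chunks(file_path: str, chunk_size: int = 10000):
    """Yield lists of at most chunk_size row dicts from a CSV file."""
    with open(file_path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        while True:
            # islice pulls the next chunk_size rows from the reader
            chunk = list(itertools.islice(reader, chunk_size))
            if not chunk:
                break
            yield chunk


def average_score(file_path: str, chunk_size: int = 10000) -> float:
    """Average the "score" column while holding only one chunk in memory."""
    total_rows = 0
    total_sum = 0.0
    for chunk in iter_row_chunks(file_path, chunk_size):
        total_rows += len(chunk)
        total_sum += sum(float(row["score"]) for row in chunk)
    return round(total_sum / total_rows, 2) if total_rows else 0.0


def demo_chunked_average() -> float:
    """Write a small sample CSV and average it in chunks of two rows."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "scores.csv")
        with open(path, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["name", "score"])
            writer.writerows([["a", 80], ["b", 90], ["c", 70], ["d", 100], ["e", 85]])
        return average_score(path, chunk_size=2)
```

As in the pandas version, only running totals are kept between chunks, so peak memory depends on `chunk_size` rather than on the file size.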
* * *
## Result Caching
For operations where the same input produces the same result, cache the results to a local file to avoid redundant calculations.
## Example
```python
# File path: scripts/file_cache.py
import hashlib
import json
import os
import time

CACHE_DIR = "/home/claude/.skill_cache"
os.makedirs(CACHE_DIR, exist_ok=True)


def _file_fingerprint(file_path: str) -> str:
    """
    Generate a fingerprint based on file path, size, and modification time.
    Much faster than hashing the full file content with MD5, suitable for large files.
    """
    stat = os.stat(file_path)
    raw = f"{file_path}|{stat.st_size}|{stat.st_mtime}"
    return hashlib.md5(raw.encode()).hexdigest()


def get_cached(file_path: str, operation: str):
    """Get the cached result, or return None if it does not exist"""
    key = _file_fingerprint(file_path) + "_" + operation
    cache_file = os.path.join(CACHE_DIR, f"{key}.json")
    if os.path.exists(cache_file):
        with open(cache_file) as f:
            return json.load(f)
    return None


def set_cached(file_path: str, operation: str, result: dict):
    """Write to cache"""
    key = _file_fingerprint(file_path) + "_" + operation
    cache_file = os.path.join(CACHE_DIR, f"{key}.json")
    with open(cache_file, "w") as f:
        json.dump(result, f, ensure_ascii=False)


# Usage example
def get_stats(file_path: str) -> dict:
    # Check cache first (compare with None so an empty cached dict still counts as a hit)
    cached = get_cached(file_path, "stats")
    if cached is not None:
        print("Cache hit, skipping redundant calculation")
        return cached

    # Cache miss, perform calculation
    import pandas as pd  # imported here so a cache hit never pays the import cost

    t0 = time.time()
    df = pd.read_csv(file_path)
    result = {
        "rows": len(df),
        "cols": len(df.columns),
        "elapsed": round(time.time() - t0, 3),
    }

    # Write to cache for next use
    set_cached(file_path, "stats", result)
    return result
```
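
`get_stats` imports pandas inside the function body, which is the "lazy import" direction from the table at the start of the article. A cache hit returns before the import runs. The pattern can be sketched in isolation as follows, with the standard-library `statistics` module standing in for a heavy dependency. The names `timed_import` and `summarize` are hypothetical.

```python
import importlib
import time


def timed_import(module_name: str) -> float:
    """Import a module by name and return how many seconds the import took."""
    t0 = time.perf_counter()
    importlib.import_module(module_name)
    return time.perf_counter() - t0


def summarize(values: list, with_median: bool = False) -> dict:
    """Return basic statistics; import `statistics` only when the median is requested."""
    result = {"count": len(values), "total": sum(values)}
    if with_median:
        # Lazy import: `statistics` stands in for a heavy dependency such as pandas.
        import statistics

        result["median"] = statistics.median(values)
    return result
```

Calling `summarize([3, 1, 2])` never triggers the import; only `with_median=True` does. `timed_import` reports close to zero for a module that is already loaded, because Python caches imported modules in `sys.modules`. To find slow imports in a whole script, `python -X importtime script.py` prints per-module import times.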
* * *
## Parallel Processing of Multiple Files
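When multiple files need to be processed, `concurrent.futures` can run the work in parallel. A `ThreadPoolExecutor` can cut total time substantially when each task is I/O-bound, such as reading or writing files. For CPU-bound work the global interpreter lock limits what threads can gain, and `ProcessPoolExecutor` is the usual alternative.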
## Example
```python
# File path: scripts/parallel_process.py
import concurrent.futures
import json
import os
import sys


def process_single_file(file_path: str) -> dict:
    """Process a single file (will be called in parallel)"""
    try:
        size = os.path.getsize(file_path)
        # Simulate actual processing logic
        return {
            "file": os.path.basename(file_path),
            "size_kb": size // 1024,
            "status": "ok",
        }
    except Exception as e:
        return {"file": file_path, "status": "error", "message": str(e)}


def process_files_parallel(file_paths: list, max_workers: int = 4) -> list:
    """
    Process multiple files in parallel

    Parameters:
        file_paths: List of file paths
        max_workers: Maximum concurrent workers, default 4
                     (to avoid too many threads competing for resources)
    """
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_map = {
            executor.submit(process_single_file, fp): fp
            for fp in file_paths
        }

        # Collect results in order of completion
        for future in concurrent.futures.as_completed(future_map):
            result = future.result()
            results.append(result)
            print(f" Completed: {result['file']}")

    return results


if __name__ == "__main__":
    # Simulate: Process all CSV files in the upload directory
    upload_dir = "/mnt/user-data/uploads"
    csv_files = [
        os.path.join(upload_dir, f)
        for f in os.listdir(upload_dir)
        if f.endswith(".csv")
    ]

    if not csv_files:
        print("No CSV files found")
        sys.exit(0)

    print(f"Starting parallel processing of {len(csv_files)} files...")
    results = process_files_parallel(csv_files)
    print(json.dumps(results, ensure_ascii=False, indent=2))
```
Sample output (abbreviated):

```
Starting parallel processing of 3 files...
 Completed: tutorial_jan.csv
 Completed: tutorial_mar.csv
 Completed: tutorial_feb.csv
[{"file": "tutorial_jan.csv", "size_kb": 128, "status": "ok"}, ...]
```
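
To see where the speedup comes from, the following sketch times a serial loop against a thread pool. The task is a hypothetical stand-in (`fake_io_task`) that simulates I/O waiting with `time.sleep`, so actual gains on real files depend on the disk and the workload. `run_serial` and `run_threaded` are also illustrative names.

```python
import concurrent.futures
import time


def fake_io_task(task_id: int, delay: float = 0.05) -> int:
    """Stand-in for an I/O-bound step such as reading a file: sleep, then return the id."""
    time.sleep(delay)
    return task_id


def run_serial(task_ids: list) -> tuple:
    """Run tasks one after another; return (results, elapsed seconds)."""
    t0 = time.perf_counter()
    results = [fake_io_task(t) for t in task_ids]
    return results, time.perf_counter() - t0


def run_threaded(task_ids: list, max_workers: int = 4) -> tuple:
    """Run tasks on a thread pool; executor.map returns results in input order."""
    t0 = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(fake_io_task, task_ids))
    return results, time.perf_counter() - t0


if __name__ == "__main__":
    ids = list(range(8))
    _, serial_time = run_serial(ids)
    _, threaded_time = run_threaded(ids)
    print(f"serial: {serial_time:.2f}s, threaded: {threaded_time:.2f}s")
```

This sketch uses `executor.map`, which keeps results in input order. The script above uses `as_completed` instead, which yields results as soon as each task finishes, so its output order can differ from the input order.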
* * *
## Performance Optimization Checklist
| Check Item | Before Optimization | After Optimization |
| --- | --- | --- |
| Reading large files | `pd.read_csv(file)` | `pd.read_csv(file, chunksize=10000)` |
| Repeatedly calculating statistics on the same file | Recalculate every time | Cache results using a file fingerprint |
| Processing multiple files | Serial `for` loop | Parallel with `ThreadPoolExecutor` |
| Writing file line by line | for row: f.write(row) | Collect in