
Skills Performance

When Skills process large files or are called frequently, performance issues can directly affect user experience. This article introduces common techniques to improve processing speed at the script level.

* * *

## Common Sources of Performance Issues

| Source | Typical Symptoms | Optimization Direction |
| --- | --- | --- |
| Loading large files all at once | Reading 100MB CSV takes several seconds | Chunked reading (chunk) |
| Repeatedly executing the same calculations | Recalculating statistics on every call | Cache results |
| Single-threaded serial processing | Processing 1000 files takes a long time | Parallel processing |
| Frequent disk I/O | Writing files line by line in a loop | Batch writing |
| Unnecessary dependency loading | import takes more than 2 seconds | Lazy import |

* * *

## Chunked Reading of Large Files

The `chunksize` parameter of `pd.read_csv` in pandas splits a large file into batches that are processed one at a time, so the whole file never has to sit in memory at once.

## Example

```python
# File path: scripts/chunked_reader.py
import json
import sys

import pandas as pd


def process_large_csv(file_path: str, chunk_size: int = 10000) -> dict:
    """
    Chunked reading and processing of large CSV files

    Parameters:
        file_path: CSV file path
        chunk_size: Number of rows per chunk, default 10000

    Returns:
        Merged statistical results
    """
    total_rows = 0
    total_sum = 0.0
    chunk_count = 0

    # Read in chunks, without loading the entire file into memory at once
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        chunk_count += 1
        total_rows += len(chunk)

        # Perform statistics on each chunk, accumulate results
        if "score" in chunk.columns:
            # Sum only the "score" column; chunk.sum() would return a Series
            total_sum += float(chunk["score"].sum())

        # Real-time progress reporting
        print(f"  Processed batch {chunk_count}, accumulated {total_rows} rows", flush=True)

    avg = total_sum / total_rows if total_rows > 0 else 0
    return {
        "status": "success",
        "total_rows": total_rows,
        "chunks": chunk_count,
        "score_avg": round(avg, 2),
    }


if __name__ == "__main__":
    file_path = sys.argv[1] if len(sys.argv) > 1 else ""
    result = process_large_csv(file_path)
    print(json.dumps(result, ensure_ascii=False, indent=2))
```

```
  Processed batch 1, accumulated 10000 rows
  Processed batch 2, accumulated 20000 rows
  Processed batch 3, accumulated 28456 rows
{
  "status": "success",
  "total_rows": 28456,
  "chunks": 3,
  "score_avg": 82.37
}
```

* * *

## Result Caching

For operations where the same input produces the same result, cache the results to a local file to avoid redundant calculations.

## Example

```python
# File path: scripts/file_cache.py
import hashlib
import json
import os
import time

CACHE_DIR = "/home/claude/.skill_cache"
os.makedirs(CACHE_DIR, exist_ok=True)


def _file_fingerprint(file_path: str) -> str:
    """
    Generate a fingerprint based on file path, size, and modification time
    Much faster than computing an MD5 of the file content, suitable for large files
    """
    stat = os.stat(file_path)
    raw = f"{file_path}|{stat.st_size}|{stat.st_mtime}"
    return hashlib.md5(raw.encode()).hexdigest()


def get_cached(file_path: str, operation: str):
    """Get cache, return None if not exists"""
    key = _file_fingerprint(file_path) + "_" + operation
    cache_file = os.path.join(CACHE_DIR, f"{key}.json")
    if os.path.exists(cache_file):
        with open(cache_file) as f:
            return json.load(f)
    return None


def set_cached(file_path: str, operation: str, result: dict):
    """Write to cache"""
    key = _file_fingerprint(file_path) + "_" + operation
    cache_file = os.path.join(CACHE_DIR, f"{key}.json")
    with open(cache_file, "w") as f:
        json.dump(result, f, ensure_ascii=False)


# Usage example
def get_stats(file_path: str) -> dict:
    # Check cache first
    cached = get_cached(file_path, "stats")
    if cached is not None:
        print("Cache hit, skipping redundant calculation")
        return cached

    # Cache miss, perform calculation
    import pandas as pd

    t0 = time.time()
    df = pd.read_csv(file_path)
    result = {
        "rows": len(df),
        "cols": len(df.columns),
        "elapsed": round(time.time() - t0, 3),
    }

    # Write to cache for next use
    set_cached(file_path, "stats", result)
    return result
```

* * *

## Parallel Processing of Multiple Files

When multiple files need to be processed, use `concurrent.futures` to run the work in parallel, which can cut total time several-fold when the work is I/O-bound. The example uses a thread pool, which suits file and network I/O; CPU-bound pure-Python work is limited by the GIL and is usually better served by `ProcessPoolExecutor`.

## Example

```python
# File path: scripts/parallel_process.py
import concurrent.futures
import json
import os
import sys


def process_single_file(file_path: str) -> dict:
    """Process a single file (will be called in parallel)"""
    try:
        size = os.path.getsize(file_path)
        # Simulate actual processing logic
        return {"file": os.path.basename(file_path), "size_kb": size // 1024, "status": "ok"}
    except Exception as e:
        return {"file": file_path, "status": "error", "message": str(e)}


def process_files_parallel(file_paths: list, max_workers: int = 4) -> list:
    """
    Process multiple files in parallel

    Parameters:
        file_paths: List of file paths
        max_workers: Maximum concurrent workers, default 4
                     (to avoid too many threads competing for resources)
    """
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_map = {
            executor.submit(process_single_file, fp): fp
            for fp in file_paths
        }
        # Collect results in order of completion
        for future in concurrent.futures.as_completed(future_map):
            result = future.result()
            results.append(result)
            print(f"  Completed: {result['file']}")
    return results


if __name__ == "__main__":
    # Simulate: Process all CSV files in the upload directory
    upload_dir = "/mnt/user-data/uploads"
    csv_files = [
        os.path.join(upload_dir, f)
        for f in os.listdir(upload_dir)
        if f.endswith(".csv")
    ]
    if not csv_files:
        print("No CSV files found")
        sys.exit(0)

    print(f"Starting parallel processing of {len(csv_files)} files...")
    results = process_files_parallel(csv_files)
    print(json.dumps(results, ensure_ascii=False, indent=2))
```

```
Starting parallel processing of 3 files...
  Completed: tutorial_jan.csv
  Completed: tutorial_mar.csv
  Completed: tutorial_feb.csv
[{"file": "tutorial_jan.csv", "size_kb": 128, "status": "ok"}, ...]
```
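
The table of common sources also lists unnecessary dependency loading, with lazy import as the direction; the `get_stats` example already does this by importing pandas inside the function. A minimal sketch of the same pattern follows. The function name `summarize` and the `detailed` flag are hypothetical, and the standard-library `statistics` module only stands in for a genuinely heavy dependency:

```python
def summarize(values: list, detailed: bool = False) -> dict:
    """Return basic counts; load the extra module only when it is needed."""
    result = {"count": len(values), "total": sum(values)}

    if detailed:
        # Deferred import: calls that never request details never pay
        # the import cost. `statistics` is a stand-in for a heavy
        # dependency such as pandas (an assumption for illustration).
        import statistics

        result["median"] = statistics.median(values)

    return result


if __name__ == "__main__":
    print(summarize([3, 1, 2]))
    print(summarize([3, 1, 2], detailed=True))
```

Python caches imported modules in `sys.modules`, so the deferred import costs its full time only on the first call that reaches it.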
* * *

## Performance Optimization Checklist

| Check Item | Before Optimization | After Optimization |
| --- | --- | --- |
| Reading large files | pd.read_csv(file) | pd.read_csv(file, chunksize=10000) |
| Repeatedly calculating statistics on same file | Recalculate every time | Cache results using file fingerprint |
| Processing multiple files | for loop serial | ThreadPoolExecutor parallel |
| Writing file line by line | for row: f.write(row) | Collect in
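
The checklist contrasts writing a file line by line with batching the writes. One way this could look is sketched below, assuming each row is already a string; `write_rows_batched` and `batch_size` are hypothetical names, not part of the original scripts:

```python
def write_rows_batched(path: str, rows, batch_size: int = 1000) -> int:
    """Write rows to `path`, one write call per batch instead of one per row.

    Hypothetical helper for illustration. Returns the number of rows written.
    """
    written = 0
    buffer = []
    with open(path, "w", encoding="utf-8") as f:
        for row in rows:
            buffer.append(row)
            if len(buffer) >= batch_size:
                # Join the whole batch and hand it to the file in one call
                f.write("\n".join(buffer) + "\n")
                written += len(buffer)
                buffer.clear()
        if buffer:
            # Write whatever is left over after the last full batch
            f.write("\n".join(buffer) + "\n")
            written += len(buffer)
    return written
```

Python file objects are already buffered, so the gain is likely to be largest when the per-row version reopens or flushes the file on every iteration.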