Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """Demo script to verify rate limiting works correctly.""" | |
| import asyncio | |
| import time | |
| from src.tools.pubmed import PubMedTool | |
| from src.tools.rate_limiter import RateLimiter, get_pubmed_limiter, reset_pubmed_limiter | |
async def test_basic_limiter():
    """Print the demo banner, then time six sequential limiter acquisitions.

    A fresh ``RateLimiter("3/second")`` is created and ``acquire()`` is
    awaited six times in a row. After each acquisition the time elapsed
    since the start of the run is printed, followed by the overall total.
    """
    rule = "=" * 60
    print(rule)
    print("Rate Limiting Demo")
    print(rule)

    # Test 1: a standalone limiter, acquired strictly one call at a time.
    print("\n[Test 1] Testing 3/second limiter...")
    bucket = RateLimiter("3/second")
    began = time.monotonic()

    for request_no in range(1, 7):
        await bucket.acquire()
        since_start = time.monotonic() - began
        print(f" Request {request_no} at {since_start:.2f}s")

    # Measured once more after the loop so the total covers the whole run.
    overall = time.monotonic() - began
    print(f" Total time for 6 requests: {overall:.2f}s (expected ~2s)")
async def test_pubmed_limiter():
    """Show the PubMed limiter's rate and whether tools share one limiter.

    The shared limiter is reset first, then fetched with ``api_key=None``
    and its ``rate`` attribute is printed. Two ``PubMedTool`` instances are
    then built and their ``_limiter`` attributes are compared by identity.
    """
    print("\n[Test 2] Testing PubMed limiter (shared)...")

    # Start from a clean state so earlier calls cannot influence the result.
    reset_pubmed_limiter()

    # No API key is supplied here (the original note gives this as 3/sec;
    # that figure comes from the limiter module, not from this script).
    keyless_limiter = get_pubmed_limiter(api_key=None)
    print(f" Rate without key: {keyless_limiter.rate}")

    # Two independently constructed tools are expected to share one limiter.
    first_tool = PubMedTool()
    second_tool = PubMedTool()

    # Identity (not equality) check: both must reference the same object.
    is_shared = first_tool._limiter is second_tool._limiter
    print(f" Tools share limiter: {is_shared}")
async def test_concurrent_requests():
    """Fire ten concurrent acquisitions at one limiter and print their timing.

    Ten coroutines are gathered against a single ``RateLimiter("5/second")``.
    Each returns the monotonic clock reading taken right after its
    ``acquire()`` completes. The readings are printed as sorted offsets
    from the start, followed by the largest offset.
    """
    print("\n[Test 3] Testing concurrent request limiting...")
    shared_limiter = RateLimiter("5/second")

    async def stamp_after_acquire(index: int):
        """Wait for the limiter, then return the current monotonic time."""
        await shared_limiter.acquire()
        return time.monotonic()

    began = time.monotonic()

    # Launch 10 concurrent requests; gather returns results in launch order.
    stamps = await asyncio.gather(*(stamp_after_acquire(n) for n in range(10)))

    # Express every completion time as an offset from the start, ascending.
    offsets = sorted(stamp - began for stamp in stamps)
    formatted = [f"{offset:.2f}s" for offset in offsets]
    print(f" Request times: {formatted}")

    slowest = max(offsets)
    print(f" All 10 requests completed in {slowest:.2f}s (expected ~2s)")
async def main():
    """Run the three demo coroutines in order and print the closing banner."""
    demos = (test_basic_limiter, test_pubmed_limiter, test_concurrent_requests)
    for demo in demos:
        # Each demo is awaited to completion before the next one starts.
        await demo()

    closing_rule = "=" * 60
    print("\n" + closing_rule)
    print("Demo complete!")
# Script entry point: only run the demo when executed directly, not on import.
if __name__ == "__main__":
    demo_run = main()
    asyncio.run(demo_run)