Rate limiting is a critical component of API design that controls how frequently clients can make requests to your API within a specific time window. It protects backend services from abuse, ensures fair resource allocation, and maintains system stability.
Modern rate limiting implementations typically use one of these algorithms:
The fixed-window algorithm divides time into fixed intervals (e.g., 60 seconds) and counts requests within each window. It is simple to implement, but can allow bursts of up to twice the limit at window boundaries.
from datetime import datetime, timedelta
class FixedWindowRateLimiter:
    """Per-client request counter over a fixed-size time window.

    Once a client's window is older than ``window_seconds`` the counter
    restarts; within a live window at most ``max_requests`` calls succeed.
    """

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window = timedelta(seconds=window_seconds)
        self.counters = {}

    def check_limit(self, client_id):
        """Record the request; return True if allowed, False if limited."""
        now = datetime.now()
        entry = self.counters.get(client_id)
        # A missing or expired entry starts a fresh window with this request.
        if entry is None or entry['window_start'] < now - self.window:
            self.counters[client_id] = {'count': 1, 'window_start': now}
            return True
        if entry['count'] >= self.max_requests:
            return False
        entry['count'] += 1
        return True
More precise than the fixed window, the sliding-window algorithm tracks individual request timestamps within a rolling time window, so bursts at interval boundaries are counted correctly.
import time
from collections import deque
class SlidingWindowRateLimiter:
    """Tracks per-client request timestamps over a rolling time window."""

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = {}

    def check_limit(self, client_id):
        """Record the request; return True if allowed, False if limited."""
        now = time.time()
        history = self.requests.setdefault(client_id, deque())
        # Drop timestamps that have aged out of the rolling window.
        cutoff = now - self.window_seconds
        while history and history[0] < cutoff:
            history.popleft()
        if len(history) >= self.max_requests:
            return False
        history.append(now)
        return True
The token-bucket algorithm allows for burstable traffic while maintaining a long-term average rate: tokens accumulate at a fixed fill rate up to a capacity, and each request consumes tokens.
import time
class TokenBucket:
    """Token-bucket limiter: capacity bounds bursts, fill_rate the average."""

    def __init__(self, capacity, fill_rate):
        self.capacity = float(capacity)
        self.tokens = float(capacity)      # bucket starts full
        self.fill_rate = float(fill_rate)  # tokens added per second
        self.last_time = time.time()

    def consume(self, tokens=1):
        """Take `tokens` from the bucket; True on success, False if short."""
        now = time.time()
        # Refill proportionally to the elapsed time since the previous call,
        # capped at the bucket's capacity.
        refill = (now - self.last_time) * self.fill_rate
        self.last_time = now
        self.tokens = min(self.capacity, self.tokens + refill)
        if self.tokens < tokens:
            return False
        self.tokens -= tokens
        return True
For microservices architectures, you need distributed rate limiting. Redis is commonly used for this purpose.
import redis
import time
# Shared Redis client used by the distributed limiter below.
# NOTE(review): assumes Redis on localhost:6379, db 0 -- confirm settings
# for any non-local deployment.
r = redis.Redis(host='localhost', port=6379, db=0)
def check_rate_limit(client_id, max_requests, window_seconds):
    """Distributed fixed-window rate limit backed by Redis.

    Returns True when the request is within ``max_requests`` per
    ``window_seconds`` for ``client_id``, False otherwise.
    """
    key = f"rate_limit:{client_id}"
    # INCR is atomic, so concurrent requests cannot race past the limit the
    # way the previous separate GET-then-INCR sequence could.
    count = r.incr(key)
    if count == 1:
        # Start the window only when the key is first created.  Calling
        # EXPIRE on every request (as before) pushed the TTL forward
        # indefinitely under steady traffic, so the counter never reset.
        r.expire(key, window_seconds)
    return count <= max_requests
Return HTTP 429 Too Many Requests (defined in RFC 6585) when limits are exceeded, and communicate limits to clients via the widely used de facto X-RateLimit-* headers:
from flask import Flask, jsonify, make_response, request

app = Flask(__name__)


@app.route('/api')
def api_endpoint():
    """Endpoint enforcing a 100 requests / 60 s limit per X-Client-ID.

    Always attaches X-RateLimit-* headers; on rejection returns 429 with a
    Retry-After hint (RFC 6585 recommends pairing 429 with Retry-After).
    """
    # NOTE(review): the original snippet omitted `request` from the flask
    # import, which raised NameError on the first call.
    client_id = request.headers.get('X-Client-ID')
    if not check_rate_limit(client_id, 100, 60):
        response = make_response(jsonify({"error": "Rate limit exceeded"}), 429)
        response.headers['X-RateLimit-Limit'] = '100'
        response.headers['X-RateLimit-Remaining'] = '0'
        response.headers['X-RateLimit-Reset'] = str(int(time.time()) + 60)
        response.headers['Retry-After'] = '60'
        return response
    remaining = calculate_remaining_requests(client_id)
    response = jsonify({"data": "Your API response"})
    response.headers['X-RateLimit-Limit'] = '100'
    response.headers['X-RateLimit-Remaining'] = str(remaining)
    response.headers['X-RateLimit-Reset'] = str(int(time.time()) + 60)
    return response
When choosing limit values, consider your clients' expected traffic patterns, the cost of serving each request, and how limits on different endpoints interact.
When clients hit limits, suggest appropriate retry times:
def calculate_retry_after(client_id):
    """Seconds the client should wait before retrying (at least 1)."""
    remaining = get_reset_time(client_id) - time.time()
    return max(1, int(remaining))
Track metrics such as per-client request volume, the rate of 429 responses, and how close clients run to their limits -- these reveal whether thresholds need tuning.
Implement both short-term (per-second) and long-term (per-hour) limits:
class MultiWindowRateLimiter:
    """Combines several sliding-window limiters (e.g. per-second + per-hour).

    A request passes only when every configured window allows it.
    NOTE(review): evaluation short-circuits, so limiters after the first
    denial are neither consulted nor charged for the request -- confirm this
    is the intended accounting policy.
    """

    def __init__(self, limits):
        # limits = [(max_requests, window_seconds), ...]
        self.limiters = [
            SlidingWindowRateLimiter(max_requests, window_seconds)
            for max_requests, window_seconds in limits
        ]

    def check_limit(self, client_id):
        """True only if every underlying limiter admits the request."""
        for limiter in self.limiters:
            if not limiter.check_limit(client_id):
                return False
        return True
For performance, cache rate limit decisions when possible:
import time
from functools import lru_cache


@lru_cache(maxsize=10000)
def _check_rate_limit_for_bucket(client_id, bucket):
    # `bucket` exists only to expire cache entries over time; the decision
    # itself depends on client_id alone.
    return check_rate_limit(client_id, 100, 60)


def check_rate_limit_cached(client_id):
    """Rate-limit check memoised for at most ~1 second per client.

    The previous version cached on client_id alone, so the first allow/deny
    decision for a client was returned forever and the limiter stopped
    limiting.  Keying on a one-second time bucket keeps the cache useful
    while letting decisions refresh.  Note that requests served from cache
    do not increment the backing counters during that second.
    """
    return _check_rate_limit_for_bucket(client_id, int(time.time()))
Adjust limits based on system health:
def adaptive_rate_limit(client_id):
    """Return the request limit for ``client_id``, shrinking under load.

    Thresholds must be tested from most to least severe: the original code
    checked ``> 0.8`` before ``> 0.9``, making the 0.9 branch unreachable.
    """
    system_load = get_system_load()
    base_limit = 100
    if system_load > 0.9:
        return base_limit * 0.2  # heavy load: keep only 20% of capacity
    if system_load > 0.8:
        return base_limit * 0.5  # elevated load: halve the limit
    return base_limit
Store client-specific limits in a database:
def get_client_limit(client_id):
    """Per-client limit configuration, falling back to a global default."""
    fallback = {'limit': 100, 'window': 60}
    config = db.get_client_config(client_id)  # database or cache lookup
    return config or fallback
Consider implementing rate limiting at multiple levels:
# AWS SAM template
# Gateway-level throttling: a daily quota plus steady-state and burst
# request-rate limits enforced before traffic reaches the backend.
Resources:
  MyApi:
    Type: AWS::Serverless::Api
    Properties:
      StageName: Prod
      UsagePlan:
        CreateUsagePlan: PER_API
        UsagePlanName: MyUsagePlan
        Quota:
          Limit: 1000        # requests allowed per period
          Period: DAY
        Throttle:
          BurstLimit: 100    # short spikes tolerated above the steady rate
          RateLimit: 50      # steady-state requests per second
# openapi.yaml
# Google Cloud Endpoints quota configuration: defines a metric counted
# per request, and a per-project, per-minute cap on that metric.
x-google-management:
  metrics:
    - name: "read-requests"
      displayName: "Read requests"
      valueType: INT64
      metricKind: DELTA
  quota:
    limits:
      - name: "read-limit"
        metric: "read-requests"
        unit: "1/min/{project}"   # one quota bucket per minute per project
        values:
          STANDARD: 1000          # requests allowed per bucket
<!-- policy.xml -->
<!-- Azure API Management inbound policy: allow 100 calls per 60-second
     renewal period; the gateway rejects the excess before it reaches
     the backend. -->
<policies>
    <inbound>
        <rate-limit calls="100" renewal-period="60" />
        <base />
    </inbound>
    <outbound>
        <base />
    </outbound>
</policies>
Verify your implementation with unit tests:
import unittest
from time import sleep
class TestRateLimiter(unittest.TestCase):
    """Exercises SlidingWindowRateLimiter allow/deny/reset behaviour."""

    # A short window keeps the reset test fast.  The original used a
    # 1-second window with sleep(1), which both slowed the suite and was
    # flaky at the boundary: timestamps must be strictly older than the
    # window to be evicted, so sleeping exactly one window is not enough.
    WINDOW = 0.2

    def setUp(self):
        self.limiter = SlidingWindowRateLimiter(5, self.WINDOW)

    def test_under_limit(self):
        for _ in range(5):
            self.assertTrue(self.limiter.check_limit("client1"))

    def test_over_limit(self):
        for _ in range(5):
            self.limiter.check_limit("client2")
        self.assertFalse(self.limiter.check_limit("client2"))

    def test_window_reset(self):
        for _ in range(5):
            self.limiter.check_limit("client3")
        sleep(self.WINDOW * 1.5)  # comfortably past the window edge
        self.assertTrue(self.limiter.check_limit("client3"))
# Using a bloom filter for high-cardinality detection
from pybloom_live import ScalableBloomFilter
class BloomRateLimiter:
    """One-shot request admission using a scalable Bloom filter.

    Each request_id is admitted at most once.  NOTE(review): Bloom filters
    can return false positives, so a small fraction of never-seen IDs will
    be rejected -- acceptable only where spurious denials are tolerable.
    """

    def __init__(self, capacity, error_rate):
        self.filter = ScalableBloomFilter(capacity, error_rate)

    def check_request(self, request_id):
        """Admit request_id if unseen; deny (and do not re-add) repeats."""
        seen = request_id in self.filter
        if not seen:
            self.filter.add(request_id)
        return not seen
Always remember that rate limiting is as much about user experience as it is about system protection. Clear communication of limits and graceful handling of exceeded limits will result in better developer adoption of your API.