API performance directly impacts user experience, system scalability, and operational costs. In modern distributed systems, APIs serve as the primary interface between services, making their optimization critical for overall system health. Performance optimization involves reducing latency, increasing throughput, and improving resource utilization while maintaining functionality and reliability.
Key performance metrics include latency (response time per request), throughput (requests served per second), error rate, and resource utilization (CPU, memory, and connection usage).
Implement proper HTTP caching headers to reduce unnecessary server requests. Use ETags and Last-Modified headers for conditional requests.
GET /api/users/123
HTTP/1.1 200 OK
ETag: "33a64df551425fcc55e4d42a148795d9f25f89d4"
Cache-Control: max-age=3600
Last-Modified: Wed, 21 Oct 2015 07:28:00 GMT
// Client implementation with conditional requests.
// `cache` is assumed to be a Map-like store shared by callers — TODO confirm.
async function fetchUser(userId, lastETag = null) {
  const headers = {};
  if (lastETag) {
    headers['If-None-Match'] = lastETag;
  }
  const response = await fetch(`/api/users/${userId}`, { headers });
  if (response.status === 304) {
    // Not modified: reuse the payload previously stored for this user.
    // (The original returned an undefined `cachedData` global here.)
    const cached = cache.get(userId);
    return cached ? cached.data : null;
  }
  const newETag = response.headers.get('ETag');
  const data = await response.json();
  // Store with the fresh ETag so the next call can revalidate cheaply.
  cache.set(userId, { data, eTag: newETag });
  return data;
}
Implement in-memory caching for frequently accessed data using Redis or Memcached.
from redis import Redis
import json
import hashlib
class APICache:
def __init__(self, redis_client: Redis, default_ttl=300):
self.redis = redis_client
self.default_ttl = default_ttl
def get_cache_key(self, endpoint: str, params: dict) -> str:
param_str = json.dumps(params, sort_keys=True)
hash_key = hashlib.md5(param_str.encode()).hexdigest()
return f"api:{endpoint}:{hash_key}"
async def get_cached_response(self, endpoint: str, params: dict):
cache_key = self.get_cache_key(endpoint, params)
cached = await self.redis.get(cache_key)
if cached:
return json.loads(cached)
return None
async def set_cached_response(self, endpoint: str, params: dict, data: any, ttl: int = None):
cache_key = self.get_cache_key(endpoint, params)
ttl = ttl or self.default_ttl
await self.redis.setex(
cache_key,
ttl,
json.dumps(data, default=str)
)
Optimize database performance with query caching and proper indexing.
-- Example of optimized query with proper indexing
-- Partial index: only rows with active = true are indexed, keeping the
-- index small while still covering the filter in the query below.
CREATE INDEX idx_users_active_created ON users(created_at) WHERE active = true;

-- Cached query result
-- Both the WHERE clause (active + created_at range) and the ORDER BY
-- can be satisfied by the partial index above.
SELECT user_id, username, email
FROM users
WHERE active = true
AND created_at >= '2024-01-01'
ORDER BY created_at DESC
LIMIT 100;
Implement efficient database connection management to reduce connection overhead.
// HikariCP configuration for PostgreSQL
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://localhost:5432/api_db");
config.setUsername("api_user");
// NOTE(review): hardcoded credential — in production load from a secret store.
config.setPassword("password");
config.setMaximumPoolSize(20);     // hard cap on open connections
config.setMinimumIdle(5);          // keep warm connections ready
config.setConnectionTimeout(30000); // ms to wait for a free connection
config.setIdleTimeout(600000);      // ms before an idle connection is retired
config.setMaxLifetime(1800000);     // ms before any connection is recycled

// Connection usage pattern: try-with-resources returns the connection
// to the pool and closes the statement/result set automatically.
try (Connection connection = dataSource.getConnection();
     PreparedStatement stmt = connection.prepareStatement(
         "SELECT * FROM orders WHERE user_id = ? AND status = ?")) {
    stmt.setInt(1, userId);
    stmt.setString(2, "completed");
    try (ResultSet rs = stmt.executeQuery()) {
        // Process results
    }
}
Use efficient query patterns and avoid N+1 query problems.
# Bad: N+1 queries
async def get_user_orders_naive(user_ids: List[int]):
    """Anti-pattern: issues one query per user (N+1 round trips).

    Kept intentionally 'bad' for contrast with the optimized version.
    NOTE(review): `List` and `db` are assumed to be provided by the
    surrounding module — not visible in this snippet.
    """
    orders = []
    for user_id in user_ids:
        # One network round trip per user — latency grows linearly.
        user_orders = await db.fetch(
            "SELECT * FROM orders WHERE user_id = $1",
            user_id
        )
        orders.extend(user_orders)
    return orders
# Good: Single query with IN clause
async def get_user_orders_optimized(user_ids: list[int]):
    """Fetch orders for many users in a single query (avoids N+1).

    Builds positional placeholders ($1, $2, ...) only for the table name
    list — the values themselves are passed as bound parameters, so this
    is not vulnerable to SQL injection.
    """
    if not user_ids:
        # `IN ()` is invalid SQL — short-circuit on empty input.
        return []
    placeholders = ','.join(f'${i + 1}' for i in range(len(user_ids)))
    query = f"""
        SELECT o.*, u.username
        FROM orders o
        JOIN users u ON o.user_id = u.id
        WHERE o.user_id IN ({placeholders})
        ORDER BY o.created_at DESC
    """
    return await db.fetch(query, *user_ids)
Implement strategic indexing based on query patterns.
-- Composite index for common query patterns
-- Column order matters: equality columns (user_id, status) first,
-- then the range/sort column (created_at DESC).
CREATE INDEX idx_orders_user_status_date
ON orders(user_id, status, created_at DESC);

-- Partial index for frequently filtered data
-- Only in-flight orders are indexed, keeping the index compact.
CREATE INDEX idx_orders_active_recent
ON orders(created_at)
WHERE status IN ('pending', 'processing');

-- Covering index to avoid table scans
-- All selected columns live in the index, enabling index-only scans.
CREATE INDEX idx_users_profile_cover
ON users(id, username, email, created_at);
Implement gzip or brotli compression for large responses.
# Nginx configuration for compression
gzip on;
gzip_vary on;           # emit "Vary: Accept-Encoding" so caches store both variants
gzip_min_length 1024;   # skip tiny responses where gzip overhead outweighs savings
gzip_proxied any;       # also compress responses to proxied requests
gzip_comp_level 6;      # balanced CPU cost vs. compression ratio
# MIME types to compress (text/html is always compressed by default).
gzip_types
    application/atom+xml
    application/json
    application/ld+json
    application/manifest+json
    application/rss+xml
    application/vnd.geo+json
    application/xml
    text/css
    text/plain
    text/xml;
Implement efficient pagination and field selection.
// GraphQL-style field selection implementation
class ResponseOptimizer {
  /**
   * Return a copy of `data` containing only the requested `fields`.
   * Arrays are mapped element-wise; non-object values pass through unchanged.
   */
  static selectFields(data: any, fields: string[]): any {
    if (Array.isArray(data)) {
      return data.map(item => this.selectFields(item, fields));
    }
    if (typeof data === 'object' && data !== null) {
      const result: any = {};
      for (const field of fields) {
        // hasOwnProperty instead of `in`: the `in` operator also matches
        // inherited prototype members (e.g. 'constructor', 'toString'),
        // which would leak unintended keys into the response.
        if (Object.prototype.hasOwnProperty.call(data, field)) {
          result[field] = data[field];
        }
      }
      return result;
    }
    return data;
  }
}
// Usage example
const fullUserData = {
  id: 1,
  username: 'john_doe',
  email: 'john@example.com',
  profile: { /* large profile object */ },
  preferences: { /* large preferences object */ }
};

// Client requests only needed fields; the heavy profile/preferences
// objects are stripped before serialization, shrinking the payload.
const optimizedResponse = ResponseOptimizer.selectFields(
  fullUserData,
  ['id', 'username', 'email']
);
Consider using binary protocols like Protocol Buffers for high-throughput APIs.
syntax = "proto3";

// A single user record; created_at is a Unix timestamp (presumably
// seconds — confirm against the producer).
message User {
  int32 id = 1;
  string username = 2;
  string email = 3;
  int64 created_at = 4;
  UserProfile profile = 5;
}

// Nested profile data, kept separate so it can be omitted when unused.
message UserProfile {
  string first_name = 1;
  string last_name = 2;
  string avatar_url = 3;
}

// Batch request: ids to fetch plus an optional field mask.
message GetUsersRequest {
  repeated int32 user_ids = 1;
  repeated string fields = 2;
}

message GetUsersResponse {
  repeated User users = 1;
}
# Protocol Buffers implementation
import user_pb2
def serialize_users(users_data):
    """Encode a list of user dicts into GetUsersResponse wire bytes."""
    batch = user_pb2.GetUsersResponse()
    for record in users_data:
        # add() appends a new User message owned by the response.
        entry = batch.users.add()
        entry.id = record['id']
        entry.username = record['username']
        # ... set other fields
    return batch.SerializeToString()
Implement non-blocking I/O operations using async/await patterns.
from aiohttp import ClientSession
import asyncio
from databases import Database
class ConcurrentAPI:
    """API helpers that overlap independent awaits to reduce latency."""

    def __init__(self, db: "Database"):
        self.db = db

    async def fetch_user_data(self, user_id: int):
        """Fetch a user row and their orders concurrently.

        The two queries are independent, so asyncio.gather lets them run
        on the event loop at the same time instead of back-to-back.
        """
        user_query = "SELECT * FROM users WHERE id = :user_id"
        orders_query = "SELECT * FROM orders WHERE user_id = :user_id"
        user, orders = await asyncio.gather(
            self.db.fetch_one(user_query, {"user_id": user_id}),
            self.db.fetch_all(orders_query, {"user_id": user_id}),
        )
        return {"user": user, "orders": orders}

    async def batch_process_users(self, user_ids: list[int]):
        """Fetch many users in parallel, capped at 10 in flight at once.

        With return_exceptions=True a failing user yields its exception
        in-place instead of cancelling the whole batch.
        The original annotated `user_ids: List[int]` without importing
        List, which raises NameError at definition time; fixed to the
        builtin generic.
        """
        semaphore = asyncio.Semaphore(10)  # bound concurrent DB load

        async def process_with_limit(user_id):
            async with semaphore:
                return await self.fetch_user_data(user_id)

        tasks = [process_with_limit(uid) for uid in user_ids]
        return await asyncio.gather(*tasks, return_exceptions=True)
Use HTTP/2 for connection multiplexing and header compression.
# HTTP/2 configuration
server {
    # NOTE(review): nginx >= 1.25 deprecates the `http2` listen flag in
    # favor of a separate `http2 on;` directive — confirm target version.
    listen 443 ssl http2;
    server_name api.example.com;
    ssl_certificate /path/to/cert.pem;
    ssl_certificate_key /path/to/private.key;
    # HTTP/2 specific optimizations
    http2_max_concurrent_streams 100;
    # NOTE(review): http2_max_field_size / http2_max_header_size were
    # deprecated in nginx 1.19.7 (superseded by large_client_header_buffers).
    http2_max_field_size 16k;
    http2_max_header_size 32k;
    location /api/ {
        proxy_pass http://backend_api;
        # Upstream connection stays HTTP/1.1; clearing the Connection
        # header enables keep-alive to the backend.
        proxy_http_version 1.1;
        proxy_set_header Connection "";
    }
}
Implement comprehensive metrics collection using Prometheus or similar tools.
import time

from flask import Flask, Response, jsonify, request
from prometheus_client import Counter, Histogram, generate_latest
app = Flask(__name__)

# Define metrics
# Counter: monotonically increasing total, labeled for per-route/status breakdown.
REQUEST_COUNT = Counter(
    'api_requests_total',
    'Total API requests',
    ['method', 'endpoint', 'status_code']
)
# Histogram: latency distribution — enables p50/p95/p99 queries in Prometheus.
REQUEST_DURATION = Histogram(
    'api_request_duration_seconds',
    'API request duration in seconds',
    ['method', 'endpoint']
)
@app.before_request
def before_request():
    # Stamp the wall-clock start time on the request proxy so that
    # after_request can compute total handling duration.
    # NOTE(review): requires `request` imported from flask — the snippet's
    # import line only brings in Flask and Response; verify.
    request.start_time = time.time()
@app.after_request
def after_request(response):
    """Record per-request Prometheus latency and count metrics."""
    # getattr guard: before_request may not have run (e.g. a request that
    # failed very early), in which case the original raised AttributeError;
    # report zero duration instead of breaking the response path.
    start = getattr(request, 'start_time', None)
    duration = time.time() - start if start is not None else 0.0
    REQUEST_DURATION.labels(
        method=request.method,
        endpoint=request.endpoint
    ).observe(duration)
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.endpoint,
        status_code=response.status_code
    ).inc()
    return response
@app.route('/metrics')
def metrics():
    """Prometheus scrape endpoint: dump all registered metrics as text."""
    payload = generate_latest()
    return Response(payload, mimetype='text/plain')
Implement distributed tracing for microservices architectures.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger import JaegerSpanExporter
# Tracing setup
# Install the global tracer provider before creating any tracers.
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
# Export spans to a local Jaeger agent (UDP, default compact-thrift port).
jaeger_exporter = JaegerSpanExporter(
    agent_host_name="localhost",
    agent_port=6831,
)
# Batch spans in memory and ship them asynchronously to avoid per-span overhead.
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)
# Usage in API endpoints
@app.route('/api/users/<user_id>')
def get_user(user_id):
    """Fetch a user plus external profile data, with nested trace spans."""
    with tracer.start_as_current_span("get_user") as span:
        # NOTE(review): user_id arrives as a str from the URL rule.
        span.set_attribute("user.id", user_id)
        # Database query with tracing
        with tracer.start_as_current_span("db.query"):
            user = db.get_user(user_id)
        # External API call with tracing
        with tracer.start_as_current_span("external.api.call"):
            profile_data = external_api.get_profile(user_id)
        # Merged dict: profile keys win on collision — presumably intended; verify.
        return jsonify({**user, **profile_data})
Implement automated performance testing using tools like k6 or Artillery.
// k6 load testing script
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate, Trend } from 'k6/metrics';
// Custom metrics aggregated across all virtual users.
const errorRate = new Rate('errors');
const requestDuration = new Trend('request_duration');

export const options = {
  stages: [
    { duration: '2m', target: 100 }, // Ramp up to 100 users
    { duration: '5m', target: 100 }, // Stay at 100 users
    { duration: '2m', target: 200 }, // Ramp up to 200 users
    { duration: '5m', target: 200 }, // Stay at 200 users
    { duration: '2m', target: 0 }, // Ramp down to 0 users
  ],
  // Thresholds fail the run (non-zero exit) when breached — CI-friendly.
  thresholds: {
    errors: ['rate<0.01'], // Error rate < 1%
    request_duration: ['p(95)<500'], // 95% of requests < 500ms
  },
};
// Per-VU iteration: fetch one random user and record custom metrics.
export default function () {
  const requestParams = {
    headers: {
      'Authorization': 'Bearer ' + __ENV.API_TOKEN,
      'Content-Type': 'application/json',
    },
  };

  // Test user retrieval endpoint with a random id in [0, 1000).
  const randomUserId = Math.floor(Math.random() * 1000);
  const userResponse = http.get(
    `https://api.example.com/users/${randomUserId}`,
    requestParams
  );

  const passed = check(userResponse, {
    'user response status is 200': (r) => r.status === 200,
    'user response time < 500ms': (r) => r.timings.duration < 500,
  });

  errorRate.add(!passed);
  requestDuration.add(userResponse.timings.duration);

  sleep(1); // think time between iterations
}
Implement CDN caching for static assets and API responses.
# CDN-friendly caching configuration
location /api/static/ {
    # Cache static API responses in CDN
    add_header Cache-Control "public, max-age=3600";
    add_header CDN-Cache-Control "public, max-age=7200";
    expires 1h;
    # Serve stale content on origin failure
    proxy_cache_use_stale error timeout updating;
    proxy_cache_background_update on;
}
location /api/dynamic/ {
    # Shorter cache for dynamic content
    add_header Cache-Control "public, max-age=60";
    add_header CDN-Cache-Control "public, max-age=120";
    # Cache variations by headers
    # NOTE(review): keying on $http_authorization stores per-user responses
    # while Cache-Control says "public" — a shared cache could serve one
    # user's data under another's key semantics. Consider "private" (or
    # Vary: Authorization) for authenticated responses; confirm intent.
    proxy_cache_key "$scheme$request_method$host$request_uri$http_authorization";
}
Configure auto-scaling based on performance metrics.
# Kubernetes Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: api-service
  minReplicas: 2
  maxReplicas: 20
  metrics:
    # Scale on CPU: keep average utilization near 70% across pods.
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    # Scale on a custom per-pod metric — requires a custom-metrics
    # adapter (e.g. prometheus-adapter) to be installed; confirm.
    - type: Pods
      pods:
        metric:
          name: api_requests_per_second
        target:
          type: AverageValue
          averageValue: "100"
  behavior:
    scaleUp:
      # Aggressive: may double the pod count every 15s after a 60s window.
      stabilizationWindowSeconds: 60
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
    scaleDown:
      # Conservative: at most 10% fewer pods per minute, 5-minute window,
      # to avoid flapping when load dips briefly.
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 10
          periodSeconds: 60
Implement read replicas for read-heavy workloads.
class ReadReplicaRouter:
    """Route read queries to replicas, writes to primary.

    Django-style database router. The original used `random` without it
    being imported anywhere visible; a function-local import fixes that
    without touching the file's import sections.
    """

    def db_for_read(self, model, **hints):
        """Pick a database for reads; `force_primary` hint bypasses replicas."""
        if hints.get('force_primary', False):
            # Needed for read-your-own-writes consistency right after a write.
            return 'primary'
        # Random choice spreads read load evenly across the replica pool.
        import random  # local import: not imported at module level in this snippet
        return random.choice(['replica1', 'replica2', 'replica3'])

    def db_for_write(self, model, **hints):
        """All writes go to the primary."""
        return 'primary'

    def allow_relation(self, obj1, obj2, **hints):
        # Replicas mirror the primary, so cross-database relations are safe.
        return True

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        # Run migrations only on the primary; replication propagates schema.
        return db == 'primary'
Implement sophisticated rate limiting to prevent abuse.
package main
import (
"net/http"
"sync"
"time"
)
// RateLimiter implements a sliding-window limiter keyed by client id.
type RateLimiter struct {
	requests map[string][]time.Time // per-client timestamps of recent requests
	mutex    sync.RWMutex           // guards concurrent access to requests
	limit    int                    // max requests allowed per window
	window   time.Duration          // length of the sliding window
}
// NewRateLimiter constructs a limiter that allows `limit` requests per
// client within each `window`.
func NewRateLimiter(limit int, window time.Duration) *RateLimiter {
	limiter := &RateLimiter{
		requests: make(map[string][]time.Time),
		limit:    limit,
		window:   window,
	}
	return limiter
}
func (rl *RateLimiter