API Performance Optimization Techniques

9 min read
Kevin
API Security Specialist

Introduction to API Performance

API performance directly impacts user experience, system scalability, and operational costs. In modern distributed systems, APIs serve as the primary interface between services, making their optimization critical for overall system health. Performance optimization involves reducing latency, increasing throughput, and improving resource utilization while maintaining functionality and reliability.

Key performance metrics include:

  • Response time (latency)
  • Requests per second (throughput)
  • Error rates
  • Resource consumption (CPU, memory, network)
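
As a rough illustration of how the first three metrics can be derived from raw request samples, here is a minimal Python sketch. The `MetricsSummary` type, the `(latency_ms, http_status)` sample shape, and the choice to count only 5xx responses as errors are assumptions made for this example.

```python
from dataclasses import dataclass
import math
from typing import List, Tuple


@dataclass
class MetricsSummary:
    p50_ms: float
    p95_ms: float
    throughput_rps: float
    error_rate: float


def percentile(sorted_values: List[float], pct: int) -> float:
    """Nearest-rank percentile of an already sorted, non-empty list."""
    rank = max(1, math.ceil(pct * len(sorted_values) / 100))
    return sorted_values[rank - 1]


def summarize(samples: List[Tuple[float, int]], window_seconds: float) -> MetricsSummary:
    """samples: (latency_ms, http_status) pairs observed during the window."""
    latencies = sorted(latency for latency, _ in samples)
    errors = sum(1 for _, status in samples if status >= 500)
    return MetricsSummary(
        p50_ms=percentile(latencies, 50),
        p95_ms=percentile(latencies, 95),
        throughput_rps=len(samples) / window_seconds,
        error_rate=errors / len(samples),
    )


# 99 fast successful requests plus one slow server error, observed over 10 seconds
samples = [(float(ms), 200) for ms in range(1, 100)] + [(900.0, 503)]
summary = summarize(samples, window_seconds=10.0)
```

Percentiles are usually more informative than averages here: a single 900 ms outlier barely moves the median but is exactly what tail-latency targets are meant to catch.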

Caching Strategies

Client-Side Caching

Implement proper HTTP caching headers to reduce unnecessary server requests. Use ETags and Last-Modified headers for conditional requests.

GET /api/users/123 HTTP/1.1

HTTP/1.1 200 OK
ETag: "33a64df551425fcc55e4d42a148795d9f25f89d4"
Cache-Control: max-age=3600
Last-Modified: Wed, 21 Oct 2015 07:28:00 GMT

// Client implementation with conditional requests
// The cache maps a user ID to the last response body and its ETag
const cache = new Map();

async function fetchUser(userId) {
    const cached = cache.get(userId);
    const headers = {};
    if (cached && cached.eTag) {
        headers['If-None-Match'] = cached.eTag;
    }
    
    const response = await fetch(`/api/users/${userId}`, { headers });
    
    if (response.status === 304 && cached) {
        // Data hasn't changed, use cached version
        return cached.data;
    }
    if (!response.ok) {
        throw new Error(`Request failed with status ${response.status}`);
    }
    
    const newETag = response.headers.get('ETag');
    const data = await response.json();
    
    // Store in cache with new ETag
    cache.set(userId, { data, eTag: newETag });
    return data;
}
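
The server has to do its half of the conditional request as well. The following Python sketch shows one way that decision could look, assuming the ETag is a SHA-256 hash of the response body; `make_etag` and `conditional_response` are hypothetical helpers, not part of any framework.

```python
import hashlib
from typing import Dict, Optional, Tuple


def make_etag(body: bytes) -> str:
    """ETag derived from the response body (quoted, as HTTP requires)."""
    return '"' + hashlib.sha256(body).hexdigest() + '"'


def _strip_weak(tag: str) -> str:
    # If-None-Match uses weak comparison, so a W/ prefix is ignored
    tag = tag.strip()
    return tag[2:] if tag.startswith("W/") else tag


def conditional_response(
    body: bytes, if_none_match: Optional[str]
) -> Tuple[int, Dict[str, str], bytes]:
    """Return (status, headers, body) for a GET, honoring If-None-Match."""
    etag = make_etag(body)
    headers = {"ETag": etag, "Cache-Control": "max-age=3600"}
    if if_none_match is not None:
        candidates = [_strip_weak(tag) for tag in if_none_match.split(",")]
        if "*" in candidates or etag in candidates:
            return 304, headers, b""  # client copy is current: send no body
    return 200, headers, body


# First request has no validator; the second replays the ETag it received
status_first, headers_first, _ = conditional_response(b'{"id": 123}', None)
status_second, _, body_second = conditional_response(b'{"id": 123}', headers_first["ETag"])
```

Hashing the body still costs the server the work of building the response; the saving in this sketch is bandwidth. A version column or update timestamp can serve as a cheaper validator when one is available.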

Server-Side Caching

Implement in-memory caching for frequently accessed data using Redis or Memcached.

from typing import Any, Optional
import hashlib
import json

from redis.asyncio import Redis  # asyncio client, required for the awaits below

class APICache:
    def __init__(self, redis_client: Redis, default_ttl: int = 300):
        self.redis = redis_client
        self.default_ttl = default_ttl
    
    def get_cache_key(self, endpoint: str, params: dict) -> str:
        # Sort keys so equivalent parameter sets map to the same cache key
        param_str = json.dumps(params, sort_keys=True)
        # MD5 only shortens the key here; it is not used for security
        hash_key = hashlib.md5(param_str.encode()).hexdigest()
        return f"api:{endpoint}:{hash_key}"
    
    async def get_cached_response(self, endpoint: str, params: dict) -> Optional[Any]:
        cache_key = self.get_cache_key(endpoint, params)
        cached = await self.redis.get(cache_key)
        if cached is not None:
            return json.loads(cached)
        return None
    
    async def set_cached_response(self, endpoint: str, params: dict, data: Any, ttl: Optional[int] = None):
        cache_key = self.get_cache_key(endpoint, params)
        ttl = self.default_ttl if ttl is None else ttl
        await self.redis.setex(
            cache_key, 
            ttl, 
            json.dumps(data, default=str)
        )
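
The usual way to put such a cache to work is the cache-aside pattern: check the cache, fall back to the source on a miss, then store the result. The sketch below shows that flow with an in-process dictionary standing in for Redis so it runs without a server; `TTLCache` and `get_or_load` are hypothetical names for this example.

```python
import time
from typing import Any, Callable, Dict, Tuple


class TTLCache:
    """In-process stand-in for Redis/Memcached: key -> (expires_at, value)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._store: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Any:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._store[key]  # expired: drop it and report a miss
            return None
        return value

    def set(self, key: str, value: Any, ttl: float) -> None:
        self._store[key] = (self._clock() + ttl, value)


def get_or_load(cache: TTLCache, key: str, loader: Callable[[], Any], ttl: float = 300) -> Any:
    """Cache-aside: return the cached value, or load, store, and return it."""
    value = cache.get(key)
    if value is None:
        value = loader()
        cache.set(key, value, ttl)
    return value
```

Note that this sketch treats a stored `None` as a miss, so loaders that can legitimately return nothing would need a sentinel value.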

Database Query Caching

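Cache the results of expensive, frequently repeated queries, and back them with indexes that match their filters and sort order.

-- Example of optimized query with proper indexing
CREATE INDEX idx_users_active_created ON users(created_at) WHERE active = true;

-- Query whose result is a good caching candidate; the partial index above serves its filter and sort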
SELECT user_id, username, email 
FROM users 
WHERE active = true 
AND created_at >= '2024-01-01'
ORDER BY created_at DESC 
LIMIT 100;

Database Optimization

Connection Pooling

Implement efficient database connection management to reduce connection overhead.

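// HikariCP configuration for PostgreSQL
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://localhost:5432/api_db");
config.setUsername("api_user");
config.setPassword("password");       // placeholder; load real credentials from configuration
config.setMaximumPoolSize(20);
config.setMinimumIdle(5);
config.setConnectionTimeout(30000);   // ms to wait for a free connection
config.setIdleTimeout(600000);        // ms before an idle connection is retired
config.setMaxLifetime(1800000);       // ms maximum lifetime of any connection

// Create the pool once and share it across the application
HikariDataSource dataSource = new HikariDataSource(config);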

// Connection usage pattern
try (Connection connection = dataSource.getConnection();
     PreparedStatement stmt = connection.prepareStatement(
         "SELECT * FROM orders WHERE user_id = ? AND status = ?")) {
    
    stmt.setInt(1, userId);
    stmt.setString(2, "completed");
    
    try (ResultSet rs = stmt.executeQuery()) {
        // Process results
    }
}
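
To make the mechanism behind a pool concrete, here is a minimal Python sketch of the same idea: open a fixed number of connections up front, lend them out, and always hand them back. It uses in-memory SQLite connections purely so the example is self-contained; `ConnectionPool` is a hypothetical class, and a production service would rely on its driver's or framework's pool instead.

```python
import queue
import sqlite3
from contextlib import contextmanager
from typing import Iterator


class ConnectionPool:
    """Fixed-size pool: connections are opened once and reused."""

    def __init__(self, size: int, timeout: float = 30.0):
        self._timeout = timeout
        self._idle: "queue.Queue[sqlite3.Connection]" = queue.Queue(maxsize=size)
        for _ in range(size):
            # check_same_thread=False lets a pooled connection move between threads
            self._idle.put(sqlite3.connect(":memory:", check_same_thread=False))

    @contextmanager
    def connection(self) -> Iterator[sqlite3.Connection]:
        # Blocks until a connection is free; raises queue.Empty after the timeout
        conn = self._idle.get(timeout=self._timeout)
        try:
            yield conn
        finally:
            self._idle.put(conn)  # always hand the connection back


pool = ConnectionPool(size=2)
with pool.connection() as conn:
    value = conn.execute("SELECT 1").fetchone()[0]
```

The timeout plays the same role as `setConnectionTimeout` above: when every connection is busy, callers wait a bounded time instead of opening more connections than the database can handle.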

Query Optimization

Use efficient query patterns and avoid N+1 query problems.

from typing import List

# Bad: N+1 queries (one round trip per user)
async def get_user_orders_naive(user_ids: List[int]):
    orders = []
    for user_id in user_ids:
        user_orders = await db.fetch(
            "SELECT * FROM orders WHERE user_id = $1", 
            user_id
        )
        orders.extend(user_orders)
    return orders

# Good: Single query with IN clause
async def get_user_orders_optimized(user_ids: List[int]):
    if not user_ids:
        return []  # "IN ()" is not valid SQL, so short-circuit on empty input
    placeholders = ','.join(f'${i + 1}' for i in range(len(user_ids)))
    query = f"""
        SELECT o.*, u.username 
        FROM orders o
        JOIN users u ON o.user_id = u.id
        WHERE o.user_id IN ({placeholders})
        ORDER BY o.created_at DESC
    """
    return await db.fetch(query, *user_ids)
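
Callers of a batched query often still want results per user. A small sketch of that grouping step follows, using an in-memory SQLite database as a stand-in for the real one; the three-column `orders` table and the `orders_by_user` helper are assumptions made for this example.

```python
import sqlite3
from typing import Dict, List, Tuple


def orders_by_user(
    conn: sqlite3.Connection, user_ids: List[int]
) -> Dict[int, List[Tuple[int, str]]]:
    """One round trip for all users, then group rows in memory."""
    grouped: Dict[int, List[Tuple[int, str]]] = {uid: [] for uid in user_ids}
    if not user_ids:
        return grouped
    placeholders = ",".join("?" for _ in user_ids)  # sqlite3 uses "?" placeholders
    rows = conn.execute(
        f"SELECT user_id, id, status FROM orders "
        f"WHERE user_id IN ({placeholders}) ORDER BY id",
        user_ids,
    )
    for user_id, order_id, status in rows:
        grouped[user_id].append((order_id, status))
    return grouped


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER, status TEXT)")
conn.executemany(
    "INSERT INTO orders (user_id, status) VALUES (?, ?)",
    [(1, "completed"), (2, "pending"), (1, "pending")],
)
result = orders_by_user(conn, [1, 2, 3])
```

Users with no orders still get an empty list, which spares callers from special-casing missing keys.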

Indexing Strategy

Implement strategic indexing based on query patterns.

-- Composite index for common query patterns
CREATE INDEX idx_orders_user_status_date 
ON orders(user_id, status, created_at DESC);

-- Partial index for frequently filtered data
CREATE INDEX idx_orders_active_recent 
ON orders(created_at) 
WHERE status IN ('pending', 'processing');

-- Covering index: queries that read only these columns can be answered from the index alone
CREATE INDEX idx_users_profile_cover 
ON users(id, username, email, created_at);
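
An index only helps if the planner actually uses it, so it is worth checking the query plan. The sketch below does this with SQLite's `EXPLAIN QUERY PLAN` because it runs without a server; the table layout and the `query_plan` helper are assumptions for illustration. On PostgreSQL the equivalent check would be `EXPLAIN` or `EXPLAIN ANALYZE`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders ("
    "id INTEGER PRIMARY KEY, user_id INTEGER, status TEXT, created_at TEXT, total REAL)"
)
conn.execute(
    "CREATE INDEX idx_orders_user_status_date "
    "ON orders(user_id, status, created_at DESC)"
)


def query_plan(conn: sqlite3.Connection, sql: str, params: tuple = ()) -> str:
    """Return SQLite's plan description for a query, one step per line."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    return "\n".join(row[-1] for row in rows)  # last column holds the readable detail


# Filter and sort match the composite index's column order
indexed_plan = query_plan(
    conn,
    "SELECT * FROM orders WHERE user_id = ? AND status = ? ORDER BY created_at DESC",
    (42, "completed"),
)
# No index covers "total", so this one falls back to scanning the table
unindexed_plan = query_plan(conn, "SELECT * FROM orders WHERE total > ?", (100,))
```

The first plan names `idx_orders_user_status_date`, while the second reports a scan of `orders`.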

Payload Optimization

Response Compression

Implement gzip or brotli compression for large responses.

# Nginx configuration for compression
gzip on;
gzip_vary on;
gzip_min_length 1024;
gzip_proxied any;
gzip_comp_level 6;
gzip_types
    application/atom+xml
    application/json
    application/ld+json
    application/manifest+json
    application/rss+xml
    application/vnd.geo+json
    application/xml
    text/css
    text/plain
    text/xml;
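
When compression is handled in the application rather than at the proxy, the same rules apply: compress only when the client accepts it and the payload is large enough to benefit. Here is a minimal Python sketch under those assumptions; `maybe_compress` is a hypothetical helper, and it ignores `q=` weights in `Accept-Encoding` for brevity.

```python
import gzip
import json
from typing import Dict, Tuple

MIN_LENGTH = 1024  # mirrors gzip_min_length in the Nginx configuration


def maybe_compress(
    body: bytes, accept_encoding: str, level: int = 6
) -> Tuple[bytes, Dict[str, str]]:
    """Gzip the body when the client accepts it and the payload is large enough."""
    headers = {"Vary": "Accept-Encoding"}  # caches must key on the request encoding
    accepted = [enc.split(";")[0].strip() for enc in accept_encoding.split(",")]
    if "gzip" not in accepted or len(body) < MIN_LENGTH:
        return body, headers
    headers["Content-Encoding"] = "gzip"
    return gzip.compress(body, compresslevel=level), headers


# A repetitive JSON list compresses well; a tiny payload is sent as-is
large = json.dumps([{"id": i, "status": "completed"} for i in range(500)]).encode()
compressed, large_headers = maybe_compress(large, "gzip, deflate, br")
small, small_headers = maybe_compress(b'{"ok": true}', "gzip")
```

Small bodies are skipped because the gzip framing overhead and CPU cost can outweigh the few bytes saved.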

Pagination and Partial Responses

Implement efficient pagination and field selection.

// GraphQL-style field selection implementation
class ResponseOptimizer {
    static selectFields(data: any, fields: string[]): any {
        if (Array.isArray(data)) {
            return data.map(item => this.selectFields(item, fields));
        }
        
        if (typeof data === 'object' && data !== null) {
            const result: any = {};
            for (const field of fields) {
                if (field in data) {
                    result[field] = data[field];
                }
            }
            return result;
        }
        
        return data;
    }
}

// Usage example
const fullUserData = {
    id: 1,
    username: 'john_doe',
    email: 'john@example.com',
    profile: { /* large profile object */ },
    preferences: { /* large preferences object */ }
};

// Client requests only needed fields
const optimizedResponse = ResponseOptimizer.selectFields(
    fullUserData, 
    ['id', 'username', 'email']
);
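
The listing above covers field selection; pagination can be sketched in the same spirit. The Python example below shows cursor-based (keyset) pagination over an in-memory list, assuming items are sorted by an ascending numeric `id`. The cursor format and the `paginate` helper are hypothetical.

```python
import base64
import json
from typing import Any, Dict, List, Optional


def encode_cursor(last_id: int) -> str:
    """Opaque cursor: clients pass it back without interpreting it."""
    return base64.urlsafe_b64encode(json.dumps({"last_id": last_id}).encode()).decode()


def decode_cursor(cursor: str) -> int:
    return json.loads(base64.urlsafe_b64decode(cursor.encode()))["last_id"]


def paginate(
    items: List[Dict[str, Any]], limit: int, cursor: Optional[str] = None
) -> Dict[str, Any]:
    """items must be sorted by ascending 'id'; returns one page plus the next cursor."""
    last_id = decode_cursor(cursor) if cursor else None
    # Equivalent SQL: WHERE id > :last_id ORDER BY id LIMIT :limit + 1
    window = [item for item in items if last_id is None or item["id"] > last_id]
    window = window[: limit + 1]  # one extra row reveals whether another page exists
    page, has_more = window[:limit], len(window) > limit
    next_cursor = encode_cursor(page[-1]["id"]) if has_more else None
    return {"data": page, "next_cursor": next_cursor}


users = [{"id": i, "username": f"user{i}"} for i in range(1, 6)]
first_page = paginate(users, limit=2)
second_page = paginate(users, limit=2, cursor=first_page["next_cursor"])
```

Unlike `OFFSET`-based paging, the keyset condition lets the database seek directly to the next row through an index, so deep pages cost about the same as the first one.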

Binary Protocols

Consider using binary protocols like Protocol Buffers for high-throughput APIs.

syntax = "proto3";

message User {
    int32 id = 1;
    string username = 2;
    string email = 3;
    int64 created_at = 4;
    UserProfile profile = 5;
}

message UserProfile {
    string first_name = 1;
    string last_name = 2;
    string avatar_url = 3;
}

message GetUsersRequest {
    repeated int32 user_ids = 1;
    repeated string fields = 2;
}

message GetUsersResponse {
    repeated User users = 1;
}

# Protocol Buffers implementation (Python)
import user_pb2

def serialize_users(users_data):
    response = user_pb2.GetUsersResponse()
    for user_data in users_data:
        user_proto = response.users.add()
        user_proto.id = user_data['id']
        user_proto.username = user_data['username']
        # ... set other fields
    return response.SerializeToString()
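
To get a feel for why binary encodings are smaller, the sketch below packs a user record with Python's `struct` module and compares it with the JSON form. This is not Protocol Buffers and not its wire format: the fixed layout is an assumption chosen only to illustrate the size difference without generated code.

```python
import json
import struct

# Layout: int32 id, int64 created_at, uint16 name length, then UTF-8 username bytes
HEADER = struct.Struct("<iqH")


def encode_user(user: dict) -> bytes:
    name = user["username"].encode("utf-8")
    return HEADER.pack(user["id"], user["created_at"], len(name)) + name


def decode_user(payload: bytes) -> dict:
    user_id, created_at, name_len = HEADER.unpack_from(payload, 0)
    name = payload[HEADER.size : HEADER.size + name_len].decode("utf-8")
    return {"id": user_id, "username": name, "created_at": created_at}


user = {"id": 1, "username": "john_doe", "created_at": 1704067200}
binary = encode_user(user)
text = json.dumps(user).encode("utf-8")
```

The binary form omits field names and encodes numbers in fixed-width form, which is where most of the savings come from. Protocol Buffers adds field tags and variable-length integers on top, so that schemas can evolve without breaking older clients.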

Concurrency and Parallelism

Async Programming

Implement non-blocking I/O operations using async/await patterns.

import asyncio
from typing import List

from databases import Database

class ConcurrentAPI:
    def __init__(self, db: Database):
        self.db = db
    
    async def fetch_user_data(self, user_id: int):
        # Execute multiple independent queries concurrently
        user_query = "SELECT * FROM users WHERE id = :user_id"
        orders_query = "SELECT * FROM orders WHERE user_id = :user_id"
        
        user, orders = await asyncio.gather(
            self.db.fetch_one(user_query, {"user_id": user_id}),
            self.db.fetch_all(orders_query, {"user_id": user_id})
        )
        
        return {"user": user, "orders": orders}
    
    async def batch_process_users(self, user_ids: List[int]):
        # Process multiple users concurrently; the semaphore caps in-flight work
        semaphore = asyncio.Semaphore(10)  # At most 10 users processed at once
        
        async def process_with_limit(user_id):
            async with semaphore:
                return await self.fetch_user_data(user_id)
        
        tasks = [process_with_limit(uid) for uid in user_ids]
        return await asyncio.gather(*tasks, return_exceptions=True)
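
With `return_exceptions=True`, failures come back mixed in with successful results, so callers need to separate them. A minimal sketch of that step follows; `fetch_user` is a hypothetical stand-in for `fetch_user_data` that fails on negative IDs so the example has something to catch.

```python
import asyncio
from typing import Dict, List, Tuple


async def fetch_user(user_id: int) -> Dict[str, int]:
    """Hypothetical stand-in for fetch_user_data; fails for negative IDs."""
    await asyncio.sleep(0)  # yield to the event loop like real I/O would
    if user_id < 0:
        raise ValueError(f"invalid user id {user_id}")
    return {"id": user_id}


async def fetch_many(
    user_ids: List[int], limit: int = 10
) -> Tuple[List[dict], Dict[int, Exception]]:
    semaphore = asyncio.Semaphore(limit)

    async def guarded(user_id: int):
        async with semaphore:
            return await fetch_user(user_id)

    results = await asyncio.gather(
        *(guarded(uid) for uid in user_ids), return_exceptions=True
    )
    users: List[dict] = []
    failures: Dict[int, Exception] = {}
    # gather preserves input order, so results line up with user_ids
    for user_id, result in zip(user_ids, results):
        if isinstance(result, Exception):
            failures[user_id] = result
        else:
            users.append(result)
    return users, failures


users, failures = asyncio.run(fetch_many([1, -2, 3]))
```

Keeping failures keyed by user ID makes it straightforward to retry only the requests that failed.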

Connection Multiplexing

Use HTTP/2 for connection multiplexing and header compression.

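# HTTP/2 configuration
server {
    listen 443 ssl http2;  # nginx 1.25.1+ prefers "listen 443 ssl;" plus a separate "http2 on;"
    server_name api.example.com;
    
    ssl_certificate /path/to/cert.pem;
    ssl_certificate_key /path/to/private.key;
    
    # HTTP/2 specific optimizations
    http2_max_concurrent_streams 100;
    # http2_max_field_size and http2_max_header_size are obsolete since nginx 1.19.7;
    # large_client_header_buffers now bounds request header size
    large_client_header_buffers 4 32k;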
    
    location /api/ {
        proxy_pass http://backend_api;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
    }
}

Monitoring and Analytics

Performance Metrics Collection

Implement comprehensive metrics collection using Prometheus or similar tools.

from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest
import time
from flask import Flask, Response, g, request

app = Flask(__name__)

# Define metrics
REQUEST_COUNT = Counter(
    'api_requests_total',
    'Total API requests',
    ['method', 'endpoint', 'status_code']
)

REQUEST_DURATION = Histogram(
    'api_request_duration_seconds',
    'API request duration in seconds',
    ['method', 'endpoint']
)

@app.before_request
def before_request():
    # perf_counter is monotonic, so durations are unaffected by clock adjustments
    g.start_time = time.perf_counter()

@app.after_request
def after_request(response):
    duration = time.perf_counter() - g.start_time
    # request.endpoint is None for unmatched routes (404s); use a fixed label instead
    endpoint = request.endpoint or 'unknown'
    REQUEST_DURATION.labels(
        method=request.method,
        endpoint=endpoint
    ).observe(duration)
    
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=endpoint,
        status_code=response.status_code
    ).inc()
    
    return response

@app.route('/metrics')
def metrics():
    # CONTENT_TYPE_LATEST is the exposition format content type Prometheus expects
    return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST)

Distributed Tracing

Implement distributed tracing for microservices architectures.

from flask import jsonify
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# Jaeger Thrift exporter from the opentelemetry-exporter-jaeger-thrift package;
# recent OpenTelemetry releases deprecate it in favor of OTLP exporters
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

# Tracing setup
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

# Usage in API endpoints (app, db, and external_api are assumed to be defined elsewhere)
@app.route('/api/users/<user_id>')
def get_user(user_id):
    with tracer.start_as_current_span("get_user") as span:
        span.set_attribute("user.id", user_id)
        
        # Database query with tracing
        with tracer.start_as_current_span("db.query"):
            user = db.get_user(user_id)
        
        # External API call with tracing
        with tracer.start_as_current_span("external.api.call"):
            profile_data = external_api.get_profile(user_id)
        
        return jsonify({**user, **profile_data})
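
Spans from different services end up in the same trace because the trace context travels with each outgoing request, typically in a W3C `traceparent` header. Instrumentation libraries normally handle this automatically. The sketch below only illustrates the header format (`version-traceid-spanid-flags`); `SpanContext` and the helper functions are hypothetical names, not OpenTelemetry APIs.

```python
import re
import secrets
from typing import NamedTuple, Optional

# version "00", 32 hex chars of trace ID, 16 hex chars of span ID, 2 hex chars of flags
TRACEPARENT_RE = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


class SpanContext(NamedTuple):
    trace_id: str
    span_id: str
    sampled: bool


def new_trace() -> SpanContext:
    return SpanContext(secrets.token_hex(16), secrets.token_hex(8), True)


def child_of(parent: SpanContext) -> SpanContext:
    """Same trace, fresh span ID."""
    return SpanContext(parent.trace_id, secrets.token_hex(8), parent.sampled)


def to_header(ctx: SpanContext) -> str:
    return f"00-{ctx.trace_id}-{ctx.span_id}-{'01' if ctx.sampled else '00'}"


def from_header(value: str) -> Optional[SpanContext]:
    match = TRACEPARENT_RE.match(value.strip())
    if match is None:
        return None
    trace_id, span_id, flags = match.groups()
    if trace_id == "0" * 32 or span_id == "0" * 16:
        return None  # all-zero IDs are invalid
    return SpanContext(trace_id, span_id, bool(int(flags, 16) & 0x01))


# Service A starts a trace and sends the header; service B continues the same trace
outgoing = to_header(new_trace())
incoming = from_header(outgoing)
downstream_span = child_of(incoming)
```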

Load Testing and Benchmarking

Automated Performance Testing

Implement automated performance testing using tools like k6 or Artillery.

// k6 load testing script
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate, Trend } from 'k6/metrics';

const errorRate = new Rate('errors');
const requestDuration = new Trend('request_duration');

export const options = {
    stages: [
        { duration: '2m', target: 100 },  // Ramp up to 100 users
        { duration: '5m', target: 100 },  // Stay at 100 users
        { duration: '2m', target: 200 },  // Ramp up to 200 users
        { duration: '5m', target: 200 },  // Stay at 200 users
        { duration: '2m', target: 0 },    // Ramp down to 0 users
    ],
    thresholds: {
        errors: ['rate<0.01'],           // Error rate < 1%
        request_duration: ['p(95)<500'], // 95% of requests < 500ms
    },
};

export default function () {
    const params = {
        headers: {
            'Authorization': 'Bearer ' + __ENV.API_TOKEN,
            'Content-Type': 'application/json',
        },
    };
    
    // Test user retrieval endpoint
    const userResponse = http.get(
        `https://api.example.com/users/${Math.floor(Math.random() * 1000)}`,
        params
    );
    
    const userCheck = check(userResponse, {
        'user response status is 200': (r) => r.status === 200,
        'user response time < 500ms': (r) => r.timings.duration < 500,
    });
    
    errorRate.add(!userCheck);
    requestDuration.add(userResponse.timings.duration);
    
    sleep(1);
}

Infrastructure Optimization

CDN Integration

Implement CDN caching for static assets and API responses.

# CDN-friendly caching configuration
location /api/static/ {
    # Cache static API responses in CDN
    # (no separate "expires" directive: it would emit a second Cache-Control header)
    add_header Cache-Control "public, max-age=3600";
    add_header CDN-Cache-Control "public, max-age=7200";
    
    # Serve stale content on origin failure
    # (takes effect where proxy_pass and proxy_cache are configured)
    proxy_cache_use_stale error timeout updating;
    proxy_cache_background_update on;
}

location /api/dynamic/ {
    # Shorter cache for dynamic content
    add_header Cache-Control "public, max-age=60";
    add_header CDN-Cache-Control "public, max-age=120";
    
    # Responses differ per credential: key the local cache on Authorization
    # and tell downstream caches to vary on it as well
    add_header Vary "Authorization";
    proxy_cache_key "$scheme$request_method$host$request_uri$http_authorization";
}

Auto-scaling Configuration

Configure auto-scaling based on performance metrics.

# Kubernetes Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: api-service
  minReplicas: 2
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Pods
    pods:
      metric:
        name: api_requests_per_second
      target:
        type: AverageValue
        averageValue: "100"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 15
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 10
        periodSeconds: 60
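
The autoscaler's core calculation is simple enough to reproduce by hand when tuning targets: desired replicas = ceil(current replicas × current metric value / target value), bounded by `minReplicas` and `maxReplicas`. The sketch below applies that formula with the bounds from the manifest above. It leaves out the tolerance band, stabilization windows, and scaling policies, and `desired_replicas` is a hypothetical helper name.

```python
import math


def desired_replicas(
    current_replicas: int,
    current_value: float,
    target_value: float,
    min_replicas: int = 2,
    max_replicas: int = 20,
) -> int:
    """ceil(current_replicas * current_value / target_value), clamped to [min, max]."""
    raw = math.ceil(current_replicas * current_value / target_value)
    return max(min_replicas, min(max_replicas, raw))


# 4 pods averaging 90% CPU against a 70% target
cpu_based = desired_replicas(4, current_value=90, target_value=70)
# 4 pods averaging 180 requests/s each against a target of 100
rps_based = desired_replicas(4, current_value=180, target_value=100)
# With several metrics configured, the autoscaler uses the largest proposal
chosen = max(cpu_based, rps_based)
```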

Advanced Optimization Techniques

Database Read Replicas

Implement read replicas for read-heavy workloads.

import random

class ReadReplicaRouter:
    """Route read queries to replicas, writes to primary"""
    
    def db_for_read(self, model, **hints):
        if hints.get('force_primary', False):
            return 'primary'
        
        # Route to random replica for load distribution
        return random.choice(['replica1', 'replica2', 'replica3'])
    
    def db_for_write(self, model, **hints):
        return 'primary'
    
    def allow_relation(self, obj1, obj2, **hints):
        # Primary and replicas hold the same data, so relations across them are safe
        return True
    
    def allow_migrate(self, db, app_label, model_name=None, **hints):
        # Only allow migrations on primary database
        return db == 'primary'
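
Replicas usually lag slightly behind the primary, so a user who has just written data may not see it on the next read. One common mitigation is to pin that user's reads to the primary for a short window after each write. The sketch below illustrates the idea; `StickyReadRouter`, its five-second default, and the per-user bookkeeping are assumptions made for this example rather than part of the router above.

```python
import random
import time
from typing import Callable, Dict, List


class StickyReadRouter:
    """Send a user's reads to the primary for a short window after they write."""

    def __init__(
        self,
        replicas: List[str],
        stick_seconds: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.replicas = replicas
        self.stick_seconds = stick_seconds
        self._clock = clock
        self._last_write: Dict[int, float] = {}

    def record_write(self, user_id: int) -> str:
        """Note the write time and return the database that takes writes."""
        self._last_write[user_id] = self._clock()
        return "primary"

    def db_for_read(self, user_id: int) -> str:
        last = self._last_write.get(user_id)
        if last is not None and self._clock() - last < self.stick_seconds:
            return "primary"  # replicas may not have this user's write yet
        return random.choice(self.replicas)
```

In a multi-process deployment the last-write timestamps would have to live somewhere shared, such as the user's session or a cache, rather than in process memory.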

Connection Rate Limiting

Implement sophisticated rate limiting to prevent abuse.

package main

import (
    "net/http"
    "sync"
    "time"
)

type RateLimiter struct {
    requests map[string][]time.Time
    mutex    sync.RWMutex
    limit    int
    window   time.Duration
}

func NewRateLimiter(limit int, window time.Duration) *RateLimiter {
    return &RateLimiter{
        requests: make(map[string][]time.Time),
        limit:    limit,
        window:   window,
    }
}

func (rl *RateLimiter
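
The Go listing above stops partway through the limiter's methods. As a sketch of the sliding-window idea its fields suggest (per-client timestamps, a limit, and a window), here is a minimal Python version. The `allow` method and the injectable clock are assumptions for this example and are not taken from the Go code.

```python
import threading
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most `limit` requests per client within any `window` seconds."""

    def __init__(
        self, limit: int, window: float, clock: Callable[[], float] = time.monotonic
    ):
        self.limit = limit
        self.window = window
        self._clock = clock
        self._lock = threading.Lock()
        self._requests: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, client_id: str) -> bool:
        now = self._clock()
        with self._lock:
            timestamps = self._requests[client_id]
            # Drop timestamps that have aged out of the window
            while timestamps and now - timestamps[0] >= self.window:
                timestamps.popleft()
            if len(timestamps) >= self.limit:
                return False  # over the limit: the caller would answer with HTTP 429
            timestamps.append(now)
            return True
```

Because state is kept per process, a limiter like this enforces the limit only on a single instance. Services running several replicas generally move the counters into a shared store such as Redis.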