Real-time Data Pipeline

Written by Terrell Flautt on October 5, 2025

· 8 min · snapitanalytics.com

Architecture

Data Sources → Kinesis Streams → Lambda Processors
                    ↓
        DynamoDB + S3 Data Lake → QuickSight
                    ↓
            React Dashboard + WebSocket Updates

Core Challenge

Problem: Ingest and process millions of events while keeping dashboards updated in real time.
Solution: A serverless streaming pipeline built from auto-scaling components.

Kinesis Stream Processor

exports.processStream = async (event) => {
    const batch = [];

    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString()
        );

        // Enrich event data
        const enriched = {
            ...payload,
            timestamp: record.kinesis.approximateArrivalTimestamp,
            shardId: record.eventSourceARN.split('/')[1],
            processedAt: Date.now()
        };

        // Aggregate metrics
        await updateMetrics(enriched);

        // Prepare for batch write
        batch.push({
            PutRequest: {
                Item: {
                    id: uuidv4(),
                    ...enriched,
                    ttl: Math.floor(Date.now() / 1000) + (30 * 24 * 60 * 60) // 30 days
                }
            }
        });
    }

    // Batch write to DynamoDB
    if (batch.length > 0) {
        await dynamoDB.batchWrite({
            RequestItems: {
                'analytics_events': batch
            }
        }).promise();
    }

    // Send to S3 for long-term storage
    await s3.putObject({
        Bucket: 'snapitanalytics-datalake',
        Key: `events/${new Date().toISOString().split('T')[0]}/${uuidv4()}.json`,
        Body: JSON.stringify(event.Records.map(r =>
            JSON.parse(Buffer.from(r.kinesis.data, 'base64').toString())
        ))
    }).promise();
};

Real-time Aggregation

/**
 * Increment the hourly counter for an event type and, when the event carries
 * a user id, that user's per-hour counter and last-seen timestamp.
 *
 * @param {{eventType: string, userId?: string, timestamp: number}} event -
 *   Enriched event; `timestamp` is treated as epoch milliseconds when it is
 *   bucketed into hours.
 * @returns {Promise<void>}
 * @throws {TypeError} When `timestamp` is not a finite number.
 */
async function updateMetrics(event) {
    const { eventType, userId, timestamp } = event;

    const hour = Math.floor(Number(timestamp) / (1000 * 60 * 60));
    if (!Number.isFinite(hour)) {
        // Without this guard a bad timestamp would create an "<type>_NaN" bucket.
        throw new TypeError('updateMetrics requires a numeric event timestamp');
    }

    // Update hourly metrics
    await dynamoDB.update({
        TableName: 'metrics_hourly',
        Key: {
            eventType_hour: `${eventType}_${hour}`,
            metricType: 'count'
        },
        UpdateExpression: 'ADD #count :inc',
        ExpressionAttributeNames: { '#count': 'count' },
        ExpressionAttributeValues: { ':inc': 1 }
    }).promise();

    // Update user metrics
    if (userId) {
        await dynamoDB.update({
            TableName: 'user_metrics',
            Key: { userId, hour },
            // Clauses in an update expression are separated by whitespace;
            // a comma between ADD and SET is rejected by DynamoDB.
            UpdateExpression: 'ADD #events :inc SET #lastSeen = :now',
            ExpressionAttributeNames: {
                '#events': 'events',
                '#lastSeen': 'lastSeen'
            },
            ExpressionAttributeValues: {
                ':inc': 1,
                ':now': timestamp
            }
        }).promise();
    }
}

Dashboard API

exports.getDashboardData = async (event) => {
    const { timeRange, filters } = JSON.parse(event.body);

    // Get real-time metrics
    const metrics = await Promise.all([
        getEventCounts(timeRange, filters),
        getUserMetrics(timeRange, filters),
        getTopEvents(timeRange, filters),
        getGeographicData(timeRange, filters)
    ]);

    // Calculate trends
    const trends = calculateTrends(metrics[0]);

    return {
        statusCode: 200,
        body: JSON.stringify({
            eventCounts: metrics[0],
            userMetrics: metrics[1],
            topEvents: metrics[2],
            geographic: metrics[3],
            trends
        })
    };
};

/**
 * Read the hourly event counters for one event type across an inclusive range
 * of hour buckets.
 *
 * `eventType_hour` is the partition key of `metrics_hourly`, and a DynamoDB
 * Query only accepts an equality condition on the partition key, so a
 * `BETWEEN` over it cannot be used. Each hour bucket in the range is fetched
 * by its full primary key instead.
 *
 * @param {{start: number, end: number}} timeRange - First and last hour
 *   bucket (epoch milliseconds divided by 3,600,000, floored), inclusive.
 * @param {{eventType: string}} filters - `eventType` selects the counter.
 * @returns {Promise<Object<string, number>>} Map of `eventType_hour` to
 *   count; hours with no stored counter are omitted.
 * @throws {TypeError} When the range bounds are not numeric.
 * @throws {RangeError} When the range spans more than MAX_HOURS buckets.
 */
async function getEventCounts(timeRange, filters) {
    const MAX_HOURS = 24 * 366; // refuse runaway ranges from client input
    const CONCURRENCY = 25;     // lookups issued in parallel per round

    const firstHour = Math.floor(Number(timeRange.start));
    const lastHour = Math.floor(Number(timeRange.end));
    if (!Number.isFinite(firstHour) || !Number.isFinite(lastHour)) {
        throw new TypeError('timeRange.start and timeRange.end must be hour numbers');
    }
    if (lastHour < firstHour) {
        return {};
    }
    if (lastHour - firstHour + 1 > MAX_HOURS) {
        throw new RangeError(`timeRange may span at most ${MAX_HOURS} hours`);
    }

    const partitionKeys = [];
    for (let hour = firstHour; hour <= lastHour; hour += 1) {
        partitionKeys.push(`${filters.eventType}_${hour}`);
    }

    const counts = {};
    for (let offset = 0; offset < partitionKeys.length; offset += CONCURRENCY) {
        const slice = partitionKeys.slice(offset, offset + CONCURRENCY);
        const results = await Promise.all(slice.map((eventTypeHour) =>
            dynamoDB.get({
                TableName: 'metrics_hourly',
                Key: {
                    eventType_hour: eventTypeHour,
                    metricType: 'count'
                }
            }).promise()
        ));

        for (const result of results) {
            const item = result && result.Item;
            if (item) {
                counts[item.eventType_hour] = item.count;
            }
        }
    }

    return counts;
}

React Dashboard

// Real-time dashboard with Chart.js
function AnalyticsDashboard() {
    const [metrics, setMetrics] = useState({});
    const [filters, setFilters] = useState({
        timeRange: '24h',
        eventType: 'all'
    });

    // WebSocket for real-time updates
    useEffect(() => {
        const ws = new WebSocket('wss://api.snapitanalytics.com/realtime');

        ws.onmessage = (event) => {
            const update = JSON.parse(event.data);
            setMetrics(prev => ({
                ...prev,
                [update.metric]: update.value
            }));
        };

        return () => ws.close();
    }, []);

    // Fetch dashboard data
    const fetchData = async () => {
        const response = await fetch('/api/dashboard', {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({ filters })
        });
        const data = await response.json();
        setMetrics(data);
    };

    return (
        <div className="dashboard-grid">
            <MetricCard
                title="Total Events"
                value={metrics.totalEvents}
                trend={metrics.eventsTrend}
            />
            <LineChart
                data={metrics.eventCounts}
                title="Events Over Time"
            />
            <HeatMap
                data={metrics.geographic}
                title="Geographic Distribution"
            />
            <TopEventsList
                events={metrics.topEvents}
            />
        </div>
    );
}

Data Export Lambda

exports.exportData = async (event) => {
    const { format, timeRange, filters, userId } = JSON.parse(event.body);

    // Query data from DynamoDB
    const data = await queryAnalyticsData(timeRange, filters);

    let exportData;
    switch (format) {
        case 'csv':
            exportData = convertToCSV(data);
            break;
        case 'json':
            exportData = JSON.stringify(data, null, 2);
            break;
        case 'parquet':
            exportData = await convertToParquet(data);
            break;
    }

    // Upload to S3
    const key = `exports/${userId}/${Date.now()}.${format}`;
    await s3.putObject({
        Bucket: 'snapitanalytics-exports',
        Key: key,
        Body: exportData,
        ServerSideEncryption: 'AES256'
    }).promise();

    // Generate presigned URL
    const downloadUrl = s3.getSignedUrl('getObject', {
        Bucket: 'snapitanalytics-exports',
        Key: key,
        Expires: 3600 // 1 hour
    });

    return {
        statusCode: 200,
        body: JSON.stringify({ downloadUrl })
    };
};

DynamoDB Schema

analytics_events:
- id (string) - partition key
- timestamp (number) - sort key
- eventType (string) - GSI partition key
- userId (string) - GSI partition key
- data (map) - event payload
- ttl (number) - auto-expire

metrics_hourly:
- eventType_hour (string) - partition key
- metricType (string) - sort key
- count (number) - metric value
- lastUpdated (number)

user_metrics:
- userId (string) - partition key
- hour (number) - sort key
- events (number) - event count
- lastSeen (number) - timestamp

Key Features

Custom Alerts

exports.checkAlerts = async (event) => {
    const alerts = await dynamoDB.scan({
        TableName: 'alerts',
        FilterExpression: '#status = :active',
        ExpressionAttributeNames: { '#status': 'status' },
        ExpressionAttributeValues: { ':active': 'active' }
    }).promise();

    for (const alert of alerts.Items) {
        const currentValue = await getCurrentMetric(alert.metric);

        if (evaluateCondition(currentValue, alert.condition)) {
            await triggerAlert(alert);
        }
    }
};

Funnel Analysis

function FunnelChart({ funnelData }) {
    const steps = funnelData.steps;
    const maxValue = Math.max(...steps.map(s => s.value));

    return (
        <div className="funnel-chart">
            {steps.map((step, index) => (
                <div
                    key={step.name}
                    className="funnel-step"
                    style={{
                        width: `${(step.value / maxValue) * 100}%`,
                        background: `hsl(${200 + index * 20}, 70%, 60%)`
                    }}
                >
                    <span>{step.name}: {step.value.toLocaleString()}</span>
                    {index > 0 && (
                        <small>{((step.value / steps[index-1].value) * 100).toFixed(1)}%</small>
                    )}
                </div>
            ))}
        </div>
    );
}

Performance Optimization

// Lazily created DocumentClient, cached in module scope and returned on
// every later call instead of constructing a new one.
let dynamoClient;
const getDynamoClient = () => {
    if (dynamoClient) {
        return dynamoClient;
    }

    const clientOptions = {
        region: process.env.AWS_REGION,
        maxRetries: 3,
        httpOptions: {
            connectTimeout: 1000,
            timeout: 5000
        }
    };
    dynamoClient = new AWS.DynamoDB.DocumentClient(clientOptions);
    return dynamoClient;
};

SSM Parameters

/snapitanalytics/kinesis-stream - Stream name
/snapitanalytics/s3-datalake - Data lake bucket
/snapitanalytics/quicksight-role - QuickSight IAM role
/snapitanalytics/websocket-endpoint - Real-time endpoint

Performance