Real-time Data Pipeline
Written by Terrell Flautt on October 5, 2025
· 8 min · snapitanalytics.com
Architecture
Data Sources → Kinesis Streams → Lambda Processors
↓
DynamoDB + S3 Data Lake → QuickSight
↓
React Dashboard + WebSocket Updates
Core Challenge
Problem: Process millions of events with real-time visualization.
Solution: Serverless streaming pipeline with auto-scaling components.
Kinesis Stream Processor
exports.processStream = async (event) => {
const batch = [];
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString()
);
// Enrich event data
const enriched = {
...payload,
timestamp: record.kinesis.approximateArrivalTimestamp,
shardId: record.eventSourceARN.split('/')[1],
processedAt: Date.now()
};
// Aggregate metrics
await updateMetrics(enriched);
// Prepare for batch write
batch.push({
PutRequest: {
Item: {
id: uuidv4(),
...enriched,
ttl: Math.floor(Date.now() / 1000) + (30 * 24 * 60 * 60) // 30 days
}
}
});
}
// Batch write to DynamoDB
if (batch.length > 0) {
await dynamoDB.batchWrite({
RequestItems: {
'analytics_events': batch
}
}).promise();
}
// Send to S3 for long-term storage
await s3.putObject({
Bucket: 'snapitanalytics-datalake',
Key: `events/${new Date().toISOString().split('T')[0]}/${uuidv4()}.json`,
Body: JSON.stringify(event.Records.map(r =>
JSON.parse(Buffer.from(r.kinesis.data, 'base64').toString())
))
}).promise();
};
Real-time Aggregation
// Increment the hourly and per-user rollup counters for one enriched event.
//
// @param {object} event - enriched event; must carry eventType, timestamp,
//   and optionally userId.
// NOTE(review): `hour` assumes `timestamp` is epoch milliseconds — the
// Kinesis approximateArrivalTimestamp set upstream may be a Date/seconds
// value; confirm before relying on the bucket boundaries.
async function updateMetrics(event) {
  const { eventType, userId, timestamp } = event;
  const hour = Math.floor(timestamp / (1000 * 60 * 60));
  const updates = [
    // Hourly event-type counter.
    dynamoDB.update({
      TableName: 'metrics_hourly',
      Key: {
        eventType_hour: `${eventType}_${hour}`,
        metricType: 'count'
      },
      UpdateExpression: 'ADD #count :inc',
      ExpressionAttributeNames: { '#count': 'count' },
      ExpressionAttributeValues: { ':inc': 1 }
    }).promise()
  ];
  // Per-user counter (anonymous events have no userId).
  if (userId) {
    updates.push(
      dynamoDB.update({
        TableName: 'user_metrics',
        Key: { userId, hour },
        // SET and ADD are distinct clauses in an UpdateExpression; joining
        // them with a comma ('ADD events :inc, SET ...') is a syntax error
        // that DynamoDB rejects with a ValidationException.
        UpdateExpression: 'SET lastSeen = :now ADD events :inc',
        ExpressionAttributeValues: {
          ':inc': 1,
          ':now': timestamp
        }
      }).promise()
    );
  }
  // The two updates touch different tables and are independent — run them
  // in parallel.
  await Promise.all(updates);
}
Dashboard API
exports.getDashboardData = async (event) => {
const { timeRange, filters } = JSON.parse(event.body);
// Get real-time metrics
const metrics = await Promise.all([
getEventCounts(timeRange, filters),
getUserMetrics(timeRange, filters),
getTopEvents(timeRange, filters),
getGeographicData(timeRange, filters)
]);
// Calculate trends
const trends = calculateTrends(metrics[0]);
return {
statusCode: 200,
body: JSON.stringify({
eventCounts: metrics[0],
userMetrics: metrics[1],
topEvents: metrics[2],
geographic: metrics[3],
trends
})
};
};
// Fetch hourly counts for one event type across a time range.
//
// eventType_hour is the table's PARTITION key (see schema), and DynamoDB
// only allows an equality condition on the partition key — the previous
// `BETWEEN :start AND :end` KeyConditionExpression is rejected at runtime.
// Query each hour bucket individually (in parallel) and merge the results.
//
// NOTE(review): assumes timeRange.start/end are integer hour indices
// matching the buckets written by updateMetrics — confirm against callers.
async function getEventCounts(timeRange, filters) {
  const queries = [];
  for (let hour = timeRange.start; hour <= timeRange.end; hour++) {
    queries.push(
      dynamoDB.query({
        TableName: 'metrics_hourly',
        KeyConditionExpression: 'eventType_hour = :pk',
        ExpressionAttributeValues: {
          ':pk': `${filters.eventType}_${hour}`
        }
      }).promise()
    );
  }
  const results = await Promise.all(queries);
  const counts = {};
  for (const result of results) {
    for (const item of result.Items) {
      counts[item.eventType_hour] = item.count;
    }
  }
  return counts;
}
React Dashboard
// Real-time dashboard with Chart.js
function AnalyticsDashboard() {
const [metrics, setMetrics] = useState({});
const [filters, setFilters] = useState({
timeRange: '24h',
eventType: 'all'
});
// WebSocket for real-time updates
useEffect(() => {
const ws = new WebSocket('wss://api.snapitanalytics.com/realtime');
ws.onmessage = (event) => {
const update = JSON.parse(event.data);
setMetrics(prev => ({
...prev,
[update.metric]: update.value
}));
};
return () => ws.close();
}, []);
// Fetch dashboard data
const fetchData = async () => {
const response = await fetch('/api/dashboard', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ filters })
});
const data = await response.json();
setMetrics(data);
};
return (
<div className="dashboard-grid">
<MetricCard
title="Total Events"
value={metrics.totalEvents}
trend={metrics.eventsTrend}
/>
<LineChart
data={metrics.eventCounts}
title="Events Over Time"
/>
<HeatMap
data={metrics.geographic}
title="Geographic Distribution"
/>
<TopEventsList
events={metrics.topEvents}
/>
</div>
);
}
Data Export Lambda
exports.exportData = async (event) => {
const { format, timeRange, filters, userId } = JSON.parse(event.body);
// Query data from DynamoDB
const data = await queryAnalyticsData(timeRange, filters);
let exportData;
switch (format) {
case 'csv':
exportData = convertToCSV(data);
break;
case 'json':
exportData = JSON.stringify(data, null, 2);
break;
case 'parquet':
exportData = await convertToParquet(data);
break;
}
// Upload to S3
const key = `exports/${userId}/${Date.now()}.${format}`;
await s3.putObject({
Bucket: 'snapitanalytics-exports',
Key: key,
Body: exportData,
ServerSideEncryption: 'AES256'
}).promise();
// Generate presigned URL
const downloadUrl = s3.getSignedUrl('getObject', {
Bucket: 'snapitanalytics-exports',
Key: key,
Expires: 3600 // 1 hour
});
return {
statusCode: 200,
body: JSON.stringify({ downloadUrl })
};
};
DynamoDB Schema
analytics_events:
- id (string) - partition key
- timestamp (number) - sort key
- eventType (string) - GSI partition key
- userId (string) - GSI partition key
- data (map) - event payload
- ttl (number) - auto-expire
metrics_hourly:
- eventType_hour (string) - partition key
- metricType (string) - sort key
- count (number) - metric value
- lastUpdated (number)
user_metrics:
- userId (string) - partition key
- hour (number) - sort key
- events (number) - event count
- lastSeen (number) - timestamp
Key Features
Custom Alerts
exports.checkAlerts = async (event) => {
const alerts = await dynamoDB.scan({
TableName: 'alerts',
FilterExpression: '#status = :active',
ExpressionAttributeNames: { '#status': 'status' },
ExpressionAttributeValues: { ':active': 'active' }
}).promise();
for (const alert of alerts.Items) {
const currentValue = await getCurrentMetric(alert.metric);
if (evaluateCondition(currentValue, alert.condition)) {
await triggerAlert(alert);
}
}
};
Funnel Analysis
function FunnelChart({ funnelData }) {
const steps = funnelData.steps;
const maxValue = Math.max(...steps.map(s => s.value));
return (
<div className="funnel-chart">
{steps.map((step, index) => (
<div
key={step.name}
className="funnel-step"
style={{
width: `${(step.value / maxValue) * 100}%`,
background: `hsl(${200 + index * 20}, 70%, 60%)`
}}
>
<span>{step.name}: {step.value.toLocaleString()}</span>
{index > 0 && (
<small>{((step.value / steps[index-1].value) * 100).toFixed(1)}%</small>
)}
</div>
))}
</div>
);
}
Performance Optimization
// Connection pooling for Lambda
// Lazily build a single shared DocumentClient so warm Lambda invocations
// reuse the underlying HTTP connections instead of re-handshaking.
let dynamoClient;
const getDynamoClient = () => {
  // Construct on first use only; subsequent calls return the cached client.
  dynamoClient ??= new AWS.DynamoDB.DocumentClient({
    region: process.env.AWS_REGION,
    maxRetries: 3,
    httpOptions: {
      connectTimeout: 1000,
      timeout: 5000
    }
  });
  return dynamoClient;
};
SSM Parameters
/snapitanalytics/kinesis-stream - Stream name
/snapitanalytics/s3-datalake - Data lake bucket
/snapitanalytics/quicksight-role - QuickSight IAM role
/snapitanalytics/websocket-endpoint - Real-time endpoint
Performance
- Throughput: 100K+ events/second
- Latency: <100ms real-time updates
- Cost: $0.001 per 1K events
- Retention: 30 days hot, unlimited cold