Data Architecture Strategy
Last Updated: December 29, 2025
Purpose
This document defines the data architecture for BitVelocity: OLTP-to-OLAP data flows, the audit database strategy, data governance, and the analytics patterns that address the gaps identified in the original design.
Related Documentation:
- System Overview: system-overview.md
- Domain Architectures: domains/*/DOMAIN_*_ARCHITECTURE.md
- Event Contracts: CROSS_EVENT_CONTRACTS_AND_VERSIONING.md
- Data Platform & Analytics: CROSS_DATA_PLATFORM_AND_ANALYTICS.md
Architecture Overview
Data Layer Strategy
┌─────────────────────────────────────────────────────────────────┐
│ OLTP Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐│
│ │ PostgreSQL │ │ Cassandra │ │ MongoDB │ │ Redis ││
│ │(Transaction)│ │ (Scale-out) │ │ (Document) │ │ (Cache) ││
│ └─────┬───────┘ └─────┬───────┘ └─────┬───────┘ └─────┬───────┘│
└─────────┼───────────────┼───────────────┼───────────────┼────────┘
│ │ │ │
│ ┌────▼─────────────────▼─────────────▼───┐
│ │ Change Data Capture │
│ │ (Debezium CDC) │
│ └────┬─────────────────────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Event Streaming Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐│
│ │ Kafka │ │ NATS │ │ RabbitMQ │ │ Schema Reg ││
│ │ (Streaming) │ │(Lightweight)│ │ (Reliable) │ │ (Governance)││
│ └─────┬───────┘ └─────────────┘ └─────────────┘ └─────────────┘│
└─────────┼───────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ OLAP Layer (Bronze → Silver → Gold) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐│
│ │ Bronze │ │ Silver │ │ Gold │ │ Serving ││
│ │ (Raw Data) │ │ (Cleaned) │ │ (Business) │ │ Layer ││
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘│
└─────────────────────────────────────────────────────────────────┘
OLTP Database Strategy
Primary Transactional Database: PostgreSQL
Table Design with Audit Strategy
Every business entity includes comprehensive audit columns:
-- Standard audit columns, defined once as a column template
CREATE TABLE audit_base (
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    created_by VARCHAR(255) NOT NULL,
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    updated_by VARCHAR(255) NOT NULL,
    version INTEGER NOT NULL DEFAULT 1,
    deleted_at TIMESTAMP WITH TIME ZONE NULL,
    deleted_by VARCHAR(255) NULL
);
-- Example: Orders table with audit
CREATE TABLE orders (
    order_id UUID NOT NULL DEFAULT gen_random_uuid(),
    customer_id UUID NOT NULL,
    status VARCHAR(50) NOT NULL DEFAULT 'PENDING',
    total_amount DECIMAL(10,2) NOT NULL,
    currency VARCHAR(3) NOT NULL DEFAULT 'USD',
    -- Audit columns copied from the template (defaults included)
    LIKE audit_base INCLUDING DEFAULTS,
    -- Partitioning key; AT TIME ZONE keeps the generation expression immutable
    created_date DATE GENERATED ALWAYS AS ((created_at AT TIME ZONE 'UTC')::DATE) STORED,
    -- On a partitioned table the primary key must include the partition key
    PRIMARY KEY (order_id, created_date)
) PARTITION BY RANGE (created_date);
-- Audit trail table for complete change history
CREATE TABLE orders_audit (
audit_id BIGSERIAL PRIMARY KEY,
order_id UUID NOT NULL,
operation_type VARCHAR(10) NOT NULL, -- INSERT, UPDATE, DELETE
operation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
operation_user VARCHAR(255) NOT NULL,
old_values JSONB,
new_values JSONB,
correlation_id UUID -- For tracing changes across services
);
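Nothing populates orders_audit by itself; a row-level trigger keeps it complete even for writes that bypass the service layer. A minimal sketch, assuming the application propagates user identity through a session setting such as app.current_user (an assumed convention, not an existing one):
CREATE OR REPLACE FUNCTION orders_audit_trigger()
RETURNS TRIGGER AS $$
DECLARE
    v_user VARCHAR(255);
BEGIN
    -- Assumes the service sets a per-session GUC, e.g. SET app.current_user = 'order-service'
    v_user := COALESCE(current_setting('app.current_user', true), session_user);
    IF TG_OP = 'DELETE' THEN
        INSERT INTO orders_audit (order_id, operation_type, operation_user, old_values)
        VALUES (OLD.order_id, TG_OP, v_user, to_jsonb(OLD));
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO orders_audit (order_id, operation_type, operation_user, old_values, new_values)
        VALUES (NEW.order_id, TG_OP, v_user, to_jsonb(OLD), to_jsonb(NEW));
    ELSE -- INSERT
        INSERT INTO orders_audit (order_id, operation_type, operation_user, new_values)
        VALUES (NEW.order_id, TG_OP, v_user, to_jsonb(NEW));
    END IF;
    RETURN NULL; -- AFTER trigger: return value is ignored
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_orders_audit
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION orders_audit_trigger();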
Partitioning Strategy
Time-Based Partitioning for high-volume tables:
-- Monthly partitions for orders
CREATE TABLE orders_y2024m01 PARTITION OF orders
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
-- Automated partition management (pg_partman v5 syntax)
SELECT partman.create_parent(
    p_parent_table => 'public.orders',
    p_control => 'created_date',
    p_type => 'range',
    p_interval => '1 month',
    p_premake => 2
);
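create_parent only premakes a fixed number of partitions, so maintenance must run on a schedule for new partitions to exist before they are needed. A sketch, assuming the pg_cron extension is available:
-- Run pg_partman maintenance hourly
SELECT cron.schedule(
    'partman-maintenance',
    '0 * * * *',
    $$SELECT partman.run_maintenance()$$
);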
Scale-Out Databases
Cassandra for High-Volume Event Data
-- Chat messages with time-series pattern
CREATE TABLE chat_messages (
room_id UUID,
message_time TIMEUUID,
user_id UUID,
message_text TEXT,
metadata MAP<TEXT, TEXT>,
-- Audit fields
created_by TEXT,
trace_id TEXT,
PRIMARY KEY (room_id, message_time)
) WITH CLUSTERING ORDER BY (message_time DESC);
-- IoT telemetry data
CREATE TABLE device_telemetry (
device_id UUID,
timestamp TIMESTAMP,
sensor_type TEXT,
sensor_value DOUBLE,
-- Partitioning by device and time
PRIMARY KEY ((device_id, sensor_type), timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC);
MongoDB for Document-Heavy Domains
// Social media posts with flexible schema
{
_id: ObjectId,
user_id: UUID,
content: {
text: String,
media: [{
type: String, // image, video, link
url: String,
metadata: Object
}],
hashtags: [String],
mentions: [UUID]
},
engagement: {
likes: Number,
shares: Number,
comments: Number
},
// Audit fields
created_at: ISODate,
created_by: String,
updated_at: ISODate,
updated_by: String,
version: Number,
trace_id: String
}
OLTP to OLAP Data Flow
Change Data Capture (CDC) Strategy
Debezium Configuration
# Debezium connector for PostgreSQL (Strimzi KafkaConnector resource)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: postgres-orders-connector
  labels:
    strimzi.io/cluster: bitvelocity-connect  # assumed name of the KafkaConnect cluster
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 3
  config:
    database.hostname: postgres-primary
    database.port: 5432
    database.user: debezium
    database.password: ${DEBEZIUM_PASSWORD}
    database.dbname: bitvelocity
    database.server.name: postgres-orders
    table.include.list: public.orders,public.order_items,public.payments
    plugin.name: pgoutput
    transforms: unwrap
    transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
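The pgoutput plugin relies on logical decoding, which must be enabled on the source database before the connector attaches. A sketch of the prerequisites (role and publication names are illustrative; the connector's publication.name setting would point at the publication created here):
-- postgresql.conf change; requires a restart
ALTER SYSTEM SET wal_level = 'logical';
-- Replication role for the connector
CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD 'change-me';
GRANT SELECT ON public.orders, public.order_items, public.payments TO debezium;
-- Publication restricted to the captured tables
CREATE PUBLICATION dbz_orders_pub
    FOR TABLE public.orders, public.order_items, public.payments;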
Event Schema with Audit Context
{
"schema": {
"type": "struct",
"fields": [
{"field": "order_id", "type": "string"},
{"field": "customer_id", "type": "string"},
{"field": "status", "type": "string"},
{"field": "total_amount", "type": "double"},
{"field": "created_at", "type": "string"},
{"field": "created_by", "type": "string"},
{"field": "updated_at", "type": "string"},
{"field": "updated_by", "type": "string"},
{"field": "version", "type": "int32"}
]
},
"payload": {
"order_id": "550e8400-e29b-41d4-a716-446655440000",
"customer_id": "123e4567-e89b-12d3-a456-426614174000",
"status": "CONFIRMED",
"total_amount": 99.99,
"created_at": "2024-01-15T10:30:00Z",
"created_by": "customer-service",
"updated_at": "2024-01-15T10:35:00Z",
"updated_by": "payment-service",
"version": 2
},
"metadata": {
"operation": "UPDATE",
"source": "orders",
"timestamp": "2024-01-15T10:35:00Z",
"transaction_id": "txn_123456",
"lsn": "24/3F000140"
}
}
Medallion Architecture: Bronze → Silver → Gold
Bronze Layer (Raw Data Ingestion)
-- Raw CDC events landed from Kafka (relational form shown here;
-- a Delta Lake bronze table follows the same shape)
CREATE TABLE bronze_orders (
    event_id UUID DEFAULT gen_random_uuid(),
    event_timestamp TIMESTAMP WITH TIME ZONE,
    source_system VARCHAR(100),
    operation_type VARCHAR(20),
    table_name VARCHAR(100),
    raw_payload JSONB,
    -- Partitioning for efficient querying; AT TIME ZONE keeps the expression immutable
    ingestion_date DATE GENERATED ALWAYS AS ((event_timestamp AT TIME ZONE 'UTC')::DATE) STORED
) PARTITION BY RANGE (ingestion_date);
-- Example bronze record
INSERT INTO bronze_orders (event_timestamp, source_system, operation_type, table_name, raw_payload)
VALUES (
NOW(),
'postgres-orders',
'UPDATE',
'orders',
'{"order_id": "550e8400-e29b-41d4-a716-446655440000", "status": "CONFIRMED", ...}'
);
Silver Layer (Cleaned and Standardized)
-- Cleaned, typed, and standardized data
CREATE TABLE silver_orders (
    order_id UUID NOT NULL,
    customer_id UUID NOT NULL,
    status order_status_enum NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    currency VARCHAR(3) NOT NULL,
    order_date DATE NOT NULL,
    -- Data quality indicators
    data_quality_score DECIMAL(3,2),
    quality_checks_passed TEXT[],
    quality_issues TEXT[],
    -- Lineage information
    source_bronze_id UUID,
    processed_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    processing_version VARCHAR(20),
    -- The partition key must be part of the primary key
    PRIMARY KEY (order_id, order_date)
) PARTITION BY RANGE (order_date);
-- ETL process to transform bronze to silver
CREATE OR REPLACE FUNCTION bronze_to_silver_orders()
RETURNS VOID AS $$
BEGIN
INSERT INTO silver_orders (
order_id, customer_id, status, total_amount, currency, order_date,
data_quality_score, source_bronze_id
)
SELECT
(raw_payload->>'order_id')::UUID,
(raw_payload->>'customer_id')::UUID,
(raw_payload->>'status')::order_status_enum,
(raw_payload->>'total_amount')::DECIMAL(10,2),
COALESCE(raw_payload->>'currency', 'USD'),
(raw_payload->>'created_at')::DATE,
calculate_quality_score(raw_payload),
event_id
FROM bronze_orders
WHERE operation_type = 'INSERT'
AND ingestion_date = CURRENT_DATE
ON CONFLICT (order_id, order_date) DO UPDATE SET
status = EXCLUDED.status,
total_amount = EXCLUDED.total_amount,
processed_at = NOW();
END;
$$ LANGUAGE plpgsql;
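The function above leans on two objects that are easy to overlook: the order_status_enum type and the calculate_quality_score helper. A minimal sketch of both, with a deliberately naive completeness score (the field weighting is illustrative):
CREATE TYPE order_status_enum AS ENUM
    ('PENDING', 'CONFIRMED', 'SHIPPED', 'DELIVERED', 'CANCELLED');
-- Naive completeness score: fraction of required fields present in the payload
CREATE OR REPLACE FUNCTION calculate_quality_score(payload JSONB)
RETURNS DECIMAL(3,2) AS $$
SELECT ROUND(
    (  (payload ? 'order_id')::int
     + (payload ? 'customer_id')::int
     + (payload ? 'status')::int
     + (payload ? 'total_amount')::int
    )::DECIMAL / 4, 2);
$$ LANGUAGE sql IMMUTABLE;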
Gold Layer (Business Logic and Aggregations)
-- Business-ready dimensional model
CREATE TABLE gold_order_facts (
fact_id BIGSERIAL PRIMARY KEY,
order_date_key INTEGER, -- Links to date dimension
customer_key INTEGER, -- Links to customer dimension
product_key INTEGER, -- Links to product dimension
-- Measures
order_count INTEGER DEFAULT 1,
total_amount DECIMAL(12,2),
discount_amount DECIMAL(12,2),
tax_amount DECIMAL(12,2),
shipping_amount DECIMAL(12,2),
-- Slowly Changing Dimension tracking
valid_from TIMESTAMP WITH TIME ZONE,
valid_to TIMESTAMP WITH TIME ZONE,
is_current BOOLEAN DEFAULT TRUE,
-- Lineage
source_silver_order_id UUID,
etl_batch_id UUID,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Daily sales aggregations
CREATE MATERIALIZED VIEW gold_daily_sales AS
SELECT
order_date_key,
COUNT(*) as order_count,
SUM(total_amount) as total_revenue,
AVG(total_amount) as avg_order_value,
COUNT(DISTINCT customer_key) as unique_customers
FROM gold_order_facts
WHERE is_current = TRUE
GROUP BY order_date_key;
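The materialized view does not refresh itself, and REFRESH ... CONCURRENTLY, which keeps the view queryable during the rebuild, requires a unique index. A sketch, again assuming pg_cron for scheduling:
-- CONCURRENTLY requires a unique index on the materialized view
CREATE UNIQUE INDEX idx_gold_daily_sales_date
    ON gold_daily_sales (order_date_key);
SELECT cron.schedule(
    'refresh-gold-daily-sales',
    '*/15 * * * *',
    $$REFRESH MATERIALIZED VIEW CONCURRENTLY gold_daily_sales$$
);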
Real-Time Analytics
Kafka Streams Processing
@Component
public class OrderAnalyticsStream {

    // Serde for OrderEvent; assumed to be provided as a bean elsewhere (e.g. a JSON serde)
    @Autowired
    private Serde<OrderEvent> orderEventSerde;

    @Autowired
    public void processOrderEvents(StreamsBuilder builder) {
        KStream<String, OrderEvent> orderStream = builder
            .stream("orders-topic", Consumed.with(Serdes.String(), orderEventSerde));

        // Real-time revenue calculation over one-hour windows
        KTable<Windowed<String>, Double> revenueByHour = orderStream
            .filter((key, order) -> "CONFIRMED".equals(order.getStatus()))
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofHours(1)))
            .aggregate(
                () -> 0.0,
                (key, order, aggregate) -> aggregate + order.getTotalAmount(),
                Materialized.<String, Double, WindowStore<Bytes, byte[]>>with(
                    Serdes.String(), Serdes.Double())
            );

        // Output to analytics topic; flatten the windowed key so a plain String serde applies
        revenueByHour
            .toStream()
            .map((windowedKey, total) -> KeyValue.pair(
                windowedKey.key() + "@" + windowedKey.window().startTime(), total))
            .to("hourly-revenue-topic", Produced.with(Serdes.String(), Serdes.Double()));
    }
}
Feature Store Integration
@Entity
@Table(name = "customer_features")
public class CustomerFeatures {
@Id
private UUID customerId;
// Real-time features
private Integer orderCountLast30Days;
private BigDecimal totalSpentLast30Days;
private Double avgOrderValue;
private String preferredCategory;
// Batch features
private String lifetimeSegment;
private Integer lifetimeOrderCount;
private BigDecimal lifetimeValue;
// Feature freshness tracking
@Column(name = "features_updated_at")
private Instant featuresUpdatedAt;
@Column(name = "feature_version")
private String featureVersion;
}
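The real-time features can be maintained with an upsert fed by the silver layer. A sketch for the 30-day order features, assuming the entity maps to a customer_features table with snake_case columns keyed by customer_id:
INSERT INTO customer_features (customer_id, order_count_last_30_days,
                               total_spent_last_30_days, avg_order_value, features_updated_at)
SELECT customer_id,
       COUNT(*),
       SUM(total_amount),
       AVG(total_amount),
       NOW()
FROM silver_orders
WHERE order_date >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY customer_id
ON CONFLICT (customer_id) DO UPDATE SET
    order_count_last_30_days = EXCLUDED.order_count_last_30_days,
    total_spent_last_30_days = EXCLUDED.total_spent_last_30_days,
    avg_order_value          = EXCLUDED.avg_order_value,
    features_updated_at      = EXCLUDED.features_updated_at;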
Data Governance & Quality
Schema Registry and Evolution
{
"type": "record",
"name": "OrderEvent",
"namespace": "com.bitvelocity.events",
"version": "v2",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "status", "type": {"type": "enum", "symbols": ["PENDING", "CONFIRMED", "SHIPPED", "DELIVERED", "CANCELLED"]}},
{"name": "totalAmount", "type": "double"},
{"name": "currency", "type": "string", "default": "USD"},
{
"name": "auditInfo",
"type": {
"type": "record",
"name": "AuditInfo",
"fields": [
{"name": "createdAt", "type": "long"},
{"name": "createdBy", "type": "string"},
{"name": "traceId", "type": ["null", "string"], "default": null}
]
}
}
]
}
Data Quality Framework
-- Data quality rules table
CREATE TABLE data_quality_rules (
rule_id UUID PRIMARY KEY,
table_name VARCHAR(100),
column_name VARCHAR(100),
rule_type VARCHAR(50), -- NOT_NULL, RANGE, PATTERN, REFERENCE
rule_config JSONB,
severity VARCHAR(20), -- ERROR, WARNING, INFO
is_active BOOLEAN DEFAULT TRUE
);
-- Example quality rules (explicit column list; rule_id is a UUID, so generate it)
INSERT INTO data_quality_rules (rule_id, table_name, column_name, rule_type, rule_config, severity, is_active)
VALUES
    (gen_random_uuid(), 'orders', 'total_amount', 'RANGE', '{"min": 0, "max": 10000}', 'ERROR', true),
    (gen_random_uuid(), 'customers', 'email', 'PATTERN', '{"regex": "^[^@]+@[^@]+\\.[^@]+$"}', 'ERROR', true),
    (gen_random_uuid(), 'orders', 'customer_id', 'REFERENCE', '{"table": "customers", "column": "id"}', 'ERROR', true);
-- Quality check results
CREATE TABLE data_quality_results (
check_id UUID PRIMARY KEY,
rule_id UUID REFERENCES data_quality_rules(rule_id),
checked_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
record_count INTEGER,
failed_count INTEGER,
success_rate DECIMAL(5,2),
sample_failures JSONB
);
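To make the flow concrete, here is how the RANGE rule above turns into a check whose outcome lands in data_quality_results. The predicate is hand-expanded for readability; a production runner would generate it from rule_config:
INSERT INTO data_quality_results
    (check_id, rule_id, record_count, failed_count, success_rate, sample_failures)
SELECT gen_random_uuid(),
       r.rule_id,
       COUNT(*),
       COUNT(*) FILTER (WHERE o.total_amount NOT BETWEEN 0 AND 10000),
       ROUND(100.0 * COUNT(*) FILTER (WHERE o.total_amount BETWEEN 0 AND 10000)
             / NULLIF(COUNT(*), 0), 2),
       (SELECT jsonb_agg(to_jsonb(f))
          FROM (SELECT order_id, total_amount
                  FROM orders
                 WHERE total_amount NOT BETWEEN 0 AND 10000
                 LIMIT 5) f)
FROM orders o
CROSS JOIN data_quality_rules r
WHERE r.table_name = 'orders'
  AND r.column_name = 'total_amount'
  AND r.rule_type = 'RANGE'
GROUP BY r.rule_id;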
Data Lineage Tracking
-- Data lineage tracking
CREATE TABLE data_lineage (
lineage_id UUID PRIMARY KEY,
source_dataset VARCHAR(200),
target_dataset VARCHAR(200),
transformation_type VARCHAR(100),
transformation_config JSONB,
dependency_level INTEGER, -- 1=direct, 2=indirect, etc.
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- OpenLineage-style lineage records (explicit column list; lineage_id is a UUID, so generate it)
INSERT INTO data_lineage (lineage_id, source_dataset, target_dataset, transformation_type, transformation_config, dependency_level, created_at)
VALUES
    (gen_random_uuid(), 'postgres.public.orders', 'bronze_orders', 'CDC_CAPTURE', '{"connector": "debezium"}', 1, NOW()),
    (gen_random_uuid(), 'bronze_orders', 'silver_orders', 'ETL_TRANSFORM', '{"job": "bronze_to_silver_orders"}', 2, NOW()),
    (gen_random_uuid(), 'silver_orders', 'gold_order_facts', 'DIMENSIONAL_MODEL', '{"type": "fact_table"}', 3, NOW());
Caching Strategy
Multi-Layer Caching Architecture
@Component
public class CacheStrategy {

    @Autowired
    private ProductRepository productRepository;

    @Autowired
    private AnalyticsService analyticsService;

    // Self-reference through the Spring proxy; direct this::getProduct calls
    // would bypass @Cacheable interception
    @Lazy
    @Autowired
    private CacheStrategy self;

    // L1: Application-level cache
    @Cacheable(value = "products", key = "#productId")
    public Product getProduct(UUID productId) {
        return productRepository.findById(productId).orElseThrow();
    }

    // L2: Distributed cache
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public CustomerProfile getCustomerProfile(UUID customerId) {
        String cacheKey = "customer:" + customerId;
        CustomerProfile cached = (CustomerProfile) redisTemplate.opsForValue().get(cacheKey);
        if (cached == null) {
            cached = buildCustomerProfile(customerId);
            redisTemplate.opsForValue().set(cacheKey, cached, Duration.ofMinutes(30));
        }
        return cached;
    }

    // Cache warming strategy
    @Scheduled(fixedRate = 300000) // Every 5 minutes
    public void warmTopProducts() {
        List<UUID> topProductIds = analyticsService.getTopSellingProducts(100);
        topProductIds.forEach(self::getProduct);
    }
}
Cache Invalidation Patterns
@Component
public class CacheInvalidationHandler {

    @Autowired
    private CacheManager cacheManager;

    @KafkaListener(topics = "product-updates")
    public void handleProductUpdate(ProductUpdateEvent event) {
        Cache productCache = cacheManager.getCache("products");
        if (productCache != null) {
            productCache.evict(event.getProductId());
        }

        // Invalidate related caches
        if (event.isCategoryChange()) {
            Cache categoryCache = cacheManager.getCache("categories");
            if (categoryCache != null) {
                categoryCache.evict(event.getCategoryId());
            }
        }
    }
}
Performance Optimization
Database Optimization
-- Index strategies for common query patterns
CREATE INDEX CONCURRENTLY idx_orders_customer_date
ON orders (customer_id, created_date DESC)
WHERE deleted_at IS NULL;
-- Partial index for active orders
CREATE INDEX CONCURRENTLY idx_orders_active_status
ON orders (status, created_date DESC)
WHERE status IN ('PENDING', 'CONFIRMED', 'PROCESSING');
-- BRIN index for time-series data (applies where telemetry is retained in PostgreSQL)
CREATE INDEX idx_telemetry_timestamp_brin
ON device_telemetry USING BRIN (timestamp);
-- Query performance monitoring: prefer pg_stat_statements over a trigger-based
-- approach, which cannot reliably observe statement durations
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
-- postgresql.conf: shared_preload_libraries = 'pg_stat_statements'
--                  log_min_duration_statement = 1000  -- also log any statement over 1s
SELECT query, calls, mean_exec_time, total_exec_time
FROM pg_stat_statements        -- mean_exec_time is PostgreSQL 13+; older versions use mean_time
ORDER BY mean_exec_time DESC
LIMIT 20;
Disaster Recovery & Backup
Backup Strategy
# Automated backup configuration
apiVersion: v1
kind: ConfigMap
metadata:
  name: backup-config
data:
  postgres-backup.sh: |
    #!/bin/bash
    DATE=$(date +%Y%m%d_%H%M%S)

    # Full database backup
    pg_dump -h $POSTGRES_HOST -U $POSTGRES_USER -d bitvelocity \
      --format=custom --compress=9 \
      --file="/backups/full_backup_${DATE}.dump"

    # Incremental recovery is covered by WAL archiving, configured in
    # postgresql.conf: archive_command = 'cp %p /archive/%f'

    # Upload to cloud storage
    aws s3 cp "/backups/full_backup_${DATE}.dump" \
      "s3://bitvelocity-backups/postgres/${DATE}/"
Cross-Region Replication
-- PostgreSQL streaming replication setup
-- Primary: postgresql.conf
wal_level = replica
max_wal_senders = 3
wal_keep_size = '512MB'        -- replaces wal_keep_segments (removed in PostgreSQL 13)
archive_mode = on
archive_command = 'cp %p /archive/%f'
-- Primary: create the replication user
CREATE USER replicator REPLICATION LOGIN PASSWORD 'secure_password';
-- Replica: postgresql.conf (standby_mode was removed in PostgreSQL 12;
-- create an empty standby.signal file in the data directory instead)
primary_conninfo = 'host=primary_host port=5432 user=replicator'
restore_command = 'cp /archive/%f %p'
Monitoring & Alerting
Data Pipeline Monitoring
@Component
public class DataPipelineMonitor {

    @Autowired
    private MeterRegistry meterRegistry;

    @EventListener
    public void handleDataProcessingEvent(DataProcessingEvent event) {
        Timer.Sample sample = Timer.start(meterRegistry);
        try {
            processRecords(event); // the pipeline work being timed

            sample.stop(Timer.builder("data.processing.duration")
                .tag("pipeline", event.getPipelineName())
                .tag("status", "success")
                .register(meterRegistry));

            meterRegistry.counter("data.records.processed",
                    "pipeline", event.getPipelineName())
                .increment(event.getRecordCount());
        } catch (Exception e) {
            sample.stop(Timer.builder("data.processing.duration")
                .tag("pipeline", event.getPipelineName())
                .tag("status", "error")
                .register(meterRegistry));

            meterRegistry.counter("data.processing.errors",
                    "pipeline", event.getPipelineName(),
                    "error_type", e.getClass().getSimpleName())
                .increment();
        }
    }

    // Hypothetical hook for the actual domain-specific processing
    private void processRecords(DataProcessingEvent event) {
        // ...
    }
}
Cost Optimization
Storage Cost Management
- Data Lifecycle Policies: Automated archival of old partitions (see the retention sketch after this list)
- Compression: Use appropriate compression for different data types
- Tiered Storage: Hot, warm, and cold storage strategies
- Query Optimization: Efficient queries to reduce compute costs
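pg_partman can enforce the lifecycle policy directly by dropping partitions that fall outside a retention window at maintenance time. A minimal sketch against the orders table (the 24-month window is an illustrative assumption):
-- Partitions older than 24 months are detached and dropped when maintenance runs
UPDATE partman.part_config
SET retention = '24 months',
    retention_keep_table = false
WHERE parent_table = 'public.orders';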
Resource Scaling
- Auto-scaling: Scale compute resources based on workload
- Spot Instances: Use spot instances for batch processing
- Reserved Capacity: Reserve capacity for predictable workloads
- Resource Monitoring: Track resource utilization and costs
Related Documentation
Architecture References
- System Overview - Platform architecture
- E-Commerce Domain - OLTP examples
- IoT Domain - High-volume ingestion
- Data Platform & Analytics - Analytics patterns
- Observability - Data monitoring
Implementation Guides
- Microservices Patterns - CDC, CQRS patterns
- Testing Strategy - Data quality testing
Project Management
- Execution Roadmap - Implementation phases
- Budget Planning - Storage costs
Key ADRs
This data architecture provides a solid foundation for learning comprehensive data engineering patterns while maintaining production-ready standards and addressing all the gaps identified in the original design.
Document Status: Active Reference ✅
Last Review: December 29, 2025