Event Schema Design
Best practices for designing and managing event schemas in FlowMart's event-driven architecture
This guide outlines best practices for designing, evolving, and managing event schemas in FlowMart’s event-driven architecture.
Introduction to Events in Our Architecture
Events are the backbone of FlowMart’s microservices ecosystem. They enable:
- Loose coupling between services
- Asynchronous communication
- Eventual consistency across service boundaries
- Event sourcing for critical business processes
- Audit trails of system changes
Event Schema Fundamentals
What Is an Event Schema?
An event schema defines the structure and validation rules for events flowing through our system. Properly designed schemas ensure events can be:
- Produced consistently by services
- Consumed reliably by other services
- Evolved over time without breaking consumers
- Validated to prevent invalid data from propagating
- Documented for developers to understand and use
Event Schema Registry
FlowMart uses a centralized Schema Registry to:
- Store all event schemas
- Validate events at publish time
- Provide a browsable catalog of events
- Track schema versions and compatibility
- Generate client libraries and documentation
All services must register their event schemas in the central registry before publishing events.
Schema Design Principles
1. Design for Evolution
Events should be designed to evolve over time:
- Additive Changes Only: Add optional fields rather than modifying existing ones
- Required Minimal Core: Keep required fields to essential business data
- Meaningful Defaults: Provide sensible defaults for optional fields
- Version Awareness: Include schema version information
2. Event Ownership
Each event type has a single owner:
- The producing service owns the event schema
- Only the owner can make changes to the schema
- The owner is responsible for schema compatibility
3. Semantic Versioning
Follow semantic versioning for event schemas:
- Major Version: Breaking changes (consumers must update)
- Minor Version: Backward-compatible feature additions
- Patch Version: Backward-compatible bug fixes
4. Business-Oriented Event Naming
Events should be named using business terminology:
- Use past tense verbs (e.g.,
OrderPlaced
, notCreateOrder
) - Follow the pattern:
[Entity][Event]
(e.g.,ProductCreated
,PaymentProcessed
) - Use domain-specific terminology consistent with our ubiquitous language
Event Schema Format (JSON Schema)
FlowMart uses JSON Schema as the standard format for defining event schemas:
{ "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://schemas.flowmart.com/events/product/ProductCreated/1.0.0", "title": "ProductCreated", "description": "Represents the creation of a new product in the catalog", "type": "object", "required": ["eventId", "eventType", "eventVersion", "timestamp", "data"], "properties": { "eventId": { "type": "string", "format": "uuid", "description": "Unique identifier for this event instance" }, "eventType": { "type": "string", "enum": ["ProductCreated"], "description": "Type of the event" }, "eventVersion": { "type": "string", "pattern": "^\\d+\\.\\d+\\.\\d+$", "description": "Semantic version of the event schema" }, "timestamp": { "type": "string", "format": "date-time", "description": "ISO-8601 timestamp when the event was created" }, "source": { "type": "string", "description": "Service that produced the event" }, "data": { "type": "object", "required": ["productId", "name", "sku", "price"], "properties": { "productId": { "type": "string", "format": "uuid", "description": "Unique identifier for the product" }, "name": { "type": "string", "minLength": 1, "maxLength": 255, "description": "Name of the product" }, "description": { "type": "string", "maxLength": 2000, "description": "Description of the product" }, "sku": { "type": "string", "pattern": "^[A-Z0-9-]{5,20}$", "description": "Stock keeping unit - unique product identifier" }, "price": { "type": "object", "required": ["amount", "currency"], "properties": { "amount": { "type": "number", "exclusiveMinimum": 0, "description": "Price amount" }, "currency": { "type": "string", "enum": ["USD", "EUR", "GBP", "CAD"], "description": "Price currency code" } } }, "categories": { "type": "array", "items": { "type": "string" }, "description": "Categories the product belongs to" }, "attributes": { "type": "object", "additionalProperties": { "type": ["string", "number", "boolean"] }, "description": "Additional product attributes as key-value pairs" } } }, "metadata": { "type": "object", "additionalProperties": true, "description": "Additional contextual information about the event" }, "correlationId": { "type": "string", "format": "uuid", "description": "ID for correlating related events" }, "causationId": { "type": "string", "format": "uuid", "description": "ID of the event that caused this event" } }, "additionalProperties": false}
Standard Event Envelope
All FlowMart events follow a standard envelope structure:
{ "eventId": "f47ac10b-58cc-4372-a567-0e02b2c3d479", "eventType": "ProductCreated", "eventVersion": "1.0.0", "timestamp": "2023-09-15T13:25:47.803Z", "source": "product-service", "data": { // Event-specific payload }, "metadata": { // Optional contextual information }, "correlationId": "7f8d0e3c-d5f9-42e1-a11b-78ad6c0c380a", "causationId": "3e4f5d6c-7b8a-9c0d-1e2f-3a4b5c6d7e8f"}
Required Envelope Fields
Field | Type | Description |
---|---|---|
eventId | UUID | Unique identifier for the event instance |
eventType | String | Name of the event (e.g., ProductCreated ) |
eventVersion | String | Semantic version of the event schema |
timestamp | ISO-8601 | When the event occurred |
data | Object | Event-specific payload |
Optional Envelope Fields
Field | Type | Description |
---|---|---|
source | String | Service that produced the event |
metadata | Object | Additional context about the event |
correlationId | UUID | ID for correlating related events in a flow |
causationId | UUID | ID of the event that caused this event |
Event Data Types
Primitive Types
- String: Use for text data
- Number: Use for numeric values (integers or decimals)
- Boolean: Use for true/false flags
- Array: Use for collections of the same type
- Object: Use for nested structures
Specialized Formats
- UUID: Use for unique identifiers (format:
uuid
) - ISO Date-Time: Use for timestamps (format:
date-time
) - Email: Use for email addresses (format:
email
) - URI: Use for web addresses (format:
uri
) - Decimal: Use for currency amounts (type:
number
)
Complex Types
For complex or reusable types, create separate schema definitions:
{ "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://schemas.flowmart.com/common/Address/1.0.0", "title": "Address", "type": "object", "required": ["line1", "city", "postalCode", "country"], "properties": { "line1": { "type": "string", "maxLength": 100 }, "line2": { "type": "string", "maxLength": 100 }, "city": { "type": "string", "maxLength": 100 }, "region": { "type": "string", "maxLength": 100 }, "postalCode": { "type": "string", "maxLength": 20 }, "country": { "type": "string", "maxLength": 2, "pattern": "^[A-Z]{2}$" } }}
Then reference them in your event schemas:
{ "properties": { "shippingAddress": { "$ref": "https://schemas.flowmart.com/common/Address/1.0.0" } }}
Common Event Patterns
State Change Events
Represent changes to an entity’s state:
{ "eventType": "OrderStatusChanged", "data": { "orderId": "61fea0a1-2ac4-4e8c-a851-d38f7c8c06f9", "previousStatus": "PAYMENT_PENDING", "newStatus": "PAYMENT_COMPLETED", "reason": "Payment successful", "changedBy": "payment-service" }}
Resource Creation Events
Represent the creation of a new entity:
{ "eventType": "CustomerCreated", "data": { "customerId": "cust-12345", "email": "john.doe@example.com", "firstName": "John", "lastName": "Doe", "createdAt": "2023-09-15T10:30:00Z" }}
Resource Update Events
Represent updates to an existing entity:
{ "eventType": "ProductUpdated", "data": { "productId": "b3c631a5-f7c8-4d89-a57f-dd2f069b5730", "changes": { "price": { "amount": 24.99, "currency": "USD" }, "inventory": { "inStock": 250 } }, "updatedAt": "2023-09-15T14:22:36Z" }}
Action Events
Represent business actions that occurred:
{ "eventType": "PaymentProcessed", "data": { "paymentId": "pay-67890", "orderId": "ord-12345", "amount": 99.99, "currency": "USD", "status": "SUCCESSFUL", "paymentMethod": "CREDIT_CARD", "processedAt": "2023-09-15T13:45:22Z" }}
Schema Evolution
Compatibility Types
We support the following compatibility modes for schema evolution:
- Backward: New schema can read data produced with previous schema
- Forward: Previous schema can read data produced with new schema
- Full: Both backward and forward compatibility
- None: No compatibility guarantees (use with caution)
Backward Compatibility Rules
- Adding optional fields is safe
- Removing optional fields is safe
- Making a required field optional is safe
- Adding new enum values is safe
- Widening numeric ranges is safe (e.g., int to float)
Breaking Changes to Avoid
- ❌ Removing required fields
- ❌ Adding required fields
- ❌ Changing field types
- ❌ Renaming fields
- ❌ Restricting enum values
Handling Breaking Changes
If you must make a breaking change:
- Create a new major version of the schema
- Maintain both versions for a transition period
- Implement dual publishing for critical events
- Help consumers migrate to the new version
- Deprecate the old version with advance notice
Consuming Events
Consumer Best Practices
-
Be tolerant in what you accept:
- Ignore unknown fields
- Provide defaults for missing optional fields
- Handle enum values gracefully, including unknown values
-
Validate incoming events:
- Verify events against their schema
- Check required fields
- Validate business rules before processing
-
Handle versioning gracefully:
- Check event version before processing
- Implement version-specific handlers if needed
- Subscribe to schema registry updates
Consumer Code Example (TypeScript)
import { KafkaConsumer } from '@flowmart/kafka-client';import { SchemaRegistry } from '@flowmart/schema-registry';import { OrderProcessingService } from './services';
// Initialize schema registry clientconst schemaRegistry = new SchemaRegistry({ baseUrl: 'https://schema-registry.flowmart.com',});
// Define event interfaceinterface OrderPlacedEvent { eventId: string; eventType: 'OrderPlaced'; eventVersion: string; timestamp: string; data: { orderId: string; customerId: string; items: Array<{ productId: string; quantity: number; unitPrice: number; }>; totalAmount: number; currency: string; shippingAddress: { // Address fields... }; }; // Other envelope fields...}
// Initialize consumerconst orderConsumer = new KafkaConsumer({ groupId: 'inventory-service', brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],});
// Initialize serviceconst orderProcessor = new OrderProcessingService();
// Start consuming eventsasync function startConsumer() { await orderConsumer.subscribe('order-events');
orderConsumer.on('message', async (message) => { try { // Parse the message const rawEvent = JSON.parse(message.value.toString());
// Skip if not the event we're interested in if (rawEvent.eventType !== 'OrderPlaced') { return; }
// Validate against schema const isValid = await schemaRegistry.validate( rawEvent, 'OrderPlaced', rawEvent.eventVersion );
if (!isValid) { console.error('Invalid event schema', rawEvent); return; }
// Type-safe processing const event = rawEvent as OrderPlacedEvent;
// Process the order await orderProcessor.processNewOrder(event.data);
// Commit the offset await orderConsumer.commitOffset(message);
} catch (error) { console.error('Error processing order event', error); // Implement retry/dead-letter logic } });}
startConsumer().catch(console.error);
Publishing Events
Producer Best Practices
-
Validate before publishing:
- Ensure events comply with their schema
- Verify business rules and data integrity
- Set appropriate event headers
-
Include essential metadata:
- Generate a unique event ID
- Set the correct event type and version
- Include accurate timestamp
- Set correlation and causation IDs
-
Handle publishing failures:
- Implement retry mechanisms with backoff
- Store events temporarily if Kafka is unavailable
- Log failed events for troubleshooting
Producer Code Example (TypeScript)
import { v4 as uuid } from 'uuid';import { KafkaProducer } from '@flowmart/kafka-client';import { SchemaRegistry } from '@flowmart/schema-registry';import { Product } from './models';
// Initialize schema registry clientconst schemaRegistry = new SchemaRegistry({ baseUrl: 'https://schema-registry.flowmart.com',});
// Initialize producerconst producer = new KafkaProducer({ clientId: 'product-service', brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],});
export class ProductEventService { async publishProductCreated(product: Product, correlationId?: string): Promise<void> { const eventId = uuid();
const event = { eventId, eventType: 'ProductCreated', eventVersion: '1.0.0', timestamp: new Date().toISOString(), source: 'product-service', data: { productId: product.id, name: product.name, summary: product.description || null, sku: product.sku, price: { amount: product.price, currency: 'USD' // Default to USD }, categories: product.categories || [], attributes: product.attributes || {} }, metadata: { // Add any additional metadata }, correlationId: correlationId || eventId, causationId: null // No previous event caused this };
// Validate against schema const isValid = await schemaRegistry.validate( event, 'ProductCreated', '1.0.0' );
if (!isValid) { const errors = await schemaRegistry.getValidationErrors( event, 'ProductCreated', '1.0.0' ); throw new Error(`Invalid event schema: ${JSON.stringify(errors)}`); }
// Publish event await producer.send({ topic: 'product-events', messages: [ { key: product.id, value: JSON.stringify(event), headers: { 'eventType': 'ProductCreated', 'contentType': 'application/json' } } ] });
console.log(`Published ProductCreated event: ${eventId}`); }}
Schema Registry Integration
Registering a New Schema
# Using CLI toolflowmart-schema register \ --file ./schemas/ProductCreated.json \ --compatibility BACKWARD
# API endpointcurl -X POST https://schema-registry.flowmart.com/subjects/ProductCreated/versions \ -H "Content-Type: application/json" \ -d @./schemas/ProductCreated.json
Retrieving a Schema
// Using JavaScript clientconst schema = await schemaRegistry.getSchema('ProductCreated', '1.0.0');
// API endpointcurl https://schema-registry.flowmart.com/subjects/ProductCreated/versions/latest
Checking Compatibility
// Using JavaScript clientconst isCompatible = await schemaRegistry.checkCompatibility( newSchema, 'ProductCreated');
// API endpointcurl -X POST https://schema-registry.flowmart.com/compatibility/subjects/ProductCreated/versions/latest \ -H "Content-Type: application/json" \ -d @./schemas/ProductCreated.v2.json
Schema Registry UI
Our Schema Registry includes a web interface at https://schema-registry.flowmart.com/ui that provides:
- Browsable catalog of all event schemas
- Schema versioning history
- Compatibility information
- Schema validation tools
- Documentation generation
Testing Event Schemas
Unit Testing Schemas
import { validateAgainstSchema } from '@flowmart/schema-validator';import productCreatedSchema from './schemas/ProductCreated.json';
describe('ProductCreated schema', () => { it('validates valid events', () => { const validEvent = { eventId: 'f47ac10b-58cc-4372-a567-0e02b2c3d479', eventType: 'ProductCreated', eventVersion: '1.0.0', timestamp: '2023-09-15T13:25:47.803Z', data: { productId: 'b3c631a5-f7c8-4d89-a57f-dd2f069b5730', name: 'Smartphone X Pro', sku: 'SP-XPRO-2023', price: { amount: 999.99, currency: 'USD' } } };
const result = validateAgainstSchema(validEvent, productCreatedSchema); expect(result.valid).toBe(true); });
it('rejects events with missing required fields', () => { const invalidEvent = { eventId: 'f47ac10b-58cc-4372-a567-0e02b2c3d479', eventType: 'ProductCreated', eventVersion: '1.0.0', timestamp: '2023-09-15T13:25:47.803Z', data: { // Missing required productId name: 'Smartphone X Pro', // Missing required sku price: { amount: 999.99, currency: 'USD' } } };
const result = validateAgainstSchema(invalidEvent, productCreatedSchema); expect(result.valid).toBe(false); expect(result.errors.length).toBeGreaterThan(0); });});
Integration Testing with Schema Registry
describe('Schema Registry Integration', () => { it('registers and validates schema', async () => { // Register test schema await schemaRegistry.registerSchema( 'TestEvent', testEventSchema, 'BACKWARD' );
// Create test event const testEvent = { eventId: uuid(), eventType: 'TestEvent', eventVersion: '1.0.0', timestamp: new Date().toISOString(), data: { // Test data... } };
// Validate against registered schema const isValid = await schemaRegistry.validate( testEvent, 'TestEvent', '1.0.0' );
expect(isValid).toBe(true); });});
Event Documentation
Self-Documenting Schemas
Use descriptive fields in your JSON Schema to auto-generate documentation:
{ "title": "ProductCreated", "description": "Published when a new product is created in the catalog", "properties": { "data": { "properties": { "productId": { "description": "Unique identifier for the product", "examples": ["p-12345"] } } } }}
Documentation in Code
Document event handling with clear comments:
/** * Handles the ProductCreated event * This event is triggered when a new product is added to the catalog. * It updates the inventory service with the new product information. * * @param event The ProductCreated event * @see https://schema-registry.flowmart.com/ui/schemas/ProductCreated/1.0.0 */async function handleProductCreated(event: ProductCreatedEvent): Promise<void> { // Implementation...}
Event Tracing and Debugging
Correlation IDs
Use correlation IDs to trace requests across services:
// When handling an API requestconst correlationId = req.headers['x-correlation-id'] || uuid();
// Include in all eventsconst event = { // Other event fields... correlationId, // If this event was caused by another event causationId: previousEvent?.eventId};
Event Logging
Log event publishing and consumption with consistent format:
// Producer logginglogger.info('Publishing event', { eventId: event.eventId, eventType: event.eventType, correlationId: event.correlationId});
// Consumer logginglogger.info('Consuming event', { eventId: event.eventId, eventType: event.eventType, correlationId: event.correlationId, consumer: 'inventory-service'});
Event Monitoring
Monitor your event streams using our standard observability stack:
- Kafka Metrics: Lag, throughput, errors
- Schema Registry Metrics: Validation failures, compatibility checks
- Service Metrics: Event processing times, failure rates
- Custom Dashboards: Domain-specific event flows
Access dashboards at https://grafana.flowmart.com/d/events
Event Schema Governance
Change Management
- Proposal: Document the schema change with rationale
- Review: Domain experts review for business requirements
- Compatibility Check: Verify with schema registry
- Approval: Get sign-off from service team leads
- Publication: Register schema and announce change
Schema Review Checklist
✅ Schema follows naming conventions
✅ Required fields are truly necessary
✅ Field types are appropriate
✅ Enums have complete value lists
✅ Constraints (min, max, etc.) are appropriate
✅ Documentation is complete
✅ Versioning follows semantic versioning
✅ Compatibility type is specified
Conclusion
Well-designed event schemas are foundational to reliable event-driven systems. Following FlowMart’s event schema guidelines ensures our services can communicate reliably today and evolve confidently tomorrow.
Next Steps
- Explore API design best practices
- Understand Observability in microservices
- Learn about CI/CD for microservices