Event Schema Design

Best practices for designing and managing event schemas in FlowMart's event-driven architecture

This guide outlines best practices for designing, evolving, and managing event schemas in FlowMart’s event-driven architecture.

Introduction to Events in Our Architecture

Events are the backbone of FlowMart’s microservices ecosystem. They enable:

  • Loose coupling between services
  • Asynchronous communication
  • Eventual consistency across service boundaries
  • Event sourcing for critical business processes
  • Audit trails of system changes

Event Schema Fundamentals

What Is an Event Schema?

An event schema defines the structure and validation rules for events flowing through our system. Properly designed schemas ensure events can be:

  • Produced consistently by services
  • Consumed reliably by other services
  • Evolved over time without breaking consumers
  • Validated to prevent invalid data from propagating
  • Documented for developers to understand and use

Event Schema Registry

FlowMart uses a centralized Schema Registry to:

  1. Store all event schemas
  2. Validate events at publish time
  3. Provide a browsable catalog of events
  4. Track schema versions and compatibility
  5. Generate client libraries and documentation

All services must register their event schemas in the central registry before publishing events.

Schema Design Principles

1. Design for Evolution

Events should be designed to evolve over time:

  • Additive Changes Only: Add optional fields rather than modifying existing ones
  • Required Minimal Core: Keep required fields to essential business data
  • Meaningful Defaults: Provide sensible defaults for optional fields
  • Version Awareness: Include schema version information

2. Event Ownership

Each event type has a single owner:

  • The producing service owns the event schema
  • Only the owner can make changes to the schema
  • The owner is responsible for schema compatibility

3. Semantic Versioning

Follow semantic versioning for event schemas:

  • Major Version: Breaking changes (consumers must update)
  • Minor Version: Backward-compatible feature additions
  • Patch Version: Backward-compatible bug fixes

4. Business-Oriented Event Naming

Events should be named using business terminology:

  • Use past tense verbs (e.g., OrderPlaced, not CreateOrder)
  • Follow the pattern: [Entity][Event] (e.g., ProductCreated, PaymentProcessed)
  • Use domain-specific terminology consistent with our ubiquitous language

Event Schema Format (JSON Schema)

FlowMart uses JSON Schema as the standard format for defining event schemas:

{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://schemas.flowmart.com/events/product/ProductCreated/1.0.0",
"title": "ProductCreated",
"description": "Represents the creation of a new product in the catalog",
"type": "object",
"required": ["eventId", "eventType", "eventVersion", "timestamp", "data"],
"properties": {
"eventId": {
"type": "string",
"format": "uuid",
"description": "Unique identifier for this event instance"
},
"eventType": {
"type": "string",
"enum": ["ProductCreated"],
"description": "Type of the event"
},
"eventVersion": {
"type": "string",
"pattern": "^\\d+\\.\\d+\\.\\d+$",
"description": "Semantic version of the event schema"
},
"timestamp": {
"type": "string",
"format": "date-time",
"description": "ISO-8601 timestamp when the event was created"
},
"source": {
"type": "string",
"description": "Service that produced the event"
},
"data": {
"type": "object",
"required": ["productId", "name", "sku", "price"],
"properties": {
"productId": {
"type": "string",
"format": "uuid",
"description": "Unique identifier for the product"
},
"name": {
"type": "string",
"minLength": 1,
"maxLength": 255,
"description": "Name of the product"
},
"description": {
"type": "string",
"maxLength": 2000,
"description": "Description of the product"
},
"sku": {
"type": "string",
"pattern": "^[A-Z0-9-]{5,20}$",
"description": "Stock keeping unit - unique product identifier"
},
"price": {
"type": "object",
"required": ["amount", "currency"],
"properties": {
"amount": {
"type": "number",
"exclusiveMinimum": 0,
"description": "Price amount"
},
"currency": {
"type": "string",
"enum": ["USD", "EUR", "GBP", "CAD"],
"description": "Price currency code"
}
}
},
"categories": {
"type": "array",
"items": {
"type": "string"
},
"description": "Categories the product belongs to"
},
"attributes": {
"type": "object",
"additionalProperties": {
"type": ["string", "number", "boolean"]
},
"description": "Additional product attributes as key-value pairs"
}
}
},
"metadata": {
"type": "object",
"additionalProperties": true,
"description": "Additional contextual information about the event"
},
"correlationId": {
"type": "string",
"format": "uuid",
"description": "ID for correlating related events"
},
"causationId": {
"type": "string",
"format": "uuid",
"description": "ID of the event that caused this event"
}
},
"additionalProperties": false
}

Standard Event Envelope

All FlowMart events follow a standard envelope structure:

{
"eventId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
"eventType": "ProductCreated",
"eventVersion": "1.0.0",
"timestamp": "2023-09-15T13:25:47.803Z",
"source": "product-service",
"data": {
// Event-specific payload
},
"metadata": {
// Optional contextual information
},
"correlationId": "7f8d0e3c-d5f9-42e1-a11b-78ad6c0c380a",
"causationId": "3e4f5d6c-7b8a-9c0d-1e2f-3a4b5c6d7e8f"
}

Required Envelope Fields

FieldTypeDescription
eventIdUUIDUnique identifier for the event instance
eventTypeStringName of the event (e.g., ProductCreated)
eventVersionStringSemantic version of the event schema
timestampISO-8601When the event occurred
dataObjectEvent-specific payload

Optional Envelope Fields

FieldTypeDescription
sourceStringService that produced the event
metadataObjectAdditional context about the event
correlationIdUUIDID for correlating related events in a flow
causationIdUUIDID of the event that caused this event

Event Data Types

Primitive Types

  • String: Use for text data
  • Number: Use for numeric values (integers or decimals)
  • Boolean: Use for true/false flags
  • Array: Use for collections of the same type
  • Object: Use for nested structures

Specialized Formats

  • UUID: Use for unique identifiers (format: uuid)
  • ISO Date-Time: Use for timestamps (format: date-time)
  • Email: Use for email addresses (format: email)
  • URI: Use for web addresses (format: uri)
  • Decimal: Use for currency amounts (type: number)

Complex Types

For complex or reusable types, create separate schema definitions:

{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://schemas.flowmart.com/common/Address/1.0.0",
"title": "Address",
"type": "object",
"required": ["line1", "city", "postalCode", "country"],
"properties": {
"line1": {
"type": "string",
"maxLength": 100
},
"line2": {
"type": "string",
"maxLength": 100
},
"city": {
"type": "string",
"maxLength": 100
},
"region": {
"type": "string",
"maxLength": 100
},
"postalCode": {
"type": "string",
"maxLength": 20
},
"country": {
"type": "string",
"maxLength": 2,
"pattern": "^[A-Z]{2}$"
}
}
}

Then reference them in your event schemas:

{
"properties": {
"shippingAddress": {
"$ref": "https://schemas.flowmart.com/common/Address/1.0.0"
}
}
}

Common Event Patterns

State Change Events

Represent changes to an entity’s state:

{
"eventType": "OrderStatusChanged",
"data": {
"orderId": "61fea0a1-2ac4-4e8c-a851-d38f7c8c06f9",
"previousStatus": "PAYMENT_PENDING",
"newStatus": "PAYMENT_COMPLETED",
"reason": "Payment successful",
"changedBy": "payment-service"
}
}

Resource Creation Events

Represent the creation of a new entity:

{
"eventType": "CustomerCreated",
"data": {
"customerId": "cust-12345",
"email": "john.doe@example.com",
"firstName": "John",
"lastName": "Doe",
"createdAt": "2023-09-15T10:30:00Z"
}
}

Resource Update Events

Represent updates to an existing entity:

{
"eventType": "ProductUpdated",
"data": {
"productId": "b3c631a5-f7c8-4d89-a57f-dd2f069b5730",
"changes": {
"price": {
"amount": 24.99,
"currency": "USD"
},
"inventory": {
"inStock": 250
}
},
"updatedAt": "2023-09-15T14:22:36Z"
}
}

Action Events

Represent business actions that occurred:

{
"eventType": "PaymentProcessed",
"data": {
"paymentId": "pay-67890",
"orderId": "ord-12345",
"amount": 99.99,
"currency": "USD",
"status": "SUCCESSFUL",
"paymentMethod": "CREDIT_CARD",
"processedAt": "2023-09-15T13:45:22Z"
}
}

Schema Evolution

Compatibility Types

We support the following compatibility modes for schema evolution:

  • Backward: New schema can read data produced with previous schema
  • Forward: Previous schema can read data produced with new schema
  • Full: Both backward and forward compatibility
  • None: No compatibility guarantees (use with caution)

Backward Compatibility Rules

  1. Adding optional fields is safe
  2. Removing optional fields is safe
  3. Making a required field optional is safe
  4. Adding new enum values is safe
  5. Widening numeric ranges is safe (e.g., int to float)

Breaking Changes to Avoid

  1. Removing required fields
  2. Adding required fields
  3. Changing field types
  4. Renaming fields
  5. Restricting enum values

Handling Breaking Changes

If you must make a breaking change:

  1. Create a new major version of the schema
  2. Maintain both versions for a transition period
  3. Implement dual publishing for critical events
  4. Help consumers migrate to the new version
  5. Deprecate the old version with advance notice

Consuming Events

Consumer Best Practices

  1. Be tolerant in what you accept:

    • Ignore unknown fields
    • Provide defaults for missing optional fields
    • Handle enum values gracefully, including unknown values
  2. Validate incoming events:

    • Verify events against their schema
    • Check required fields
    • Validate business rules before processing
  3. Handle versioning gracefully:

    • Check event version before processing
    • Implement version-specific handlers if needed
    • Subscribe to schema registry updates

Consumer Code Example (TypeScript)

import { KafkaConsumer } from '@flowmart/kafka-client';
import { SchemaRegistry } from '@flowmart/schema-registry';
import { OrderProcessingService } from './services';
// Initialize schema registry client
const schemaRegistry = new SchemaRegistry({
baseUrl: 'https://schema-registry.flowmart.com',
});
// Define event interface
interface OrderPlacedEvent {
eventId: string;
eventType: 'OrderPlaced';
eventVersion: string;
timestamp: string;
data: {
orderId: string;
customerId: string;
items: Array<{
productId: string;
quantity: number;
unitPrice: number;
}>;
totalAmount: number;
currency: string;
shippingAddress: {
// Address fields...
};
};
// Other envelope fields...
}
// Initialize consumer
const orderConsumer = new KafkaConsumer({
groupId: 'inventory-service',
brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
});
// Initialize service
const orderProcessor = new OrderProcessingService();
// Start consuming events
async function startConsumer() {
await orderConsumer.subscribe('order-events');
orderConsumer.on('message', async (message) => {
try {
// Parse the message
const rawEvent = JSON.parse(message.value.toString());
// Skip if not the event we're interested in
if (rawEvent.eventType !== 'OrderPlaced') {
return;
}
// Validate against schema
const isValid = await schemaRegistry.validate(
rawEvent,
'OrderPlaced',
rawEvent.eventVersion
);
if (!isValid) {
console.error('Invalid event schema', rawEvent);
return;
}
// Type-safe processing
const event = rawEvent as OrderPlacedEvent;
// Process the order
await orderProcessor.processNewOrder(event.data);
// Commit the offset
await orderConsumer.commitOffset(message);
} catch (error) {
console.error('Error processing order event', error);
// Implement retry/dead-letter logic
}
});
}
startConsumer().catch(console.error);

Publishing Events

Producer Best Practices

  1. Validate before publishing:

    • Ensure events comply with their schema
    • Verify business rules and data integrity
    • Set appropriate event headers
  2. Include essential metadata:

    • Generate a unique event ID
    • Set the correct event type and version
    • Include accurate timestamp
    • Set correlation and causation IDs
  3. Handle publishing failures:

    • Implement retry mechanisms with backoff
    • Store events temporarily if Kafka is unavailable
    • Log failed events for troubleshooting

Producer Code Example (TypeScript)

import { v4 as uuid } from 'uuid';
import { KafkaProducer } from '@flowmart/kafka-client';
import { SchemaRegistry } from '@flowmart/schema-registry';
import { Product } from './models';
// Initialize schema registry client
const schemaRegistry = new SchemaRegistry({
baseUrl: 'https://schema-registry.flowmart.com',
});
// Initialize producer
const producer = new KafkaProducer({
clientId: 'product-service',
brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
});
export class ProductEventService {
async publishProductCreated(product: Product, correlationId?: string): Promise<void> {
const eventId = uuid();
const event = {
eventId,
eventType: 'ProductCreated',
eventVersion: '1.0.0',
timestamp: new Date().toISOString(),
source: 'product-service',
data: {
productId: product.id,
name: product.name,
summary: product.description || null,
sku: product.sku,
price: {
amount: product.price,
currency: 'USD' // Default to USD
},
categories: product.categories || [],
attributes: product.attributes || {}
},
metadata: {
// Add any additional metadata
},
correlationId: correlationId || eventId,
causationId: null // No previous event caused this
};
// Validate against schema
const isValid = await schemaRegistry.validate(
event,
'ProductCreated',
'1.0.0'
);
if (!isValid) {
const errors = await schemaRegistry.getValidationErrors(
event,
'ProductCreated',
'1.0.0'
);
throw new Error(`Invalid event schema: ${JSON.stringify(errors)}`);
}
// Publish event
await producer.send({
topic: 'product-events',
messages: [
{
key: product.id,
value: JSON.stringify(event),
headers: {
'eventType': 'ProductCreated',
'contentType': 'application/json'
}
}
]
});
console.log(`Published ProductCreated event: ${eventId}`);
}
}

Schema Registry Integration

Registering a New Schema

Terminal window
# Using CLI tool
flowmart-schema register \
--file ./schemas/ProductCreated.json \
--compatibility BACKWARD
# API endpoint
curl -X POST https://schema-registry.flowmart.com/subjects/ProductCreated/versions \
-H "Content-Type: application/json" \
-d @./schemas/ProductCreated.json

Retrieving a Schema

// Using JavaScript client
const schema = await schemaRegistry.getSchema('ProductCreated', '1.0.0');
// API endpoint
curl https://schema-registry.flowmart.com/subjects/ProductCreated/versions/latest

Checking Compatibility

// Using JavaScript client
const isCompatible = await schemaRegistry.checkCompatibility(
newSchema,
'ProductCreated'
);
// API endpoint
curl -X POST https://schema-registry.flowmart.com/compatibility/subjects/ProductCreated/versions/latest \
-H "Content-Type: application/json" \
-d @./schemas/ProductCreated.v2.json

Schema Registry UI

Our Schema Registry includes a web interface at https://schema-registry.flowmart.com/ui that provides:

  • Browsable catalog of all event schemas
  • Schema versioning history
  • Compatibility information
  • Schema validation tools
  • Documentation generation

Testing Event Schemas

Unit Testing Schemas

import { validateAgainstSchema } from '@flowmart/schema-validator';
import productCreatedSchema from './schemas/ProductCreated.json';
describe('ProductCreated schema', () => {
it('validates valid events', () => {
const validEvent = {
eventId: 'f47ac10b-58cc-4372-a567-0e02b2c3d479',
eventType: 'ProductCreated',
eventVersion: '1.0.0',
timestamp: '2023-09-15T13:25:47.803Z',
data: {
productId: 'b3c631a5-f7c8-4d89-a57f-dd2f069b5730',
name: 'Smartphone X Pro',
sku: 'SP-XPRO-2023',
price: {
amount: 999.99,
currency: 'USD'
}
}
};
const result = validateAgainstSchema(validEvent, productCreatedSchema);
expect(result.valid).toBe(true);
});
it('rejects events with missing required fields', () => {
const invalidEvent = {
eventId: 'f47ac10b-58cc-4372-a567-0e02b2c3d479',
eventType: 'ProductCreated',
eventVersion: '1.0.0',
timestamp: '2023-09-15T13:25:47.803Z',
data: {
// Missing required productId
name: 'Smartphone X Pro',
// Missing required sku
price: {
amount: 999.99,
currency: 'USD'
}
}
};
const result = validateAgainstSchema(invalidEvent, productCreatedSchema);
expect(result.valid).toBe(false);
expect(result.errors.length).toBeGreaterThan(0);
});
});

Integration Testing with Schema Registry

describe('Schema Registry Integration', () => {
it('registers and validates schema', async () => {
// Register test schema
await schemaRegistry.registerSchema(
'TestEvent',
testEventSchema,
'BACKWARD'
);
// Create test event
const testEvent = {
eventId: uuid(),
eventType: 'TestEvent',
eventVersion: '1.0.0',
timestamp: new Date().toISOString(),
data: {
// Test data...
}
};
// Validate against registered schema
const isValid = await schemaRegistry.validate(
testEvent,
'TestEvent',
'1.0.0'
);
expect(isValid).toBe(true);
});
});

Event Documentation

Self-Documenting Schemas

Use descriptive fields in your JSON Schema to auto-generate documentation:

{
"title": "ProductCreated",
"description": "Published when a new product is created in the catalog",
"properties": {
"data": {
"properties": {
"productId": {
"description": "Unique identifier for the product",
"examples": ["p-12345"]
}
}
}
}
}

Documentation in Code

Document event handling with clear comments:

/**
* Handles the ProductCreated event
* This event is triggered when a new product is added to the catalog.
* It updates the inventory service with the new product information.
*
* @param event The ProductCreated event
* @see https://schema-registry.flowmart.com/ui/schemas/ProductCreated/1.0.0
*/
async function handleProductCreated(event: ProductCreatedEvent): Promise<void> {
// Implementation...
}

Event Tracing and Debugging

Correlation IDs

Use correlation IDs to trace requests across services:

// When handling an API request
const correlationId = req.headers['x-correlation-id'] || uuid();
// Include in all events
const event = {
// Other event fields...
correlationId,
// If this event was caused by another event
causationId: previousEvent?.eventId
};

Event Logging

Log event publishing and consumption with consistent format:

// Producer logging
logger.info('Publishing event', {
eventId: event.eventId,
eventType: event.eventType,
correlationId: event.correlationId
});
// Consumer logging
logger.info('Consuming event', {
eventId: event.eventId,
eventType: event.eventType,
correlationId: event.correlationId,
consumer: 'inventory-service'
});

Event Monitoring

Monitor your event streams using our standard observability stack:

  1. Kafka Metrics: Lag, throughput, errors
  2. Schema Registry Metrics: Validation failures, compatibility checks
  3. Service Metrics: Event processing times, failure rates
  4. Custom Dashboards: Domain-specific event flows

Access dashboards at https://grafana.flowmart.com/d/events

Event Schema Governance

Change Management

  1. Proposal: Document the schema change with rationale
  2. Review: Domain experts review for business requirements
  3. Compatibility Check: Verify with schema registry
  4. Approval: Get sign-off from service team leads
  5. Publication: Register schema and announce change

Schema Review Checklist

✅ Schema follows naming conventions
✅ Required fields are truly necessary
✅ Field types are appropriate
✅ Enums have complete value lists
✅ Constraints (min, max, etc.) are appropriate
✅ Documentation is complete
✅ Versioning follows semantic versioning
✅ Compatibility type is specified

Conclusion

Well-designed event schemas are foundational to reliable event-driven systems. Following FlowMart’s event schema guidelines ensures our services can communicate reliably today and evolve confidently tomorrow.

Next Steps