Inventory adjusted (v1.0.1)
Indicates a change in inventory level
Inventory Adjusted 1.0.1 is in draft
New version of Inventory Adjusted is in draft
This is a new version of the Inventory Adjusted event. It is not yet ready for production. We are still working on it and collecting feedback from the team.
You can use this version in lower environments, but please be aware that it is still in draft and may change.
You can still use a previous version of the event, Inventory Adjusted 1.0.0, until that version is deprecated.
If you would like to provide feedback, please contact us at feedback@eventcatalog.io or our slack channel Order Management
Overview
The Inventory Adjusted
event is triggered whenever there is a change in the inventory levels of a product. This could occur due to various reasons such as receiving new stock, sales, returns, or manual adjustments by the inventory management team. The event ensures that all parts of the system that rely on inventory data are kept up-to-date with the latest inventory levels.
Architecture diagram
Payload example
Event example you my see being published.
{
"Name": "John Doe",
"Age": 30,
"Department": "Engineering",
"Position": "Software Engineer",
"Salary": 85000.50,
"JoinDate": "2024-01-15"
}
Schema (avro)
{ "type" : "record", "namespace" : "Tutorialspoint", "name" : "Employee", "fields" : [ { "name" : "Name", "type" : "string" }, { "name" : "Age", "type" : "int" }, { "name" : "Department", "type" : "string" }, { "name" : "Position", "type" : "string" }, { "name" : "Salary", "type" : "double" }, { "name" : "JoinDate", "type" : "string", "logicalType": "date" } ]}
Producing the Event
Select the language you want to produce the event in to see an example.
from kafka import KafkaProducer
import json
from datetime import datetime
# Kafka configuration
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Event data
event_data = {
"event_id": "abc123",
"timestamp": datetime.utcnow().isoformat() + 'Z',
"product_id": "prod987",
"adjusted_quantity": 10,
"new_quantity": 150,
"adjustment_reason": "restock",
"adjusted_by": "user123"
}
# Send event to Kafka topic
producer.send('inventory.adjusted', event_data)
producer.flush()
import { Kafka } from 'kafkajs';
// Kafka configuration
const kafka = new Kafka({
clientId: 'inventory-producer',
brokers: ['localhost:9092']
});
const producer = kafka.producer();
// Event data
const eventData = {
event_id: "abc123",
timestamp: new Date().toISOString(),
product_id: "prod987",
adjusted_quantity: 10,
new_quantity: 150,
adjustment_reason: "restock",
adjusted_by: "user123"
};
// Send event to Kafka topic
async function produceEvent() {
await producer.connect();
await producer.send({
topic: 'inventory.adjusted',
messages: [
{ value: JSON.stringify(eventData) }
],
});
await producer.disconnect();
}
produceEvent().catch(console.error);
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Properties;
import java.util.HashMap;
import java.util.Map;
import java.time.Instant;
public class InventoryProducer {
public static void main(String[] args) {
// Kafka configuration
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "inventory-producer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<>(props);
ObjectMapper mapper = new ObjectMapper();
try {
// Event data
Map<String, Object> eventData = new HashMap<>();
eventData.put("event_id", "abc123");
eventData.put("timestamp", Instant.now().toString());
eventData.put("product_id", "prod987");
eventData.put("adjusted_quantity", 10);
eventData.put("new_quantity", 150);
eventData.put("adjustment_reason", "restock");
eventData.put("adjusted_by", "user123");
// Create producer record
ProducerRecord<String, String> record = new ProducerRecord<>(
"inventory.adjusted",
mapper.writeValueAsString(eventData)
);
// Send event to Kafka topic
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("Error producing message: " + exception);
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.flush();
producer.close();
}
}
}
Consuming the Event
To consume an Inventory Adjusted event, use the following example Kafka consumer configuration in Python:
from kafka import KafkaConsumer
import json
# Kafka configuration
consumer = KafkaConsumer(
'inventory.adjusted',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='inventory_group',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Consume events
for message in consumer:
event_data = json.loads(message.value)
print(f"Received Inventory Adjusted event: {event_data}")
JSON Schema
A record representing an employee
The name of the employee
The age of the employee
The department where the employee works
The position or title of the employee within the department
The salary of the employee
The date when the employee joined the company
No properties match your search
Try a different search term or clear the search to see all properties