Medium lets developers use plain message classes on Kafka without Schema Registry, while keeping total ordering among different message types on a single topic.
Medium provides an abstraction layer over Kafka and other streaming technologies, so you can use the same message library across your applications without ugly generated message classes and without worrying about which message broker is handling your events.
- Allows topics to transport multiple types of messages while enforcing total ordering. Very useful where business data depends on time correlation and different data types are involved.
- Complete reusability of message classes. The whole dev team uses just one message library regardless of the underlying messaging technology.
- No need for Kafka Schema Registry.
- No abstract schema declarations or ugly generated classes.
- Class declaration is a very good way of presenting data structures.
- No more incompatible schema problems.
- No custom Serialization/Deserialization needed.
- MediumMessage class allows for simple message definition and automatic Serde.
- MediumMessagingService allows users to request callbacks when a certain type of message arrives from the broker.
- MediumMessageProcessor is the Kafka Streams interface. Develop your logic here and plug it into the topology. No Serdes.
- Testing tools include TestMessage, TestNetworkDriver and TestMessageGenerator, which make unit testing simple.
- Azure Service Bus Driver: https://github.com/bithauschile/bithaus-medium-azure
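To make the first features above more concrete, here is a minimal, self-contained sketch of the general idea: each message travels in an envelope that carries its class name, so a single ordered topic can hold several message types and the receiver can rebuild the right class and call the matching listener. This is not Medium's implementation. Every name in it (`TypeTaggedTopicSketch`, `Envelope`, `NewOrder`, `CancelOrder`, `wrap`, `unwrap`) is hypothetical, the topic is an in-memory list rather than a broker, and the field encoding is a toy.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical illustration only; none of these names come from Medium.
class TypeTaggedTopicSketch {

    // Two unrelated message types that share one topic.
    public static class NewOrder { public String symbol; public Integer quantity; }
    public static class CancelOrder { public String orderId; }

    // What travels on the wire: the class name acts as routing metadata.
    static final class Envelope {
        final String className;
        final String payload;
        Envelope(String className, String payload) {
            this.className = className;
            this.payload = payload;
        }
    }

    // Stand-in for a single-partition topic: append order is delivery order.
    private final List<Envelope> topic = new ArrayList<>();
    private final Map<Class<?>, Consumer<Object>> listeners = new HashMap<>();

    // Toy encoding: public fields as "name=value;". Values must be non-null
    // and must not contain ';'. Only String and Integer fields are supported.
    static Envelope wrap(Object message) {
        try {
            StringBuilder sb = new StringBuilder();
            for (Field f : message.getClass().getFields()) {
                sb.append(f.getName()).append('=').append(f.get(message)).append(';');
            }
            return new Envelope(message.getClass().getName(), sb.toString());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    // Rebuild the original class from the class name stored in the envelope.
    static Object unwrap(Envelope envelope) {
        try {
            Class<?> type = Class.forName(envelope.className);
            Object message = type.getDeclaredConstructor().newInstance();
            for (String pair : envelope.payload.split(";")) {
                if (pair.isEmpty()) continue;
                String[] kv = pair.split("=", 2);
                Field f = type.getField(kv[0]);
                Object value = f.getType() == Integer.class ? Integer.valueOf(kv[1]) : kv[1];
                f.set(message, value);
            }
            return message;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    // Register a callback for one message type.
    <T> void addListener(Class<T> type, Consumer<T> listener) {
        listeners.put(type, message -> listener.accept(type.cast(message)));
    }

    void send(Object message) {
        topic.add(wrap(message));
    }

    // Deliver every pending envelope, in topic order, to the listener for its type.
    void poll() {
        for (Envelope envelope : topic) {
            Object message = unwrap(envelope);
            Consumer<Object> listener = listeners.get(message.getClass());
            if (listener != null) listener.accept(message);
        }
        topic.clear();
    }

    static List<String> demo() {
        TypeTaggedTopicSketch bus = new TypeTaggedTopicSketch();
        List<String> seen = new ArrayList<>();
        bus.addListener(NewOrder.class, m -> seen.add("new " + m.symbol + " x" + m.quantity));
        bus.addListener(CancelOrder.class, m -> seen.add("cancel " + m.orderId));

        NewOrder first = new NewOrder();
        first.symbol = "ABC";
        first.quantity = 10;
        CancelOrder cancel = new CancelOrder();
        cancel.orderId = "order-1";
        NewOrder second = new NewOrder();
        second.symbol = "XYZ";
        second.quantity = 5;

        bus.send(first);
        bus.send(cancel);
        bus.send(second);
        bus.poll();
        return seen;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Because both message types go through the same list, the cancel is observed between the two new orders, exactly in the order it was sent. Splitting the types across separate topics would lose that cross-type ordering.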
Simple Message definition
public class TestMessage extends MediumMessage {
    public String name;
    public Integer age;
}

Using the MessagingService
Map configMap = ... // Uses Kafka style configuration
MediumMessagingServiceConfig config = new MediumMessagingServiceConfig(configMap);
MediumMessagingService instance = new MediumMessagingService(config);

// Request a callback whenever a TestMessage arrives from the broker
instance.addMessageListener(TestMessage.class, new MediumMessageListener<TestMessage>() {
    @Override
    public String getName() {
        return "listener-1";
    }

    @Override
    public void onMessage(TestMessage message) throws MediumMessageListenerException {
        System.out.println("Name: " + message.name + " Age: " + message.age);
    }
});

// Populate the public fields declared on TestMessage above
TestMessage tm = new TestMessage();
tm.name = "Nico";
tm.age = 13;
instance.send(tm);
// or if you know the topic name
instance.send(tm, "topic1");

Producer
public class SimpleMediumKafkaProducerApp {

    // volatile: written by the shutdown hook thread, read by the main loop
    private static volatile boolean running = true;

    public static void main(String[] args) throws Exception {
        Logger logger = LoggerFactory.getLogger("ProducerApp");
        Map<String, Object> originals = new HashMap<>();

        // First the Messaging Service configuration
        originals.put(MediumMessagingServiceConfig.DRIVER_CLASSNAME_CONFIG, MediumMessagingServiceKafkaDriver.class.getName());
        originals.put(MediumMessagingServiceConfig.LOGGER_SUFIX_CONFIG, "KafkaProducerApp");

        // Now the Kafka driver configuration
        originals.put(MediumMessagingServiceKafkaDriverConfig.PRODUCER_ENABLED_CONFIG, "true");
        originals.put(MediumMessagingServiceKafkaDriverConfig.PRODUCER_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        originals.put(MediumMessagingServiceKafkaDriverConfig.PRODUCER_CLIENTID_CONFIG, "medium-test-producer");

        MediumMessagingServiceConfig config = new MediumMessagingServiceConfig(originals);
        MediumMessagingService instance = new MediumMessagingService(config);
        instance.start();
        logger.info("Messaging service started");

        // Stop the send loop and the service when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                logger.info("Stopping messaging service");
                running = false;
                instance.stop();
            }
        });

        // Send one message per second until shutdown
        while (running) {
            TestMessage message = new TestMessage("Hello World @ " + new Date());
            try {
                instance.send(message);
            } catch (MediumMessagingServiceException ex) {
                logger.error("Error sending message", ex);
                System.exit(1);
            }
            Thread.sleep(1000);
        }
    }
}

Consumer
public class SimpleMediumKafkaConsumerApp {

    public static void main(String[] args) throws Exception {
        Logger logger = LoggerFactory.getLogger("ConsumerApp");
        Gson gson = new Gson();
        Map<String, Object> originals = new HashMap<>();

        // First the Messaging Service configuration
        originals.put(MediumMessagingServiceConfig.DRIVER_CLASSNAME_CONFIG, MediumMessagingServiceKafkaDriver.class.getName());
        originals.put(MediumMessagingServiceConfig.LOGGER_SUFIX_CONFIG, "KafkaConsumerApp");

        // Now the Kafka driver configuration
        originals.put(MediumMessagingServiceKafkaDriverConfig.CONSUMER_ENABLED_CONFIG, "true");
        originals.put(MediumMessagingServiceKafkaDriverConfig.CONSUMER_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        originals.put(MediumMessagingServiceKafkaDriverConfig.CONSUMER_GROUPID_CONFIG, "medium-test-consumer");
        originals.put(MediumMessagingServiceKafkaDriverConfig.CONSUMER_SUBSCRIPTIONTOPICS_CONFIG, "medium-test-topic");

        MediumMessagingServiceConfig config = new MediumMessagingServiceConfig(originals);
        MediumMessagingService instance = new MediumMessagingService(config);

        // Log every TestMessage received, together with its metadata
        instance.addMessageListener(TestMessage.class, new MediumMessageListener<TestMessage>() {
            @Override
            public String getName() {
                return "test-listener";
            }

            @Override
            public void onMessage(TestMessage message) throws MediumMessageListenerException {
                logger.info("Received message: " + message + ", Metadata:" + gson.toJson(message.getMetadata()));
            }
        });

        instance.start();
        logger.info("Messaging service started");

        // Stop the service when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                logger.info("Stopping messaging service");
                instance.stop();
            }
        });
    }
}

Around 2010, the Santiago Stock Exchange (XSGO) launched its new trading infrastructure, built for sub-millisecond latency and a load capacity of thousands of messages per second. The team built a consistent messaging framework based on CLLM (formerly IBM WLLM) that provided total ordering even on clustered consumers. To transport the different instructions that market brokers send to the market (new orders, modifications, cancellations, etc.) and their corresponding responses, an abstraction layer was developed on top of LLM. It consisted of just two things. One was a message definition with enough metadata to route each message and deserialize it into the correct message class at the destination. The other was the Messaging Service, which interacted with the underlying messaging technology so that developers could work purely in terms of message classes.
Around 2013, Cummins Specto, an IoT solution for mining trucks created by Cummins Chile and developed by the Bithaus team, introduced a messaging system inspired by the XSGO experience that abstracted standard underlying MQ brokers (first HornetQ, then RabbitMQ).
Currently we are working on new IoT systems, and Kafka has been our streaming technology for the past several years. This project collects our real-world experience using Kafka to solve common problems that slowed our development cycles.