 
A sample Symfony project implementing messenger component with apache kafka service.
This repository contains a symfony project, kafka and zookeeper servers.
Services run on docker.
In this example we have configured messenger component to :
- Publish message into kafka topic
- Consume messages from kafka topics
By default, kafka transport is not implemented by messenger that's why we created custom transport in kafka directory.
- git
- docker
- docker-compose
- make
git clone https://github.com/symfony-examples/messenger-kafka.gitThis command will create all services and the kafka topic
make install-localEnjoy ! 🥳
make console app:messenger:producer reference 2000reference and 2000 are required arguments, you can replace them by other values
This command will send a message to kafka topic
The topic is defined in config/packages/messenger.yaml producer_topic
framework:
    messenger:
        transports:
            order_transport:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    ...
                    producer_topic: 'order_topic_test'make console messenger:consume order_transportorder_transport is the transport name defined in config/packages/messenger.yaml
This command will consume messages from the kafka topic define in config/packages/messenger.yaml consumer_topics
framework:
    messenger:
        transports:
            order_transport:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    ...
                    consumer_topics:
                        - 'order_topic_test'Messages will be handled by App\Messenger\Handler\OrderPaidHandler
In this example we send an InvoiceCreatedMessage for each order paid, you can update this implementation and put your custom logic here.
// App\Messenger\Handler\OrderPaidHandler
public function __invoke(OrderPaidMessage $message): void
{
    // implement logic here
}rdkafka extension must be installed.
; kafka.ini
extension=rdkafka.so## SETUP RDKAFKA EXTESIONS
RUN set -xe \
    && apk add --no-cache --update --virtual .phpize-deps $PHPIZE_DEPS \
    librdkafka-dev \
    && pecl install rdkafka
COPY ./.docker/php/kafka.ini $PHP_INI_DIR/conf.d/Check if the extension is well installed
php --ri rdkafkaAdd environment variables in .env file:
# transport dsn must start with kafka://
MESSENGER_TRANSPORT_DSN=kafka://
# kafka broker list separate with comma (exp: kafka-1:9092,kafka-2:9092)
KAFKA_BROKERS=kafka:9092Configure your transport
framework:
    messenger:
        transports:
            order_transport:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    metadata.broker.list: '%env(KAFKA_BROKERS)%'
                    group.id: 'my-group-id'
                    auto.offset.reset: 'earliest'
                    # you can add here any rdkafka option you need
                    # https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
                    ...
                    consumer_topics:
                        - 'order_topic_test'Setup transport
make console messenger:setup-transportEnjoy ! 🥳