The goal of this project is to implement a "News" processing pipeline composed of five Spring Boot applications: producer-api, categorizer-service, collector-service, publisher-api and news-client.
On ivangfr.github.io, I have compiled my Proof-of-Concepts (PoCs) and articles. You can easily search for the technology you are interested in by using the filter. Who knows, perhaps I have already implemented a PoC or written an article about what you are looking for.
- [Medium]: Implementing a Kafka Producer and Consumer using Spring Cloud Stream
- [Medium]: Implementing Unit Tests for a Kafka Producer and Consumer that uses Spring Cloud Stream
- [Medium]: Implementing End-to-End testing for a Kafka Producer and Consumer that uses Spring Cloud Stream
- [Medium]: Using CloudEvents in a Kafka Producer and Consumer that uses Spring Cloud Stream
- Spring Cloud Stream to build highly scalable event-driven applications connected with shared messaging systems;
- Spring Cloud Schema Registry, which supports schema evolution so that the data can evolve over time; it also lets you store schema information in a textual format (typically JSON) and makes that information accessible to the applications that need it to send and receive data in binary format;
- Spring Data Elasticsearch to persist data in Elasticsearch;
- Spring Cloud OpenFeign to write web service clients easily;
- Thymeleaf as the HTML template engine;
- Zipkin to visualize traces between and within applications;
- Eureka for service registration and discovery.
Note: The docker-swarm-environment repository shows how to deploy this project into a cluster of Docker Engines in swarm mode.
- producer-api: Spring Boot Web Java application that creates news and pushes news events to the `producer.news` topic in Kafka.
- categorizer-service: Spring Boot Web Java application that listens for news events in the `producer.news` topic in Kafka, categorizes them and pushes them to the `categorizer.news` topic.
- collector-service: Spring Boot Web Java application that listens for news events in the `categorizer.news` topic in Kafka, saves them in Elasticsearch and pushes the news events to the `collector.news` topic.
- publisher-api: Spring Boot Web Java application that reads directly from Elasticsearch and exposes a REST API. It doesn't consume from Kafka.
- news-client: Spring Boot Web Java application that provides a user interface to see the news. It implements a WebSocket that consumes news events from the `collector.news` topic, so news is updated on the fly on the main page. In addition, news-client communicates directly with publisher-api whenever a search for specific news or a news update is needed.

The WebSocket operation is shown in the short GIF below. News is created in producer-api and immediately appears in news-client.
- Open a terminal and navigate to the `spring-cloud-stream-kafka-elasticsearch` root folder;
- Run the following command to generate `NewsEvent`: `./mvnw clean install --projects commons-news`

  It will install `commons-news-1.0.0.jar` in your local Maven repository, so that it is visible to all services.
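To confirm that the install step above actually placed the jar in the local repository, a small check like the following may help. This is only a sketch: it assumes the repository lives at Maven's default location (`~/.m2/repository`), and the function name `find_commons_news_jar` is made up for this example. It relies on `find -quit`, which GNU and BSD `find` both provide.

```shell
# Search a Maven local repository for the jar installed by the command above.
# Assumption: the repository is at ~/.m2/repository unless a directory is passed.
find_commons_news_jar() {
  cn_repo_dir="${1:-$HOME/.m2/repository}"
  # Nothing to search if the directory does not exist; print nothing.
  [ -d "$cn_repo_dir" ] || return 0
  # Print the first match and stop searching.
  find "$cn_repo_dir" -type f -name 'commons-news-1.0.0.jar' -print -quit 2>/dev/null || true
}

jar_path="$(find_commons_news_jar)"
if [ -n "$jar_path" ]; then
  echo "Found: $jar_path"
else
  echo "commons-news-1.0.0.jar not found; re-run the install command" >&2
fi
```

If nothing is found, the other modules will most likely fail to resolve the `commons-news` dependency when they are built or started.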
- In a terminal, navigate to the `spring-cloud-stream-kafka-elasticsearch` root folder and run: `docker compose up -d`
- Wait for the Docker containers to be up and running. To check, run: `docker ps -a`
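`docker ps -a` shows container state, but a container can be listed as running before the service inside it accepts requests. As a complement, the sketch below polls an HTTP endpoint until it answers. The function name `wait_for_url`, the retry count and the delay are choices made for this example, not part of the project.

```shell
# Poll a URL until it responds successfully or the attempts run out.
# Usage: wait_for_url URL [ATTEMPTS] [DELAY_SECONDS]
wait_for_url() {
  wf_url="$1"
  wf_attempts="${2:-30}"
  wf_delay="${3:-2}"
  wf_i=1
  while [ "$wf_i" -le "$wf_attempts" ]; do
    # -s: no progress output, -f: treat HTTP errors (>= 400) as failure,
    # -o /dev/null: discard the response body.
    if curl -sf -o /dev/null "$wf_url"; then
      echo "up: $wf_url"
      return 0
    fi
    sleep "$wf_delay"
    wf_i=$((wf_i + 1))
  done
  echo "timed out: $wf_url" >&2
  return 1
}

# Example call, using the Elasticsearch REST port mentioned in this README:
#   wait_for_url http://localhost:9200
```

The example call is left commented out so that sourcing the snippet has no side effects. Calling it before starting the applications avoids connection errors during their startup.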
Inside the spring-cloud-stream-kafka-elasticsearch root folder, run the following Maven commands in different terminals:
- eureka-server: `./mvnw clean spring-boot:run --projects eureka-server`
- producer-api: `./mvnw clean spring-boot:run --projects producer-api -Dspring-boot.run.jvmArguments="-Dserver.port=9080"`
- categorizer-service: `./mvnw clean spring-boot:run --projects categorizer-service -Dspring-boot.run.jvmArguments="-Dserver.port=9081"`
- collector-service: `./mvnw clean spring-boot:run --projects collector-service -Dspring-boot.run.jvmArguments="-Dserver.port=9082"`
- publisher-api: `./mvnw clean spring-boot:run --projects publisher-api -Dspring-boot.run.jvmArguments="-Dserver.port=9083"`
- news-client: `./mvnw clean spring-boot:run --projects news-client`
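The commands above differ only in the module name and the port. The sketch below captures that mapping in one place and prints the command for each service, which can be handy when scripting the startup. The function name `run_command` is invented for this example. The ports are the ones listed above, and `eureka-server` and `news-client` are started without an explicit port, as in the original commands.

```shell
# Print the Maven command used to start one service.
run_command() {
  rc_service="$1"
  case "$rc_service" in
    producer-api)              rc_port=9080 ;;
    categorizer-service)       rc_port=9081 ;;
    collector-service)         rc_port=9082 ;;
    publisher-api)             rc_port=9083 ;;
    eureka-server|news-client) rc_port="" ;;   # no port override in the README
    *) echo "unknown service: $rc_service" >&2; return 1 ;;
  esac
  if [ -n "$rc_port" ]; then
    printf './mvnw clean spring-boot:run --projects %s -Dspring-boot.run.jvmArguments="-Dserver.port=%s"\n' \
      "$rc_service" "$rc_port"
  else
    printf './mvnw clean spring-boot:run --projects %s\n' "$rc_service"
  fi
}

# List the commands in the order they appear above.
for s in eureka-server producer-api categorizer-service collector-service publisher-api news-client; do
  run_command "$s"
done
```

The script only prints the commands. Each one still has to be run in its own terminal, because `spring-boot:run` stays in the foreground.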
- In a terminal, make sure you are in the `spring-cloud-stream-kafka-elasticsearch` root folder;
- To build the applications' Docker images, run the following script: `./build-docker-images.sh`
- producer-api

  | Environment Variable | Description |
  |---|---|
  | `KAFKA_HOST` | Specify host of the Kafka message broker to use (default `localhost`) |
  | `KAFKA_PORT` | Specify port of the Kafka message broker to use (default `29092`) |
  | `SCHEMA_REGISTRY_HOST` | Specify host of the Schema Registry to use (default `localhost`) |
  | `SCHEMA_REGISTRY_PORT` | Specify port of the Schema Registry to use (default `8081`) |
  | `EUREKA_HOST` | Specify host of the Eureka service discovery to use (default `localhost`) |
  | `EUREKA_PORT` | Specify port of the Eureka service discovery to use (default `8761`) |
  | `ZIPKIN_HOST` | Specify host of the Zipkin distributed tracing system to use (default `localhost`) |
  | `ZIPKIN_PORT` | Specify port of the Zipkin distributed tracing system to use (default `9411`) |
- categorizer-service

  | Environment Variable | Description |
  |---|---|
  | `KAFKA_HOST` | Specify host of the Kafka message broker to use (default `localhost`) |
  | `KAFKA_PORT` | Specify port of the Kafka message broker to use (default `29092`) |
  | `SCHEMA_REGISTRY_HOST` | Specify host of the Schema Registry to use (default `localhost`) |
  | `SCHEMA_REGISTRY_PORT` | Specify port of the Schema Registry to use (default `8081`) |
  | `EUREKA_HOST` | Specify host of the Eureka service discovery to use (default `localhost`) |
  | `EUREKA_PORT` | Specify port of the Eureka service discovery to use (default `8761`) |
  | `ZIPKIN_HOST` | Specify host of the Zipkin distributed tracing system to use (default `localhost`) |
  | `ZIPKIN_PORT` | Specify port of the Zipkin distributed tracing system to use (default `9411`) |
- collector-service

  | Environment Variable | Description |
  |---|---|
  | `ELASTICSEARCH_HOST` | Specify host of the Elasticsearch search engine to use (default `localhost`) |
  | `ELASTICSEARCH_NODES_PORT` | Specify nodes port of the Elasticsearch search engine to use (default `9300`) |
  | `ELASTICSEARCH_REST_PORT` | Specify REST port of the Elasticsearch search engine to use (default `9200`) |
  | `KAFKA_HOST` | Specify host of the Kafka message broker to use (default `localhost`) |
  | `KAFKA_PORT` | Specify port of the Kafka message broker to use (default `29092`) |
  | `SCHEMA_REGISTRY_HOST` | Specify host of the Schema Registry to use (default `localhost`) |
  | `SCHEMA_REGISTRY_PORT` | Specify port of the Schema Registry to use (default `8081`) |
  | `EUREKA_HOST` | Specify host of the Eureka service discovery to use (default `localhost`) |
  | `EUREKA_PORT` | Specify port of the Eureka service discovery to use (default `8761`) |
  | `ZIPKIN_HOST` | Specify host of the Zipkin distributed tracing system to use (default `localhost`) |
  | `ZIPKIN_PORT` | Specify port of the Zipkin distributed tracing system to use (default `9411`) |
- publisher-api

  | Environment Variable | Description |
  |---|---|
  | `ELASTICSEARCH_HOST` | Specify host of the Elasticsearch search engine to use (default `localhost`) |
  | `ELASTICSEARCH_NODES_PORT` | Specify nodes port of the Elasticsearch search engine to use (default `9300`) |
  | `ELASTICSEARCH_REST_PORT` | Specify REST port of the Elasticsearch search engine to use (default `9200`) |
  | `EUREKA_HOST` | Specify host of the Eureka service discovery to use (default `localhost`) |
  | `EUREKA_PORT` | Specify port of the Eureka service discovery to use (default `8761`) |
  | `ZIPKIN_HOST` | Specify host of the Zipkin distributed tracing system to use (default `localhost`) |
  | `ZIPKIN_PORT` | Specify port of the Zipkin distributed tracing system to use (default `9411`) |
- news-client

  | Environment Variable | Description |
  |---|---|
  | `KAFKA_HOST` | Specify host of the Kafka message broker to use (default `localhost`) |
  | `KAFKA_PORT` | Specify port of the Kafka message broker to use (default `29092`) |
  | `SCHEMA_REGISTRY_HOST` | Specify host of the Schema Registry to use (default `localhost`) |
  | `SCHEMA_REGISTRY_PORT` | Specify port of the Schema Registry to use (default `8081`) |
  | `EUREKA_HOST` | Specify host of the Eureka service discovery to use (default `localhost`) |
  | `EUREKA_PORT` | Specify port of the Eureka service discovery to use (default `8761`) |
  | `ZIPKIN_HOST` | Specify host of the Zipkin distributed tracing system to use (default `localhost`) |
  | `ZIPKIN_PORT` | Specify port of the Zipkin distributed tracing system to use (default `9411`) |
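Each variable above follows the same rule: use the value from the environment if it is set, otherwise fall back to the documented default. The shell sketch below mirrors that rule to show which endpoints a given environment resolves to. It is an illustration only; how the applications themselves combine these values is not shown in this README, and the helper names (`endpoint`, `kafka_endpoint`, and so on) are hypothetical.

```shell
# Join a host and a port into host:port.
endpoint() {
  printf '%s:%s\n' "$1" "$2"
}

# Each helper applies the "environment value or documented default" rule.
kafka_endpoint()           { endpoint "${KAFKA_HOST:-localhost}" "${KAFKA_PORT:-29092}"; }
schema_registry_endpoint() { endpoint "${SCHEMA_REGISTRY_HOST:-localhost}" "${SCHEMA_REGISTRY_PORT:-8081}"; }
eureka_endpoint()          { endpoint "${EUREKA_HOST:-localhost}" "${EUREKA_PORT:-8761}"; }
zipkin_endpoint()          { endpoint "${ZIPKIN_HOST:-localhost}" "${ZIPKIN_PORT:-9411}"; }
elasticsearch_rest()       { endpoint "${ELASTICSEARCH_HOST:-localhost}" "${ELASTICSEARCH_REST_PORT:-9200}"; }

# Show what the current environment resolves to.
echo "Kafka:              $(kafka_endpoint)"
echo "Schema Registry:    $(schema_registry_endpoint)"
echo "Eureka:             $(eureka_endpoint)"
echo "Zipkin:             $(zipkin_endpoint)"
echo "Elasticsearch REST: $(elasticsearch_rest)"
```

With none of the variables set, every line prints the `localhost` default from the tables. Exporting, for example, `KAFKA_HOST=kafka` and `KAFKA_PORT=9092` before running the snippet changes only the Kafka line.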
| Application | URL |
|---|---|
| producer-api | |
| publisher-api | |
| news-client | |
- Eureka: can be accessed at http://localhost:8761
- Zipkin: can be accessed at http://localhost:9411
- Kafdrop: can be accessed at http://localhost:9000
- Schema Registry UI: can be accessed at http://localhost:8001
- Elasticsearch REST API
  - Check if ES is up and running: `curl localhost:9200`
  - Check indexes: `curl "localhost:9200/_cat/indices?v"`
  - Check news index mapping: `curl "localhost:9200/news/_mapping?pretty"`
  - Simple search: `curl "localhost:9200/news/_search?pretty"`
  - Delete news index: `curl -X DELETE localhost:9200/news`
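The simple search above returns all documents. To narrow it down, Elasticsearch's URI search accepts a `q` parameter of the form `field:term`. The sketch below builds such a URL for the `news` index. The helper name `news_search_url` is invented here, and the field name `title` in the example is hypothetical; check the real field names with the mapping call above.

```shell
# Build a URI-search URL for the news index.
# Usage: news_search_url FIELD TERM [HOST:PORT]
# The term is inserted as-is, so it must not contain spaces or other
# characters that need URL-encoding.
news_search_url() {
  ns_field="$1"
  ns_term="$2"
  ns_es="${3:-localhost:9200}"
  printf '%s/news/_search?q=%s:%s&pretty\n' "$ns_es" "$ns_field" "$ns_term"
}

# Example (the "title" field is an assumption, not taken from the project):
#   curl "$(news_search_url title kafka)"
```

For terms that contain spaces, one option is to let curl do the encoding with `curl -G --data-urlencode "q=title:some words" "localhost:9200/news/_search"`.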
- To stop the applications:
  - If they were started with Maven, go to the terminals where they are running and press `Ctrl+C`;
  - If they were started as Docker containers, in a terminal inside the `spring-cloud-stream-kafka-elasticsearch` root folder, run the script below: `./stop-apps.sh`
- To stop and remove the Docker Compose containers, network and volumes, in a terminal, navigate to the `spring-cloud-stream-kafka-elasticsearch` root folder and run the following command: `docker compose down -v`



