Skip to content

Commit 90b7a34

Browse files
authored
Merge pull request #13 from BOURGUI07/test
Redesign the product event publisher implementation using the Listene…
2 parents 9eadd43 + d1ba735 commit 90b7a34

File tree

14 files changed

+137
-427
lines changed

14 files changed

+137
-427
lines changed

catalog-service/src/main/java/com/example/catalog_service/config/ProductEventPublisherConfig.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,4 @@ public Supplier<Flux<Message<ProductEvent>>> producer(){
2727
.doOnNext(msg -> log.info("Catalog-Service Produced Product Event: {}", Util.write(msg.getPayload())));
2828
}
2929

30-
31-
@Bean
32-
public Supplier<Flux<Message<ProductEvent>>> productViewProducer(){
33-
return () -> publisher.publishViewedProducts()
34-
.map(MessageConverter.toProductEventMessage())
35-
.doOnNext(msg -> log.info("Catalog-Service Produced Product Viewed Event: {}", Util.write(msg.getPayload())));
36-
}
3730
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.example.catalog_service.domain;
2+
3+
public enum ProductAction {
4+
VIEWED,
5+
CREATED,
6+
DELETED,
7+
UPDATED
8+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.example.catalog_service.dto;
2+
3+
import com.example.catalog_service.domain.ProductAction;
4+
import lombok.Builder;
5+
6+
@Builder
7+
public record ProductActionDTO(
8+
ProductAction action,
9+
ProductResponse response
10+
) {
11+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.example.catalog_service.listener;
2+
3+
import com.example.catalog_service.dto.ProductActionDTO;
4+
import com.example.catalog_service.events.ProductEvent;
5+
import reactor.core.publisher.Flux;
6+
import reactor.core.publisher.Mono;
7+
8+
public interface ProductEventListener {
9+
10+
Flux<ProductEvent> productEvents();
11+
12+
default void listen(ProductActionDTO dto){
13+
switch (dto.action()) {
14+
case VIEWED -> handleViewedProduct(dto);
15+
case CREATED -> handleCreatedProduct(dto);
16+
case UPDATED -> handleUpdatedProduct(dto);
17+
case DELETED -> handleDeletedProduct(dto);
18+
}
19+
}
20+
21+
void handleDeletedProduct(ProductActionDTO dto);
22+
23+
void handleUpdatedProduct(ProductActionDTO dto);
24+
25+
void handleCreatedProduct(ProductActionDTO dto);
26+
27+
void handleViewedProduct(ProductActionDTO dto);
28+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.example.catalog_service.listener;
2+
3+
import com.example.catalog_service.dto.ProductActionDTO;
4+
import com.example.catalog_service.events.ProductEvent;
5+
import com.example.catalog_service.mapper.Mapper;
6+
import org.springframework.stereotype.Service;
7+
import reactor.core.publisher.Flux;
8+
import reactor.core.publisher.Mono;
9+
import reactor.core.publisher.Sinks;
10+
11+
@Service
12+
public class ProductEventListenerImpl implements ProductEventListener {
13+
private final Sinks.Many<ProductEvent> sink = Sinks.many().unicast().onBackpressureBuffer();
14+
15+
@Override
16+
public Flux<ProductEvent> productEvents(){
17+
return sink.asFlux();
18+
}
19+
20+
@Override
21+
public void handleDeletedProduct(ProductActionDTO dto) {
22+
var deletedEvent = Mapper.toDeletedProductEvent().apply(dto);
23+
sink.tryEmitNext(deletedEvent);
24+
}
25+
26+
@Override
27+
public void handleUpdatedProduct(ProductActionDTO dto) {
28+
var updatedEvent = Mapper.toUpdatedProductEvent().apply(dto);
29+
sink.tryEmitNext(updatedEvent);
30+
}
31+
32+
@Override
33+
public void handleCreatedProduct(ProductActionDTO dto) {
34+
var createdEvent = Mapper.toCreatedProductEvent().apply(dto);
35+
sink.tryEmitNext(createdEvent);
36+
}
37+
38+
@Override
39+
public void handleViewedProduct(ProductActionDTO dto) {
40+
var viewedEvent = Mapper.toViewedProductEvent().apply(dto);
41+
sink.tryEmitNext(viewedEvent);
42+
}
43+
}

catalog-service/src/main/java/com/example/catalog_service/mapper/Mapper.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.example.catalog_service.mapper;
22

33
import com.example.catalog_service.domain.Product;
4+
import com.example.catalog_service.domain.ProductAction;
45
import com.example.catalog_service.domain.ProductInventory;
56
import com.example.catalog_service.dto.*;
67
import com.example.catalog_service.events.InventoryEvent;
@@ -114,31 +115,51 @@ public static BiFunction<Throwable,OrderEvent.Created,Mono<InventoryEvent>> toDe
114115
.build());
115116
}
116117

118+
public static BiFunction<ProductAction,ProductResponse,ProductActionDTO> toProductActionDTO(){
119+
return (action, dto) -> ProductActionDTO.builder()
120+
.action(action)
121+
.response(dto)
122+
.build();
123+
}
117124

118-
public static Function<ProductResponse, ProductEvent> toCreatedProductEvent(){
125+
public static ProductActionDTO productActionDTO(ProductAction action,String code){
126+
return ProductActionDTO.builder()
127+
.action(action)
128+
.response(ProductResponse.builder().code(code).build())
129+
.build();
130+
}
131+
132+
public static Function<ProductActionDTO,ProductEvent> toViewedProductEvent(){
133+
return dto -> ProductEvent.View.builder()
134+
.code(dto.response().code())
135+
.build();
136+
}
137+
138+
public static Function<ProductActionDTO, ProductEvent> toCreatedProductEvent(){
119139
return dto -> ProductEvent.Created.builder()
120-
.code(dto.code())
121-
.price(dto.price())
122-
.productId(dto.id())
140+
.code(dto.response().code())
141+
.productId(dto.response().id())
142+
.price(dto.response().price())
123143
.build();
124144
}
125145

126-
public static Function<ProductResponse, ProductEvent> toUpdatedProductEvent(){
146+
public static Function<ProductActionDTO, ProductEvent> toUpdatedProductEvent(){
127147
return dto -> ProductEvent.Updated.builder()
128-
.price(dto.price())
129-
.code(dto.code())
148+
.code(dto.response().code())
149+
.price(dto.response().price())
130150
.build();
131151
}
132152

133-
134-
public static Function<String, ProductEvent> toDeletedProductEvent(){
135-
return code -> ProductEvent.Deleted.builder()
136-
.code(code)
153+
public static Function<ProductActionDTO, ProductEvent> toDeletedProductEvent(){
154+
return dto -> ProductEvent.Deleted.builder()
155+
.code(dto.response().code())
137156
.build();
138157
}
139158

140159

141160

142161

143162

163+
164+
144165
}

catalog-service/src/main/java/com/example/catalog_service/publisher/ProductEventPublisher.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.example.catalog_service.publisher;
22

33
import com.example.catalog_service.events.ProductEvent;
4+
import com.example.catalog_service.listener.ProductEventListener;
45
import com.example.catalog_service.service.ProductService;
56
import lombok.RequiredArgsConstructor;
67
import lombok.extern.slf4j.Slf4j;
@@ -18,18 +19,14 @@
1819
@RequiredArgsConstructor
1920
@Slf4j
2021
public class ProductEventPublisher {
21-
private final ProductService service;
22+
private final ProductEventListener listener;
2223

2324

2425
public Flux<ProductEvent> publish() {
25-
return service.products();
26+
return listener.productEvents();
2627
}
2728

2829

29-
public Flux<ProductEvent> publishViewedProducts() {
30-
return service.viewedProducts();
31-
}
32-
3330

3431

3532

catalog-service/src/main/java/com/example/catalog_service/service/ProductService.java

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package com.example.catalog_service.service;
22

33
import com.example.catalog_service.domain.Product;
4+
import com.example.catalog_service.domain.ProductAction;
45
import com.example.catalog_service.dto.*;
56
import com.example.catalog_service.events.ProductEvent;
67
import com.example.catalog_service.exceptions.ApplicationsExceptions;
8+
import com.example.catalog_service.listener.ProductEventListener;
79
import com.example.catalog_service.mapper.Mapper;
810
import com.example.catalog_service.repo.ProductRepo;
911
import java.math.BigDecimal;
@@ -35,25 +37,9 @@
3537
public class ProductService {
3638
private final ProductRepo repo;
3739
private final CatalogServiceProperties properties;
38-
39-
private final Sinks.Many<ProductEvent> sink = Sinks.many().unicast().onBackpressureBuffer();
40-
private final Sinks.Many<ProductEvent> viewedProductsSink = Sinks.many().unicast().onBackpressureBuffer();
41-
4240
private final CacheTemplate<String,Product> cacheTemplate;
41+
private final ProductEventListener listener;
4342

44-
/**
45-
* This is gonna be used by the CreatedProductEventPublisher
46-
*/
47-
public Flux<ProductEvent> products(){
48-
return sink.asFlux();
49-
}
50-
51-
/**
52-
* This is gonna be used by the ViewedProductEventPublisher
53-
*/
54-
public Flux<ProductEvent> viewedProducts(){
55-
return viewedProductsSink.asFlux();
56-
}
5743

5844

5945
/**
@@ -84,7 +70,7 @@ public Mono<PagedResult<ProductResponse>> getProducts(int page) {
8470
public Mono<ProductResponse> findByCode(String code) {
8571
return cacheTemplate.findByKey(code)
8672
.map(Mapper.toDto())
87-
.doOnNext(dto -> viewedProductsSink.tryEmitNext(ProductEvent.View.builder().code(code).build()))
73+
.doOnNext(dto -> listener.listen(Mapper.toProductActionDTO().apply(ProductAction.VIEWED,dto)))
8874
.doOnNext(response -> log.info("Product with code: {} is: {}", code, Util.write(response)));
8975
}
9076

@@ -104,7 +90,7 @@ public Mono<ProductResponse> createProduct(Mono<ProductCreationRequest> request)
10490
.map(Mapper.toEntity())
10591
.flatMap(repo::save)
10692
.map(Mapper.toDto())
107-
.doOnNext(dto -> sink.tryEmitNext(Mapper.toCreatedProductEvent().apply(dto)))
93+
.doOnNext(dto -> listener.listen(Mapper.toProductActionDTO().apply(ProductAction.CREATED,dto)))
10894
.doOnNext(response -> log.info("A New Product is Created: {}", Util.write(response)));
10995

11096
}
@@ -135,7 +121,7 @@ public Mono<Void> deleteByCode(String code){
135121
return cacheTemplate.findByKey(code)
136122
.flatMap(product -> repo.delete(product).then(cacheTemplate.doOnChanged(product)).then())
137123
.doOnSuccess(x->{
138-
sink.tryEmitNext(Mapper.toDeletedProductEvent().apply(code));
124+
listener.handleDeletedProduct(Mapper.productActionDTO(ProductAction.DELETED,code));
139125
log.info("Product Deleted Successfully");
140126
});
141127
}
@@ -152,7 +138,7 @@ public Mono<ProductResponse> update(String code, Mono<ProductUpdateRequest> requ
152138
.flatMap(Function.identity())
153139
.doOnNext(response -> {
154140
log.info("Product Updated Successfully: {}", Util.write(response));
155-
sink.tryEmitNext(Mapper.toUpdatedProductEvent().apply(response));
141+
listener.handleUpdatedProduct(Mapper.toProductActionDTO().apply(ProductAction.UPDATED,response));
156142
});
157143
}
158144

catalog-service/src/main/resources/application.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ spring:
6262
############# SPRING CLOUD STREAM WITH KAFKA CONFIGURATION ###############
6363
cloud:
6464
function:
65-
definition: processor;producer;productViewProducer
65+
definition: processor;producer
6666
stream:
6767
default:
6868
producer:

catalog-service/src/test/java/com/example/catalog_service/integration_tests/MESSAGING_abstract_tests/AbstractProductViewPublisherTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
@Import(AbstractProductViewPublisherTest.testConfig.class)
1919
@TestPropertySource(properties = {
20-
"spring.cloud.function.definition=productViewProducer;consumer",
20+
"spring.cloud.function.definition=producer;consumer",
2121
"spring.cloud.stream.bindings.consumer-in-0.destination=catalog-events"
2222
})
2323
public class AbstractProductViewPublisherTest extends AbstractIntegrationTest {

0 commit comments

Comments
 (0)