티스토리 뷰
1. Order.java 포트 메소드에서 수정하기
@PostPersist
public void onPostPersist() {
OrderPlaced orderPlaced = new OrderPlaced(this);
orderPlaced.publishAfterCommit(getId());
}
@PostUpdate
public void onPostUpdate() {
OrderModified orderModified = new OrderModified(this);
orderModified.publishAfterCommit(getId());
}
getId()로 명시적으로 같은 파티션에 넣어버리기

2. AbstractEvent.java 포트 메소드에서 수정하기
order/src/main/java/kafka/scaling/infra/AbstractEvent.java
이 코드는 Spring Cloud Stream + Kafka를 사용하여 도메인 이벤트를 Kafka로 발행(publish) 하는 로직입니다.
정리하면, 도메인 이벤트 객체(AbstractEvent를 상속한 이벤트)가 트랜잭션 커밋 이후 Kafka로 메시지를 보내는 코드
public void publish(String messageKey) {
/**
* spring streams 방식
*/
KafkaProcessor processor = OrderApplication.applicationContext.getBean(
KafkaProcessor.class
);
MessageChannel outputChannel = processor.outboundTopic();
outputChannel.send(
MessageBuilder
.withPayload(this)
.setHeader(
MessageHeaders.CONTENT_TYPE,
MimeTypeUtils.APPLICATION_JSON
)
.setHeader("type", getEventType())
.setHeader(KafkaHeaders.MESSAGE_KEY, messageKey.getBytes())
.build()
);
}
public void publishAfterCommit(Long messageKey) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter() {
@Override
public void afterCompletion(int status) {
AbstractEvent.this.publish(String.valueOf(messageKey));
}
}
);
}
3. 주문 서비스를 재시작
console order
Ctrl + C 재시작
console delivery
Ctrl + C 재시작
console deliver-2nd
Ctrl + C 재시작
console order
mvn clean spring-boot:run
'클라우드 > 클라우드 네이티브 애플리케이션' 카테고리의 다른 글
| [클라우드 네이티브] 19. Kafka 확장과 동시성 처리순서가 지켜지지 않는 메시징 (0) | 2025.06.16 |
|---|---|
| [클라우드 네이티브] 18. Kafka 확장과 동시성 처리 환경에 따른 메시징 처리 Test (1) | 2025.06.16 |
| [클라우드 네이티브] 17. Kafka 확장과 동시성 처리 주문, 배송서비스 실행 (0) | 2025.06.13 |
| [클라우드 네이티브] 16. Kafka 동적 확장 파티션 확장 (2) | 2025.06.13 |
| [클라우드 네이티브] 15. Kafka 동적 확장 order 서비스를 통해 주문 요청 (5) | 2025.06.13 |