Message broker

요즘 회사에서 새로운 시스템이 개발될 때, 레거시 시스템과의 연동이 많아지면서 Message broker 도입의 필요성에 대해 이야기 하고 있습니다. (조금 늦었지만..)

메세지 브로커 선택 시 어떤 브로커를 선택 하느냐 또한 중요한데요, 하자면 저희 회사에서는 Amazon MQ를 선택하게 되었습니다.

Amazon MQ는 지난 2018년7월부터 서울 리전에서도 사용 가능한 AWS 서비스 중 하나인데요, 내부적으로는 Apache Active MQ를 사용하도록 구성되어있습니다.

-Amazon MQ 장점

-구성이 쉽다 (유지보수 쉬움)

-가용성 (failover 지원)

-준수한 성능(22k건/초) (은 딱히 장점은 아닌듯 합니다)

-Amazon MQ 단점

-Rabbit MQ나 Apache Kafka에 비해서 상대적으로 떨어지는 인지도(레퍼런스가 덜 풍부함)

-Spring-cloud-stream 을 바로 사용할수 없음

서비스 선택 시 상세한 고려 사항은 https://stackshare.io/stackups/activemq-vs-kafka-vs-rabbitmq를 참고하셔도 됩니다.

Amazon MQ 선택시 마지막까지 고민하게 만들었던 부분은 spring-cloud-stream을 사용하여 구현할 수 없다는 점이었는데요, Spring-cloud-stream
을 사용하여 서비스를 구현하면 간단한 어노테이션 추가만으로도 구현이 가능했던 점이 아주 큰 장점이었거든요.

But, 찾아보니 Amazon MQ도 비슷한 난이도로 구현할 수 있는 방법이 있어 이를 구현하는 방법을 정리 차원에서 작성하고자 합니다. start!


Amazon console에서 Active MQ를 설정하는 방법은 건너뛰도록 하겠습니다.

  1. spring boot을 사용하여 Project를 생성

저는 spring-boot-devtools 정도만 추가 했습니다.(필요에 따라서는 lombok등도 추가 해주시면 됩니다..)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<scope>provided</scope>
</dependency>
  1. Maven pom.xml에 디펜던시 추가

activemq 관련 디펜던시를 추가해줍니다.

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
  1. application.properties에 설청값 셋팅
1
2
3
spring.activemq.broker-url=<broker-url>
spring.activemq.user=<user id>
spring.activemq.password=<password>
  1. 전달할 메세지 형태 클래스 Message.java를 추가 합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.io.Serializable;

public class Message implements Serializable {

private static final long serialVersionUID = -1163890830946122942L;

private String id;
private String name;

public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
  1. 메세지를 생성해서 MQ에 보낼 producer 클래스와 소비하게 될 consumer 클래스를 추가 합니다.

Consumer.java

1
2
3
4
5
6
7
8
9
10
11
12
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

@JmsListener(destination = "sample.queue")
public void receiveQueue(Message message) {
System.out.println(message.getId() + ", "+ message.getName());
}

}

Producer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import javax.jms.Queue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

@Component
public class Producer {

@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;

@Autowired
private Queue queue;

public void send(Message message) {
jmsMessagingTemplate.convertAndSend(queue, message);
}

}
  1. @SpringBootApplication클래스에 Bean 추가
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootApplication
public class JmstestApplication {

@Bean
public Queue queue() {
return new ActiveMQQueue("sample.queue");
}

public static void main(String[] args) {
SpringApplication.run(JmstestApplication.class, args);
}
}


  1. 마지막으로 test class를 작성 해줍니다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.boot.test.rule.OutputCapture;
    import org.springframework.test.context.junit4.SpringRunner;

    import static org.assertj.core.api.Assertions.assertThat;

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class JmstestApplicationTests {

    @Rule
    public OutputCapture outputCapture = new OutputCapture();

    @Autowired
    private Producer producer;

    @Test
    public void sendSimpleMessage() throws InterruptedException {

    //given
    Message msg = new Message();
    msg.setId("1");
    msg.setName("a message from JH");

    //when
    producer.send(msg);


    //then
    Thread.sleep(1000L);
    assertThat(outputCapture.toString().contains("1, a message from JH")).isTrue();
    }

    }
  2. 테스트 클래스를 실행해봅시다. 실행 전에 실행 환경 변수에 SERIALIZABLE_PACKAGES 설정값을 추가 해 주어야
    오류 메세지를 만나지 않고 pojo를 전송하고 받을 수 있게 됩니다. 보안 상 신뢰할 수 있는 패키지를 환경설정값에 넣어야만 하도록 되어있기 때문입니다.

    1
    -Dorg.apache.activemq.SERIALIZABLE_PACKAGES="<패키지 경로>"
  3. 테스트 클래스 실행 결과

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21

    . ____ _ __ _ _
    /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
    \\/ ___)| |_)| | | | | || (_| | ) ) ) )
    ' |____| .__|_| |_|_| |_\__, | / / / /
    =========|_|==============|___/=/_/_/_/
    :: Spring Boot :: (v2.0.5.RELEASE)

    2018-10-14 16:33:24.224 INFO 54635 --- [ main] c..jmstest.JmstestApplicationTests : Starting JmstestApplicationTests on TF-Mac-023ui-MacBook-Pro.local with PID 54635 (started by jeonghunKim in /Users/tf-mac-023/IdeaProjects/jmstest)
    2018-10-14 16:33:24.226 INFO 54635 --- [ main] c..jmstest.JmstestApplicationTests : No active profile set, falling back to default profiles: default
    2018-10-14 16:33:24.387 INFO 54635 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@6c1a5b54: startup date [Sun Oct 14 16:33:24 KST 2018]; root of context hierarchy
    2018-10-14 16:33:26.012 INFO 54635 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
    2018-10-14 16:33:27.233 INFO 54635 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to ssl://
    2018-10-14 16:33:27.350 INFO 54635 --- [ main] c..jmstest.JmstestApplicationTests : Started JmstestApplicationTests in 3.996 seconds (JVM running for 6.586)
    2018-10-14 16:33:27.600 INFO 54635 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to ssl://
    1, a message from JH
    2018-10-14 16:33:28.961 INFO 54635 --- [ Thread-5] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@6c1a5b54: startup date [Sun Oct 14 16:33:24 KST 2018]; root of context hierarchy
    2018-10-14 16:33:28.962 INFO 54635 --- [ Thread-5] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147483647

    Process finished with exit code 0