Spring Integration 테스트하기

Spring Integration 테스트하기
Photo by T K / Unsplash

최근 프로젝트에서 MQTT 연동을 하는 부분을 대대적으로 리팩토링 하게 되었는데 테스트 코드가 붙어있지 않아, 리팩토링의 선행 과제로 실제 MQTT 접속 부분을 제외한 나머지 플로우를 테스트하는 자동화 테스트를 작성하기로 했다.

비지니스 로직에 더하여 Spring Integration을 이용한 연동 셋업이 잘 돼있는지 확인하기 위하여 spring-integration-test 를 활용하였다.

이 글에서는 MQTT + Spring Integration을 활용하여 작성된 응용 프로그램을 테스트하는 방법을 보이기 위해 특정 topic (/mytest)로 부터 받은 문자열 메시지를 뒤집어서 동적으로 결정된 MQTT topic으로 보내는 응용 프로그램을 만들어본다.

이를 위한 간단한 integration flow는 아래와 같다.

위의  flow를 구성하기 위해 다음과 같은 Bean 및 IntegrationFlow를 구성한다.

1) MQTT Channel adapter

MQTT에서 특정 topic들을 구독하고 DefaultPahoMessageConverter 타입의 converter를 이용해 메시지 변환을 수행한 후 outputChannel(o.s.integration.endpoint.MessageProducerSupport 클래스의 속성)로 송신한다.

@Bean
@Qualifier("mqttChannelAdapter")
fun mqttChannelAdapter(): MessageProducerSupport {
    // mqttConfig는 MQTT 접속 설정을 위해 주입된 config bean이다.
    val adapter = MqttPahoMessageDrivenChannelAdapter(
        mqttConfig.clientId,
        mqttClientFactory(),
        *mqttConfig.topics.toTypedArray()
    )
    adapter.setQos(*mqttConfig.qos.toIntArray())
    adapter.setConverter(pahoMessageConverter())

    return adapter
}

private fun pahoMessageConverter(): DefaultPahoMessageConverter {
    val converter = DefaultPahoMessageConverter(mqttConfig.charset)
    converter.isPayloadAsBytes = mqttConfig.binary
    return converter
}

2) MQTT Outbound channel

단순한 DirectChannel로, 이 channel에 메시지를 보내면 MQTT Message Handler를 통해 MQTT로 publish 될 것이다.

@Bean
@Qualifier("mqttOutboundChannel")
fun mqttOutboundChannel(): MessageChannel {
    val chann = DirectChannel()
    chann.isLoggingEnabled = true
    chann.componentName = "mqttOutboundChannel"
    return chann
}

3) MQTT Message Handler

실제적으로 메시지를 받아 MQTT topic에 publish한다. 메시지를 내보낼 topic은 MessageHandlerdefaultTopic속성이나 Message header에 MqttHeaders.TOPIC 속성으로도 지정이 가능하다.

@Bean
@Qualifier("mqttOutboundHandler")
fun mqttOutboundHandler(): MessageHandler {
    val messageHandler = MqttPahoMessageHandler(MqttAsyncClient.generateClientId(), mqttClientFactory())
    messageHandler.setAsync(true)
    messageHandler.setDefaultTopic("myreply")
    return messageHandler
}

4) MQTT inbound IntegrationFlow

IngrationFlow builder 헬퍼 함수를 통해 작성된 MQTT Channel adapter로부터 메시지를 받아 비지니스 로직에서 처리하고 MQTT Outbound channel 내보내는 flow이다.

@Bean
fun mqttInbound(): IntegrationFlow {
    return IntegrationFlows.from(mqttChannelAdapter())
        .handle<String> { payload, headers ->
            // biz logic here
            // myMessageHandler is injected MyMessageHandler type Bean
            myMessageHandler.handleMessage(payload)
        }
        .channel(mqttOutboundChannel())
        .get()
}

5) Business logic

들어온 문자열을 뒤집어서 반환하는 간단한 business logic이 있다고 가정한다.

@Service
class MyMessageHandler {
    fun handleMessage(payload: String): String {
        return payload.reversed()
    }
}

6) MQTT outbound IntegrationFlow

MQTT Outbound channel로 들어온 메시지를 MQTT Message Handler를 통해 내보내는 integration flow. 이때 동적으로 publish할 topic을 바꿀 수 있는 기능을 추가했다. 여기에서는 testreply_ 라는 접두사 뒤에 비지니스 로직(문자열 뒤집기)을 통과한 결과의 첫번째 글자를 붙인 것을 메시지가 나갈 토픽으로 정하도록 했다.

(추상화 관점에서 봤을때 비지니스 로직이 메시지를 내보낼 MQTT topic까지 결정하는 것은 좋은 디자인이 아니라고 생각한다.)

@Bean
fun mqttOutbound(): IntegrationFlow = IntegrationFlows.from(mqttOutboundChannel())
    .enrichHeaders { h ->
        h.headerFunction<String>(MqttHeaders.TOPIC) { msg ->
            "testreply_${msg.payload[0]}"
        } 
    }
    .handle(mqttOutboundHandler(), { c -> c.id("mqttOutEndpoint") }) // (a)
    .get()

이 예제에서 테스트해보고 싶은 것은 MQTT topic을 구독하는 inflow가 메시지를 받으면 정해진 비지니스 로직을 수행 후 제대로 output을 내는지 여부이다. 이 예제 flow의 output은 MQTT topic으로의 publish되는 메시지이다.

이를 테스트를 하려면 1) MQTT와 연결되어 topic을 subscribe하고 topic에 payload가 들어오면 그에 따른 메시지를 생성해내서 inflow에 흘려주는 부분, 2) 실제 비지니스 로직,  3) 처리된 메시지를 받아서 MQTT topic에 publish하려는 부분을 테스트 해야한다.

MQTT Inflow 부분 테스트 하기

이 예제에서는 mqttChannelAdapter (MqttPahoMessageDrivenChannelAdapter 타입)가  MQTT topic을 subscribe하여 payload가 들어오면 그것을 o.s.messaging.Message 로 변환하여 내보내는 역할을 한다.

자동화 테스트에서는 MQTT접속을 원치 않으므로, 테스트코드에서는 이 channel adapter가 메시지를 내보낼 때 쓰는 output channel에 직접 메시지를 보내서 테스트한다.

MQTT Outbound 부분 테스트하기

마찬가지로 MqttPahoMessageHandlero.s.messaging.MessageHandler 인터페이스를 구현하며, 들어온 message를 MQTT topic으로 publish하는 역할을 한다.

테스트코드에서는 해당 handler가 MQTT에 보낼 메시지를 제대로 받았는지 테스트를 해보기 위해 message handler를 mocking하고 argument를 검사한다. 이를 위해 MockIntegrationContextsubstituteMessageHandlerFor 함수를 활용한다.

이 함수를 사용할때 주의할 점은 인자로 MessageHandler를 넘기는 것이 아니라 IntegrationFlow상에서 handler를 사용하도록 정의된 endpoint의 이름을 넘겨야한다는 것이다. [출처]  Endpoint의 이름은 .handle 함수의 두번째 인자로 지정이 가능하다. (위 mqttOutbound Bean 정의에서 (a) 부분 참고)

테스트 클래스 구성

@SpringIntegrationTest 어노테이션을 활용하여 test class가 Spring integration을 기반으로 하는 test임을 명시한다. 여기에 제공되는 noAutoStartup 옵션을 사용하면 integration flow들이 자동으로 시작되면서 MQTT 접속을 맺지 않게끔 설정할 수 있다.

@ExtendWith(SpringExtension::class)
@SpringIntegrationTest(noAutoStartup = ["mqttChannelAdapter"])
@Import(MqttIntegrationConfig::class)
@ContextConfiguration
class MyIntegrationTest {
    ...
}

@Import을 이용하여 불러올 configuration을 따로 설정해주었는데 이 예제에서는 실제로 MQTT에 접속하지 않는것도 확인해보기 위해 존재하지 않는 MQTT host를 설정에 넣어보았다.

@TestConfiguration
@EnableIntegration
class FakeMqttBrokerAddressConfig {
    @Bean
    fun mqttConfig(): MqttConfig {
        return MqttConfig(server = "tcp://non.existing.host:1883", clientId = UUID.randomUUID().toString())
    }
}

실제 테스트를 할때는 MQTT inbound를 위한 channel adapter가 보내는 output channel에 바로 메시지를 보낼것이다. 즉, MQTT 에 직접 접속하지 않고 MQTT 프로토콜로 메시지가 제대로 들어왔음을 가정하고 그 뒤 플로우를 테스트 한다.

또한, 비지니스 로직을 처리 후 MQTT outbound를 위한 message handler가 MQTT에 실제로 접속하지 않기 하기 위해 mockIntegrationContext를 이용하여 해당 handler를 mocking handler로 바꾸어 실제 MQTT로 보내는 handler의 동작을 막는다.

테스트를 위한 구성을 그림으로 표현하면 다음과 같다.

1) channel adapter의 output channel에 직접 메시지를 보냈을때 business logic가 호출되는지 검증한다.

val payload = "hello jayhan, this is mqtt integration test"
mqttChannelAdapter.outputChannel?.send(GenericMessage(payload))
verify {
    myMessageHandler.handleMessage(eq(payload))
}

2) MQTT에 메시지를 보낼때 쓰는 messageHandler의 input channel인 mqttOutboundChannel에 메시지가 잘 출력되는지 확인한다.

이를 위해 mqttOutboundChannel에 들어가있는 메시지를 확인하기 위해 queue channel에 메시지를 집어넣는 wire tap interactor를 이용한다.

val qchann = QueueChannel()
mqttOutboundChannel.addInterceptor(WireTap(qchann))

MQTT에 실제로 메시지를 보내는 handler의 동작을 막기 위해 mocking handler와 입력 메시지 검사를 위한 ArgumentCaptor를 설정한다.

val messageArgumentCaptor = MockIntegration.messageArgumentCaptor()
val mockHandler = MockIntegration.mockMessageHandler(messageArgumentCaptor)
mockIntegrationContext.substituteMessageHandlerFor(
    "mqttOutEndpoint", mockHandler.handleNext { }
)

outbound channel로 나간 메시지의 내용이 원래 의도한 것과 맞는지 확인한다.

val replyMessage = qchann.receive()
assertNotNull(replyMessage)
val replyPayload = replyMessage?.payload as String
assertEquals(myMessageHandler.handleMessage(payload), replyPayload)

MQTT outbound handler에 들어가는 메시지는 outbound IntegrationFlow에서 header를 결정하는 로직을 지나온 메시지이고, 이는 mocking된 message handler의 input을 검사하여 체크한다.

// check handler's input
val handlerInput = messageArgumentCaptor.value
assertEquals("testreply_${replyPayload[0]}", handlerInput.headers[MqttHeaders.TOPIC])

결론

이 글에서는 SpringIntegrationTest 를 이용하여 실제 integration adapter들을 기동하지 않고도 integration 및 비지니스 로직을 테스트하는 방법을 살펴봤다.

코드는 Github에서 다운로드 받을 수 있다.