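Preface
MQTT (Message Queuing Telemetry Transport) is a lightweight, binary publish/subscribe messaging protocol, well suited to IoT scenarios where power and network bandwidth are limited. This article briefly shows how to integrate it into a Spring Boot application.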
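As a rough illustration of how compact the binary format is, the sketch below hand-encodes a PUBLISH packet following the MQTT 3.1.1 wire format at QoS 0. The class and method names (MqttPublishSketch, encodePublishQos0) are made up for this example; in the Spring setup described in this article, the Paho client library does this encoding internally.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class MqttPublishSketch {

    /** Encodes an MQTT 3.1.1 PUBLISH packet at QoS 0 (so no packet identifier). */
    static byte[] encodePublishQos0(String topic, byte[] payload) {
        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
        if (topicBytes.length > 0xFFFF) {
            throw new IllegalArgumentException("topic too long");
        }
        // Variable header (2-byte topic length + topic) plus payload.
        int remainingLength = 2 + topicBytes.length + payload.length;

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x30); // packet type 3 (PUBLISH), DUP=0, QoS=0, RETAIN=0

        // Remaining Length: 7 bits per byte, high bit set while more bytes follow.
        int x = remainingLength;
        do {
            int digit = x % 128;
            x /= 128;
            if (x > 0) {
                digit |= 0x80;
            }
            out.write(digit);
        } while (x > 0);

        out.write(topicBytes.length >> 8);   // topic length, high byte
        out.write(topicBytes.length & 0xFF); // topic length, low byte
        out.write(topicBytes, 0, topicBytes.length);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] packet = encodePublishQos0("siSampleTopic", "hi".getBytes(StandardCharsets.UTF_8));
        System.out.println("packet size in bytes: " + packet.length);
    }
}
```

With the topic siSampleTopic and the two-byte payload hi, the whole packet is 19 bytes: 2 bytes of fixed header, 2 bytes of topic length, 13 bytes of topic, and 2 bytes of payload.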
Maven
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-stream</artifactId>
    </dependency>
    <!-- Provides the Paho-based MQTT channel adapters used in the sections that follow -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
Configuring the client factory
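    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

    // Shared by the inbound and outbound adapters to create Paho MQTT clients.
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs("tcp://demo:1883"); // broker address
        // factory.setUserName("guest");
        // factory.setPassword("guest");
        return factory;
    }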
Configuring the consumer
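    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.endpoint.MessageProducerSupport;
    import org.springframework.integration.handler.LoggingHandler;
    import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
    import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;

    // Inbound flow: receive from MQTT, append a suffix, then log the result.
    @Bean
    public IntegrationFlow mqttInFlow() {
        return IntegrationFlows.from(mqttInbound())
                .transform(p -> p + ", received from MQTT")
                .handle(logger())
                .get();
    }

    private LoggingHandler logger() {
        LoggingHandler loggingHandler = new LoggingHandler("INFO");
        loggingHandler.setLoggerName("siSample");
        return loggingHandler;
    }

    @Bean
    public MessageProducerSupport mqttInbound() {
        // Arguments: client ID, client factory, topic to subscribe to.
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer", mqttClientFactory(), "siSampleTopic");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1); // QoS 1: at-least-once delivery
        return adapter;
    }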
Configuring the producer
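    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;

    // Outbound flow: everything sent to outChannel is published to MQTT.
    @Bean
    public IntegrationFlow mqttOutFlow() {
        // console input
        // return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(),
        //                 e -> e.poller(Pollers.fixedDelay(1000)))
        //         .transform(p -> p + " sent to MQTT")
        //         .handle(mqttOutbound())
        //         .get();
        return IntegrationFlows.from(outChannel())
                .handle(mqttOutbound())
                .get();
    }

    @Bean
    public MessageChannel outChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageHandler mqttOutbound() {
        // Arguments: client ID, client factory.
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("siSamplePublisher", mqttClientFactory());
        messageHandler.setAsync(true); // do not block waiting for delivery confirmation
        messageHandler.setDefaultTopic("siSampleTopic");
        return messageHandler;
    }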
Configuring the MessagingGateway
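    import org.springframework.integration.annotation.MessagingGateway;

    // Calls to write(...) are sent as messages to outChannel.
    @MessagingGateway(defaultRequestChannel = "outChannel")
    public interface MsgWriter {
        void write(String note);
    }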
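That completes the setup: calling MsgWriter.write(...) publishes the text to siSampleTopic, and the inbound flow logs each message received on that topic.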
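To make the gateway's role more concrete, here is a rough plain-Java analogy of the idea behind a messaging gateway: a proxy stands behind the interface and hands each method argument to a channel. This is not Spring Integration's actual implementation. The names GatewaySketch and gatewayFor are hypothetical, and a plain Consumer stands in for outChannel.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class GatewaySketch {

    /** Same shape as the MsgWriter interface from the article. */
    interface MsgWriter {
        void write(String note);
    }

    /**
     * Hypothetical helper: returns a MsgWriter whose write(...) calls are
     * forwarded, argument only, to the given stand-in channel.
     */
    static MsgWriter gatewayFor(Consumer<Object> channel) {
        return (MsgWriter) Proxy.newProxyInstance(
                MsgWriter.class.getClassLoader(),
                new Class<?>[] {MsgWriter.class},
                (proxy, method, args) -> {
                    if (!"write".equals(method.getName())) {
                        throw new UnsupportedOperationException(method.getName());
                    }
                    channel.accept(args[0]); // the argument plays the role of the message payload
                    return null;             // write(...) is void
                });
    }

    public static void main(String[] args) {
        List<Object> sent = new ArrayList<>(); // stand-in for the outbound side
        MsgWriter writer = gatewayFor(sent::add);
        writer.write("hello mqtt");
        System.out.println("forwarded payloads: " + sent);
    }
}
```

In the Spring application itself no such helper is needed: the MsgWriter bean can be injected like any other bean, and a call to write(...) ends up at the mqttOutbound handler via outChannel.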
doc
spring-integration-samples-mqtt
Original link: https://segmentfault.com/a/1190000010601548