1. 准备环境
在工程 POM 文件添加 HTTP Java 客户端的依赖。
1
2
3
4
5
6
7
8
9
10
|
<dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-client</artifactId> <version> 9.3 . 4 .RC1</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version> 1.1 . 11 </version> </dependency> |
2. 运行代码配置(user.properties)
您需要设置配置文件(user.properties)的相关内容,具体请参考申请 MQ 资源 。
1
2
3
4
5
6
7
8
9
10
11
12
|
#您在控制台创建的Topic Topic=xxx #公测url URL=http: //publictest-rest.ons.aliyun.com #阿里云身份验证码 Ak=xxx #阿里云身份验证密钥 Sk=xxx #MQ控制台创建的Producer ID ProducerID=xxx #MQ控制台创建的Consumer ID ConsumerID=xxx |
说明:URL 中的 Key,Tag以及 POST Content-Type 没有任何的限制,只要确保Key 和 Tag 相同唯一即可,可以放在 user.properties 里面。
3. HTTP 发送消息示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
您可以按以下说明设置相应参数并测试 HTTP 消息发送功能。 package com.aliyun.openservice.ons.http.demo; import java.nio.charset.Charset; import java.util.Date; import java.util.Properties; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.util.StringContentProvider; import com.aliyun.openservices.ons.api.impl.authority.AuthUtil; public class HttpProducer { public static String SIGNATURE= "Signature" ; public static String NUM= "num" ; public static String CONSUMERID= "ConsumerID" ; public static String PRODUCERID= "ProducerID" ; public static String TIMEOUT= "timeout" ; public static String TOPIC= "Topic" ; public static String AK= "AccessKey" ; public static String BODY= "body" ; public static String MSGHANDLE= "msgHandle" ; public static String TIME= "time" ; public static void main(String[] args) throws Exception { HttpClient httpClient= new HttpClient(); httpClient.setMaxConnectionsPerDestination( 1 ); httpClient.start(); Properties properties= new Properties(); properties.load(HttpProducer. class .getClassLoader().getResourceAsStream( "user.properties" )); String topic=properties.getProperty( "Topic" ); //请在user.properties配置您的Topic String url=properties.getProperty( "URL" ); //公测集群配置为http://publictest-rest.ons.aliyun.com/ String ak=properties.getProperty( "Ak" ); //请在user.properties配置您的Ak String sk=properties.getProperty( "Sk" ); //请在user.properties配置您的Sk String pid=properties.getProperty( "ProducerID" ); //请在user.properties配置您的Producer ID String date=String.valueOf( new Date().getTime()); String sign= null ; String body= "hello ons http" ; String NEWLINE= "\n" ; String signString; for ( int i = 0 ; i < 10 ; i++) { date=String.valueOf( new Date().getTime()); Request req=httpClient.POST(url+ "message/?topic=" +topic+ "&time=" +date+ "&tag=http" + "&key=http" ); ContentProvider content= new StringContentProvider(body); req.content(content); signString=topic+NEWLINE+pid+NEWLINE+MD5.getInstance().getMD5String(body)+NEWLINE+date; System.out.println(signString); sign=AuthUtil.calSignature(signString.getBytes(Charset.forName( "UTF-8" )), sk); req.header(SIGNATURE, sign); req.header(AK, ak); req.header(PRODUCERID, pid); ContentResponse response; response=req.send(); System.out.println( "send msg:" +response.getStatus()+response.getContentAsString()); } } }<br> |
4. HTTP接收消息示例代码
请按以下说明设置相应参数并测试 HTTP 消息接收功能。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
package com.aliyun.openservice.ons.http.demo; import java.nio.charset.Charset; import java.util.Date; import java.util.List; import java.util.Properties; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.util.StringContentProvider; import org.eclipse.jetty.http.HttpMethod; import com.alibaba.fastjson.JSON; import com.aliyun.openservice.ons.mqtt.demo.MqttProducer; import com.aliyun.openservices.ons.api.impl.authority.AuthUtil; public class HttpConsumer { public static String SIGNATURE= "Signature" ; public static String NUM= "num" ; public static String CONSUMERID= "ConsumerID" ; public static String PRODUCERID= "ProducerID" ; public static String TIMEOUT= "timeout" ; public static String TOPIC= "Topic" ; public static String AK= "AccessKey" ; public static String BODY= "body" ; public static String MSGHANDLE= "msgHandle" ; public static String TIME= "time" ; public static void main(String[] args) throws Exception { HttpClient httpClient= new HttpClient(); httpClient.setMaxConnectionsPerDestination( 1 ); httpClient.start(); Properties properties= new Properties(); properties.load(HttpConsumer. class .getClassLoader().getResourceAsStream( "user.properties" )); String topic=properties.getProperty( "Topic" ); //请在user.properties配置您的topic String url=properties.getProperty( "URL" ); //公测集群配置为http://publictest-rest.ons.aliyun.com/ String ak=properties.getProperty( "Ak" ); //请在user.properties配置您的Ak String sk=properties.getProperty( "Sk" ); //请在user.properties配置您的Sk String cid=properties.getProperty( "ConsumerID" ); //请在user.properties配置您的Consumer ID String date=String.valueOf( new Date().getTime()); String sign= null ; String NEWLINE= "\n" ; String signString; System.out.println(NEWLINE+NEWLINE); while ( true ) { try { date=String.valueOf( new Date().getTime()); Request req=httpClient.POST(url+ "message/?topic=" +topic+ "&time=" +date+ "&num=" + 32 ); req.method(HttpMethod.GET); ContentResponse response; signString=topic+NEWLINE+cid+NEWLINE+date; sign=AuthUtil.calSignature(signString.getBytes(Charset.forName( "UTF-8" )), sk); req.header(SIGNATURE, sign); req.header(AK, ak); req.header(CONSUMERID, cid); long start=System.currentTimeMillis(); response=req.send(); System.out.println( "get cost:" +(System.currentTimeMillis()-start)/ 1000 + " " +response.getStatus()+ " " +response.getContentAsString()); List<SimpleMessage> list = null ; if (response.getContentAsString()!= null &&!response.getContentAsString().isEmpty()) { list=JSON.parseArray(response.getContentAsString(), SimpleMessage. class ); } if (list== null ||list.size()== 0 ) { Thread.sleep( 100 ); continue ; } System.out.println( "size is :" +list.size()); for (SimpleMessage simpleMessage : list) { date=String.valueOf( new Date().getTime()); System.out.println( "receive msg:" +simpleMessage.getBody()+ " born time " +simpleMessage.getBornTime()); req=httpClient.POST(url+ "message/?msgHandle=" +simpleMessage.getMsgHandle()+ "&topic=" +topic+ "&time=" +date); req.method(HttpMethod.DELETE); signString=topic+NEWLINE+cid+NEWLINE+simpleMessage.getMsgHandle()+NEWLINE+date; sign=AuthUtil.calSignature(signString.getBytes(Charset.forName( "UTF-8" )), sk); req.header(SIGNATURE, sign); req.header(AK, ak); req.header(CONSUMERID, cid); response=req.send(); System.out.println( "delete msg:" +response.toString()); } Thread.sleep( 100 ); } catch (Exception e) { e.printStackTrace(); } } } } |
5. HTTP示例程序工具类
(1)消息封装类: SimpleMessage.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
package com.aliyun.openservice.ons.http.demo; public class SimpleMessage { private String body; private String msgId; private String bornTime; private String msgHandle; private int reconsumeTimes; private String tag; public void setTag(String tag) { this .tag = tag; } public String getTag() { return tag; } public int getReconsumeTimes() { return reconsumeTimes; } public void setReconsumeTimes( int reconsumeTimes) { this .reconsumeTimes = reconsumeTimes; } public void setMsgHandle(String msgHandle) { this .msgHandle = msgHandle; } public String getMsgHandle() { return msgHandle; } public String getBody() { return body; } public void setBody(String body) { this .body = body; } public String getMsgId() { return msgId; } public void setMsgId(String msgId) { this .msgId = msgId; } public String getBornTime() { return bornTime; } public void setBornTime(String bornTime) { this .bornTime = bornTime; } } |
(2)字符串签名类: MD5.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
package com.aliyun.openservice.ons.http.demo; import java.io.UnsupportedEncodingException; import java.nio.charset.Charset; import java.security.MessageDigest; import java.sql.SQLException; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.LoggerFactory; public class MD5 { private static final org.slf4j.Logger log = LoggerFactory.getLogger(MD5. class ); private static char [] digits = { '0' , '1' , '2' , '3' , '4' , '5' , '6' , '7' , '8' , '9' , 'a' , 'b' , 'c' , 'd' , 'e' , 'f' }; private static Map<Character, Integer> rDigits = new HashMap<Character, Integer>( 16 ); static { for ( int i = 0 ; i < digits.length; ++i) { rDigits.put(digits[i], i); } } private static MD5 me = new MD5(); private MessageDigest mHasher; private final ReentrantLock opLock = new ReentrantLock(); private MD5() { try { this .mHasher = MessageDigest.getInstance( "md5" ); } catch (Exception e) { throw new RuntimeException(e); } } public static MD5 getInstance() { return me; } public String getMD5String(String content) { return this .bytes2string( this .hash(content)); } public String getMD5String( byte [] content) { return this .bytes2string( this .hash(content)); } public byte [] getMD5Bytes( byte [] content) { return this .hash(content); } public byte [] hash(String str) { this .opLock.lock(); try { byte [] bt = this .mHasher.digest(str.getBytes( "utf-8" )); if ( null == bt || bt.length != 16 ) { throw new IllegalArgumentException( "md5 need" ); } return bt; } catch (UnsupportedEncodingException e) { throw new RuntimeException( "unsupported utf-8 encoding" , e); } finally { this .opLock.unlock(); } } public byte [] hash( byte [] data) { this .opLock.lock(); try { byte [] bt = this .mHasher.digest(data); if ( null == bt || bt.length != 16 ) { throw new IllegalArgumentException( "md5 need" ); } return bt; } finally { this .opLock.unlock(); } } public String bytes2string( byte [] bt) { int l = bt.length; char [] out = new char [l << 1 ]; for ( int i = 0 , j = 0 ; i < l; i++) { out[j++] = digits[( 0xF0 & bt[i]) >>> 4 ]; out[j++] = digits[ 0x0F & bt[i]]; } if (log.isDebugEnabled()) { log.debug( "[hash]" + new String(out)); } return new String(out); } public byte [] string2bytes(String str) { if ( null == str) { throw new NullPointerException( "Argument is not allowed empty" ); } if (str.length() != 32 ) { throw new IllegalArgumentException( "String length must equals 32" ); } byte [] data = new byte [ 16 ]; char [] chs = str.toCharArray(); for ( int i = 0 ; i < 16 ; ++i) { int h = rDigits.get(chs[i * 2 ]).intValue(); int l = rDigits.get(chs[i * 2 + 1 ]).intValue(); data[i] = ( byte ) ((h & 0x0F ) << 4 | l & 0x0F ); } return data; } } |
希望本篇文章对您有所帮助
原文链接:http://www.cnblogs.com/wangyayun/p/6640048.html