如何在Java 环境下使用 HTTP 协议收发 MQ 消息

java

1. 准备环境
在工程 POM 文件添加 HTTP Java 客户端的依赖。

     <dependency>

<groupId>org.eclipse.jetty</groupId>

<artifactId>jetty-client</artifactId>

<version>9.3.4.RC1</version>

</dependency>

<dependency>

<groupId>com.aliyun.openservices</groupId>

<artifactId>ons-client</artifactId>

<version>1.1.11</version>

</dependency>

2. 运行代码配置(user.properties)
您需要设置配置文件(user.properties)的相关内容,具体请参考申请 MQ 资源 。

    #您在控制台创建的Topic

Topic=xxx

#公测url

URL=http://publictest-rest.ons.aliyun.com

#阿里云身份验证码

Ak=xxx

#阿里云身份验证密钥

Sk=xxx

#MQ控制台创建的Producer ID

ProducerID=xxx

#MQ控制台创建的Consumer ID

ConsumerID=xxx

说明:URL 中的 Key,Tag以及 POST Content-Type 没有任何的限制,只要确保Key 和 Tag 相同唯一即可,可以放在 user.properties 里面。
3. HTTP 发送消息示例代码
您可以按以下说明设置相应参数并测试 HTTP 消息发送功能。

    package com.aliyun.openservice.ons.http.demo;

import java.nio.charset.Charset;

import java.util.Date;

import java.util.Properties;

import org.eclipse.jetty.client.HttpClient;

import org.eclipse.jetty.client.api.ContentProvider;

import org.eclipse.jetty.client.api.ContentResponse;

import org.eclipse.jetty.client.api.Request;

import org.eclipse.jetty.client.util.StringContentProvider;

import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;

public class HttpProducer {

public static String SIGNATURE="Signature";

public static String NUM="num";

public static String CONSUMERID="ConsumerID";

public static String PRODUCERID="ProducerID";

public static String TIMEOUT="timeout";

public static String TOPIC="Topic";

public static String AK="AccessKey";

public static String BODY="body";

public static String MSGHANDLE="msgHandle";

public static String TIME="time";

public static void main(String[] args) throws Exception {

HttpClient httpClient=new HttpClient();

httpClient.setMaxConnectionsPerDestination(1);

httpClient.start();

Properties properties=new Properties();

properties.load(HttpProducer.class.getClassLoader().getResourceAsStream("user.properties"));

String topic=properties.getProperty("Topic"); //请在user.properties配置您的Topic

String url=properties.getProperty("URL");//公测集群配置为http://publictest-rest.ons.aliyun.com/

String ak=properties.getProperty("Ak");//请在user.properties配置您的Ak

String sk=properties.getProperty("Sk");//请在user.properties配置您的Sk

String pid=properties.getProperty("ProducerID");//请在user.properties配置您的Producer ID

String date=String.valueOf(new Date().getTime());

String sign=null;

String body="hello ons http";

String NEWLINE="\n";

String signString;

for (int i = 0; i < 10; i++) {

date=String.valueOf(new Date().getTime());

Request req=httpClient.POST(url+"message/?topic="+topic+"&time="+date+"&tag=http"+"&key=http");

ContentProvider content=new StringContentProvider(body);

req.content(content);

signString=topic+NEWLINE+pid+NEWLINE+MD5.getInstance().getMD5String(body)+NEWLINE+date;

System.out.println(signString);

sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);

req.header(SIGNATURE, sign);

req.header(AK, ak);

req.header(PRODUCERID, pid);

ContentResponse response;

response=req.send();

System.out.println("send msg:"+response.getStatus()+response.getContentAsString());

}

}

}

4. HTTP接收消息示例代码
请按以下说明设置相应参数并测试 HTTP 消息接收功能。

    package com.aliyun.openservice.ons.http.demo;

import java.nio.charset.Charset;

import java.util.Date;

import java.util.List;

import java.util.Properties;

import org.eclipse.jetty.client.HttpClient;

import org.eclipse.jetty.client.api.ContentProvider;

import org.eclipse.jetty.client.api.ContentResponse;

import org.eclipse.jetty.client.api.Request;

import org.eclipse.jetty.client.util.StringContentProvider;

import org.eclipse.jetty.http.HttpMethod;

import com.alibaba.fastjson.JSON;

import com.aliyun.openservice.ons.mqtt.demo.MqttProducer;

import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;

public class HttpConsumer {

public static String SIGNATURE="Signature";

public static String NUM="num";

public static String CONSUMERID="ConsumerID";

public static String PRODUCERID="ProducerID";

public static String TIMEOUT="timeout";

public static String TOPIC="Topic";

public static String AK="AccessKey";

public static String BODY="body";

public static String MSGHANDLE="msgHandle";

public static String TIME="time";

public static void main(String[] args) throws Exception {

HttpClient httpClient=new HttpClient();

httpClient.setMaxConnectionsPerDestination(1);

httpClient.start();

Properties properties=new Properties();

properties.load(HttpConsumer.class.getClassLoader().getResourceAsStream("user.properties"));

String topic=properties.getProperty("Topic"); //请在user.properties配置您的topic

String url=properties.getProperty("URL");//公测集群配置为http://publictest-rest.ons.aliyun.com/

String ak=properties.getProperty("Ak");//请在user.properties配置您的Ak

String sk=properties.getProperty("Sk");//请在user.properties配置您的Sk

String cid=properties.getProperty("ConsumerID");//请在user.properties配置您的Consumer ID

String date=String.valueOf(new Date().getTime());

String sign=null;

String NEWLINE="\n";

String signString;

System.out.println(NEWLINE+NEWLINE);

while (true) {

try {

date=String.valueOf(new Date().getTime());

Request req=httpClient.POST(url+"message/?topic="+topic+"&time="+date+"&num="+32);

req.method(HttpMethod.GET);

ContentResponse response;

signString=topic+NEWLINE+cid+NEWLINE+date;

sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);

req.header(SIGNATURE, sign);

req.header(AK, ak);

req.header(CONSUMERID, cid);

long start=System.currentTimeMillis();

response=req.send();

System.out.println("get cost:"+(System.currentTimeMillis()-start)/1000

+" "+response.getStatus()+" "+response.getContentAsString());

List<SimpleMessage> list = null;

if (response.getContentAsString()!=null&&!response.getContentAsString().isEmpty()) {

list=JSON.parseArray(response.getContentAsString(), SimpleMessage.class);

}

if (list==null||list.size()==0) {

Thread.sleep(100);

continue;

}

System.out.println("size is :"+list.size());

for (SimpleMessage simpleMessage : list) {

date=String.valueOf(new Date().getTime());

System.out.println("receive msg:"+simpleMessage.getBody()+" born time "+simpleMessage.getBornTime());

req=httpClient.POST(url+"message/?msgHandle="+simpleMessage.getMsgHandle()+"&topic="+topic+"&time="+date);

req.method(HttpMethod.DELETE);

signString=topic+NEWLINE+cid+NEWLINE+simpleMessage.getMsgHandle()+NEWLINE+date;

sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);

req.header(SIGNATURE, sign);

req.header(AK, ak);

req.header(CONSUMERID, cid);

response=req.send();

System.out.println("delete msg:"+response.toString());

}

Thread.sleep(100);

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

5. HTTP示例程序工具类
(1)消息封装类: SimpleMessage.java

    package com.aliyun.openservice.ons.http.demo;

public class SimpleMessage {

private String body;

private String msgId;

private String bornTime;

private String msgHandle;

private int reconsumeTimes;

private String tag;

public void setTag(String tag) {

this.tag = tag;

}

public String getTag() {

return tag;

}

public int getReconsumeTimes() {

return reconsumeTimes;

}

public void setReconsumeTimes(int reconsumeTimes) {

this.reconsumeTimes = reconsumeTimes;

}

public void setMsgHandle(String msgHandle) {

this.msgHandle = msgHandle;

}

public String getMsgHandle() {

return msgHandle;

}

public String getBody() {

return body;

}

public void setBody(String body) {

this.body = body;

}

public String getMsgId() {

return msgId;

}

public void setMsgId(String msgId) {

this.msgId = msgId;

}

public String getBornTime() {

return bornTime;

}

public void setBornTime(String bornTime) {

this.bornTime = bornTime;

}

}

(2)字符串签名类: MD5.java

    package com.aliyun.openservice.ons.http.demo;

import java.io.UnsupportedEncodingException;

import java.nio.charset.Charset;

import java.security.MessageDigest;

import java.sql.SQLException;

import java.util.Date;

import java.util.HashMap;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.LoggerFactory;

public class MD5 {

private static final org.slf4j.Logger log = LoggerFactory.getLogger(MD5.class);

private static char[] digits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };

private static Map<Character, Integer> rDigits = new HashMap<Character, Integer>(16);

static {

for (int i = 0; i < digits.length; ++i) {

rDigits.put(digits[i], i);

}

}

private static MD5 me = new MD5();

private MessageDigest mHasher;

private final ReentrantLock opLock = new ReentrantLock();

private MD5() {

try {

this.mHasher = MessageDigest.getInstance("md5");

} catch (Exception e) {

throw new RuntimeException(e);

}

}

public static MD5 getInstance() {

return me;

}

public String getMD5String(String content) {

return this.bytes2string(this.hash(content));

}

public String getMD5String(byte[] content) {

return this.bytes2string(this.hash(content));

}

public byte[] getMD5Bytes(byte[] content) {

return this.hash(content);

}

public byte[] hash(String str) {

this.opLock.lock();

try {

byte[] bt = this.mHasher.digest(str.getBytes("utf-8"));

if (null == bt || bt.length != 16) {

throw new IllegalArgumentException("md5 need");

}

return bt;

} catch (UnsupportedEncodingException e) {

throw new RuntimeException("unsupported utf-8 encoding", e);

} finally {

this.opLock.unlock();

}

}

public byte[] hash(byte[] data) {

this.opLock.lock();

try {

byte[] bt = this.mHasher.digest(data);

if (null == bt || bt.length != 16) {

throw new IllegalArgumentException("md5 need");

}

return bt;

} finally {

this.opLock.unlock();

}

}

public String bytes2string(byte[] bt) {

int l = bt.length;

char[] out = new char[l << 1];

for (int i = 0, j = 0; i < l; i++) {

out[j++] = digits[(0xF0 & bt[i]) >>> 4];

out[j++] = digits[0x0F & bt[i]];

}

if (log.isDebugEnabled()) {

log.debug("[hash]" + new String(out));

}

return new String(out);

}

public byte[] string2bytes(String str) {

if (null == str) {

throw new NullPointerException("Argument is not allowed empty");

}

if (str.length() != 32) {

throw new IllegalArgumentException("String length must equals 32");

}

byte[] data = new byte[16];

char[] chs = str.toCharArray();

for (int i = 0; i < 16; ++i) {

int h = rDigits.get(chs[i * 2]).intValue();

int l = rDigits.get(chs[i * 2 + 1]).intValue();

data[i] = (byte) ((h & 0x0F) << 4 | l & 0x0F);

}

return data;

}

}

以上是 如何在Java 环境下使用 HTTP 协议收发 MQ 消息 的全部内容, 来源链接: utcz.com/z/393786.html

回到顶部