rocketmq事务消息(rocketmq 发送失败一般怎么处理)
本文目录
rocketmq 发送失败一般怎么处理
一:RocketMQ简介RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:1.能够保证严格的消息顺序2.提供丰富的消息拉取模式3.高效的订阅者水平扩展能力4.实时的消息订阅机制5.亿级消息堆积能力二:安装RocketMQ下载源码首先我们从githup上获取RocketMQ的源码,目前最新的版本为3.5.8,下载地址为: 或者 wget /alibaba/RocketMQ/archive/v3.5.8.tar.gz。请注意:此时我们下载的是源码,直接解压时不能用的,所以我们需要编译之后才能使用。编译源码在进行编译源码之前我们需要安装JDK。如果你已经安装过了,请跳过这里。如果你还没有安装过JDK,请参考这篇文章(Linux环境下安装JDK)。然后我们还需要安装一下Maven。Maven的安装还是比较简单,只需要去官方上下载的安装吧,然后直接解压,再配置一下环境变量就OK。接下来我们把刚才下载来的RockeMQ的源码解压到/usr/local/rockemq-source文件夹中。在源码中有一个Install.sh。如图所示:。运行sh install.sh。在编译完成之后,我们只要target目录下的alibaba-rocketmq这个文件夹中内容,把alibaba-rocketmq文件夹中的内容移动到/usr/local/rocketmq中。如果你不想编译的话,可以从这里下载编译之后的rocketmq。(rocketmq3.5.8)。配置环境变量接下来我们需要配置一下环境变量。在终端中输入以下命令:vi /etc/profile ,在文件的末尾中添加如下两句话:export rocketmq=/usr/local/rocketmq export PATH=$PATH:$rocketmq/bin。接下来我们使配置的换将变量生效:source /etc/profile.三:启动RocketMQ接下来我们启动一下刚才编译的RocketMQ.在启动之前我们需要修改一下RocketMQ启动的内存大小(如果你的系统内存比较大的话,请忽略)。我们进入到/usr/local/rocketmq/bin中,在终端中输入以下命令修改mqnamesrv的内存大小:vi runserver.sh.修改为如图的内容:,接下来修改broker的内存大小:vi runbroker.sh:启动mqnameserver进入到/usr/local/rocketmq/bin中输入以下命令:nohup sh mqnamesrv 》 ~/logs/rocketmqlogs/namesrv.log 2》&1 &。注意最后的这个 & 不要少。启动mqbroker进入到/usr/local/rocketmq/bin中输入以下命令:nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true 》 ~/logs/rocketmqlogs/broker.log 2》&1 &。注意:localhost可以换成你刚才启动mqnamesrv的IP。autoCreateTopicEnable=true 这句话不要少了。最后的 & 也不要少了。我们可以通过 ps aux | grep java命令来查看启动的情况。到此,rocketmq的安装完毕。四:RocketMQ的小例子producer: view plain copypackage com.zkn.newlearn.rocketmq; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.remoting.exception.RemotingException; import java.util.concurrent.TimeUnit; /** * Created by zkn on 2016/10/27. */ public class ProducerTest01 { public static void main(String args) { /** * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例《br》 * 注意:ProducerGroupName需要由应用来保证唯一《br》 * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键, * 因为服务器会回查这个Group下的任意一个Producer */ DefaultMQProducer producer = new DefaultMQProducer(“ProducerGroupName“); //producer.setNamesrvAddr(“192.168.180.1:9876“); producer.setNamesrvAddr(“192.168.180.133:9876“); producer.setInstanceName(“Producer“); /** * Producer对象在使用之前必须要调用start初始化,初始化一次即可《br》 * 注意:切记不可以在每次发送消息时,都调用start方法 */ try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); } for (int i = 0; i 《 100; i++) { try { /** * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。 * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,《br》 * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,《br》 * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。 */ { Message msg = new Message(“TopicTest1“,// topic “TagA“,// tag “OrderID001“,// key (“Hello MetaQ“).getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message(“TopicTest2“, “TagB“, “OrderID001“, (“Hello MetaQ TagB“.getBytes())); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message(“TopicTest3“, “TagC“, “OrderID001“, (“Hello MetaQ TagC“).getBytes()); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } TimeUnit.MILLISECONDS.sleep(1000); } catch (MQClientException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } } /** * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己 * 注意:我们建议应用在JBOSS、Tomcat等容器的退出销毁方法里调用shutdown方法 */ producer.shutdown(); } }
怎么指定rocketmq的jdk
一:RocketMQ简介RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:1.能够保证严格的消息顺序2.提供丰富的消息拉取模式3.高效的订阅者水平扩展能力4.实时的消息订阅机制5.亿级消息堆积能力二:安装RocketMQ下载源码首先我们从githup上获取RocketMQ的源码,目前最新的版本为3.5.8,下载地址为:或者wget/alibaba/RocketMQ/archive/v3.5.8.tar.gz。请注意:此时我们下载的是源码,直接解压时不能用的,所以我们需要编译之后才能使用。编译源码在进行编译源码之前我们需要安装JDK。如果你已经安装过了,请跳过这里。如果你还没有安装过JDK,请参考这篇文章(Linux环境下安装JDK)。然后我们还需要安装一下Maven。Maven的安装还是比较简单,只需要去官方上下载的安装吧,然后直接解压,再配置一下环境变量就OK。接下来我们把刚才下载来的RockeMQ的源码解压到/usr/local/rockemq-source文件夹中。在源码中有一个Install.sh。如图所示:。运行shinstall.sh。在编译完成之后,我们只要target目录下的alibaba-rocketmq这个文件夹中内容,把alibaba-rocketmq文件夹中的内容移动到/usr/local/rocketmq中。如果你不想编译的话,可以从这里下载编译之后的rocketmq。(rocketmq3.5.8)。配置环境变量接下来我们需要配置一下环境变量。在终端中输入以下命令:vi/etc/profile,在文件的末尾中添加如下两句话:exportrocketmq=/usr/local/rocketmqexportPATH=$PATH:$rocketmq/bin。接下来我们使配置的换将变量生效:source/etc/profile.三:启动RocketMQ接下来我们启动一下刚才编译的RocketMQ.在启动之前我们需要修改一下RocketMQ启动的内存大小(如果你的系统内存比较大的话,请忽略)。我们进入到/usr/local/rocketmq/bin中,在终端中输入以下命令修改mqnamesrv的内存大小:virunserver.sh.修改为如图的内容:,接下来修改broker的内存大小:virunbroker.sh:启动mqnameserver进入到/usr/local/rocketmq/bin中输入以下命令:nohupshmqnamesrv》~/logs/rocketmqlogs/namesrv.log2》&1&。注意最后的这个&不要少。启动mqbroker进入到/usr/local/rocketmq/bin中输入以下命令:nohupshmqbroker-nlocalhost:9876autoCreateTopicEnable=true》~/logs/rocketmqlogs/broker.log2》&1&。注意:localhost可以换成你刚才启动mqnamesrv的IP。autoCreateTopicEnable=true这句话不要少了。最后的&也不要少了。我们可以通过psaux|grepjava命令来查看启动的情况。到此,rocketmq的安装完毕。四:RocketMQ的小例子producer:viewplaincopypackagecom.zkn.newlearn.rocketmq;importcom.alibaba.rocketmq.client.exception.MQBrokerException;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQProducer;importcom.alibaba.rocketmq.client.producer.SendResult;importcom.alibaba.rocketmq.common.message.Message;importcom.alibaba.rocketmq.remoting.exception.RemotingException;importjava.util.concurrent.TimeUnit;/***Createdbyzknon2016/10/27.*/publicclassProducerTest01{publicstaticvoidmain(Stringargs){/***一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例*注意:ProducerGroupName需要由应用来保证唯一*ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,*因为服务器会回查这个Group下的任意一个Producer*/DefaultMQProducerproducer=newDefaultMQProducer(“ProducerGroupName“);//producer.setNamesrvAddr(“192.168.180.1:9876“);producer.setNamesrvAddr(“192.168.180.133:9876“);producer.setInstanceName(“Producer“);/***Producer对象在使用之前必须要调用start初始化,初始化一次即可*注意:切记不可以在每次发送消息时,都调用start方法*/try{producer.start();}catch(MQClientExceptione){e.printStackTrace();}for(inti=0;i《100;i++){try{/***下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。*注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,*例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,*需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。*/{Messagemsg=newMessage(“TopicTest1“,//topic“TagA“,//tag“OrderID001“,//key(“HelloMetaQ“).getBytes());//bodySendResultsendResult=producer.send(msg);System.out.println(sendResult);}{Messagemsg=newMessage(“TopicTest2“,“TagB“,“OrderID001“,(“HelloMetaQTagB“.getBytes()));SendResultsendResult=producer.send(msg);System.out.println(sendResult);}{Messagemsg=newMessage(“TopicTest3“,“TagC“,“OrderID001“,(“HelloMetaQTagC“).getBytes());SendResultsendResult=producer.send(msg);System.out.println(sendResult);}TimeUnit.MILLISECONDS.sleep(1000);}catch(MQClientExceptione){e.printStackTrace();}catch(InterruptedExceptione){e.printStackTrace();}catch(RemotingExceptione){e.printStackTrace();}catch(MQBrokerExceptione){e.printStackTrace();}}/***应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己*注意:我们建议应用在JBOSS、Tomcat等容器的退出销毁方法里调用shutdown方法*/producer.shutdown();}}
如何查rocketmq 每日量
通过rocketmq控制台查看。除了控制台查看,还可以通过主机地址进行查看,具体步骤如下:1、首先登陆nameserver主机地址。2、执行以下命令(shmqadminconsumerProgress-gconsumeGroupNameeg:shmqadminconsumerProgress-gtest)查看。3、出来的结果BrokerOffset为生产的条数,ConsumerOffset为消费的条数,Diff为堆积的条数。RocketMQ是阿里巴巴开源的分布式消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。
更多文章:

md5解密原理(关于md5的原理!哪位大哥说一下,不要从网上随便拖一段文字过来哦!)
2025年2月9日 16:10

正版linux系统官网(中国官方现在正力推LINUX操作系统以取代WINDAOWS桌面系统,是真的吗如何看)
2025年3月18日 15:40

william 的昵称是不是 bill/willy?关于“willy“这个词的一些疑问
2025年3月1日 11:20

java有新地址吗(成都java培训-盛课盟即将乔迁新地址)
2025年3月16日 04:10

xposed官网下载(Vivo+V2049A+安装xposed框架)
2025年3月17日 05:40

jsp编译后是什么文件(jsp页面经过转译后将创建一个什么文件)
2025年3月29日 02:20

correspondence怎么读音(Correspondence怎么读)
2025年3月16日 18:00