澳门搏彩官方网 > Web前端 >

罗克etMQ入门手册

前言

继小编上生龙活虎篇博客后布满式音讯队列RocketMQ学习课程①上黄金时代篇博客最重大介绍了两种常用的MQ,所以本博客再简要介绍一下罗克etMQ的规律和省略的例子,基于Java实现,希望得以支持学习者

豆蔻梢头、应用处景及质量(在运用罗克etMq早先我们要想一想那框架能帮大家肃清什么实际主题素材)

一:RocketMQ简介

RoketMQ搭建Linux版

“工于利其事,磨刀不误砍柴工”,所以大家先是要求搭建好RocketMQ,思量到学习者不必然有Linux系统的服务器,所以本博客介绍一下Linux和Window系统的二种安装情势,以补充上黄金年代篇博客

因为Ali曾经将罗克etMQ捐给Apache了,所以以往大家必要去Apache官方网址下载罗克etMQ官方网址

在意罗克etMQ是基于Java开荒的,所以安装前必得安装JDK,Linux JDK安装的能够看布满式新闻队列罗克etMQ学习课程①下载文件解压后,能够见到conf文件夹里有2m-noslave、2m-2s-async、2m-2s-sync文件夹

2m-noslave 两主,无从的配置

2m-2s-async 两主,两从,同步复制数据的布局

2m-2s-sync 两主,两从,异步复制数据的配备

我们找到2m-noslave的broker-a.properties文件,改正完备配置broker-a.properties

#所属集群名字 brokerClusterName=DefaultCluster#broker名字,注意此处不同的配置文件填写的不一样brokerName=broker-a#0 表示 Master,>0 表示 SlavebrokerId=0#nameServer地址,分号分割namesrvAddr=127.0.0.1:9876#关键brokerIP1=127.0.0.1#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数defaultTopicQueueNums=4#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭autoCreateTopicEnable=true#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭autoCreateSubscriptionGroup=true#Broker 对外服务的监听端口listenPort=10911#删除文件时间点,默认凌晨 4点deleteWhen=04#文件保留时间,默认 48 小时fileReservedTime=48#commitLog每个文件的大小默认1GmapedFileSizeCommitLog=1073741824#ConsumeQueue每个文件默认存30W条,根据业务情况调整mapedFileSizeConsumeQueue=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#这里是我的 日志配置#存储路径storePathRootDir=/usr/local/rocketmq/store#commitLog 存储路径storePathCommitLog=/usr/local/rocketmq/store/commitlog#消费队列存储路径存储路径storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue#消息索引存储路径storePathIndex=/usr/local/rocketmq/store/index#checkpoint 文件存储路径storeCheckpoint=/usr/local/rocketmq/store/checkpoint#abort 文件存储路径abortFile=/usr/local/rocketmq/store/abort#限制的消息大小 maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=ASYNC_MASTER#刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH#checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉消息线程池数量#pullMessageThreadPoolNums=128

先介绍一下linux系统的貌似将压缩文件解压到/usr/local

cd /usr/localtar -xzf apache-rocketmq.tar.gzmv apache-rocketmq rocketmqmkdir /usr/rocketmq/logs

意况变量配置

vim /etc/profile

纠正如下配置

export JAVA_HOME=/usr/java/jdk1.8.0_102export ROCKETMQ_HOME=/usr/local/rocketmqexport PATH=$PATH:$JAVA_HOME/bin:/usr/local/src/redis-3.2.8/bin:$ROCKETMQ_HOME/bin:$PATHexport CLASSPATH=.:$JAVA_HOME/lib

启动mqnamesrv

cd /usr/local/rocketmq/binnohup sh /usr/local/rocketmq/bin/mqnamesrv >/usr/local/rocketmq/logs/mqnamesrv.log 2>&1 &

启动Broker

nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties > /usr/local/rocketmq/logs/mqbroker.log 2>&1 &

要设置自动创制Topic,必要充足autoCreateTopicEnable=true

关闭Broker服务sh mqshutdown broker

起步成功能够用jps查看

澳门官方赌场 1此间写图片描述

1、异步管理,将不是必得的业务逻辑,进行异步管理,例如注册之后短信、邮箱的出殡和下葬

罗克etMQ是后生可畏款分布式、队列模型的新闻中间件,具有以下特征:

RocketMQ搭建Window版

1、下载RocketMQ后,解压到D:alibaba-rocketmq

2、在D:alibaba-rocketmq,Ctrl+Shift,右键,展开dom分界面,输入如下命令行start /b bin/mqnamesrv.exe >D:alibaba-rocketmqlogsmqnamesrv.log查看nameserver是不是运营jps -v

3、启动Broker

start /b bin/mqbroker.exe -n "127.0.0.1:9876" autoCreateTopicEnable=true >D:alibaba-rocketmqlogsmqbroker.log

Caused by: com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, huang_1See http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&topic_not_exist for further details. at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:525) ~[rocketmq-client-3.5.3.jar:na] at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1011) ~[rocketmq-client-3.5.3.jar:na] at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:970) ~[rocketmq-client-3.5.3.jar:na] at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:90) ~[rocketmq-client-3.5.3.jar:na] at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.send(ProducerImpl.java:107) ~[ons-client-1.2.3.jar:na]

并发上述极度运行时增加autoCreateTopicEnable=true

4、查看topic命令:mqadmin topicList -n "127.0.0.1:9876"

cd 到bin目录,施行下边发号出令mqadmin updateTopic -t test_1 -b "127.0.0.1:10911" -n "127.0.0.1:9876"增多如下参数到eclipse运营工程的VM参数里-Drocketmq.namesrv.addr=127.0.0.1:9876

2、应用解耦,
订单系统:顾客下单后,订单系统完毕长久化管理,将音讯写入音信队列,重临客户订单下单成功。
库存系统:订阅下单的新闻,选取拉/推的秘技,获取下单新闻,仓库储存系统根据下单新闻,举办仓库储存操作。
风流罗曼蒂克旦:在下单时仓库储存系统不能够符合规律使用。也不影响健康下单,因为下单后,订单系统写入信息队列就不再关注别的的世袭操作了。完成订单系统与库存系统的行使解耦。

1.能够保险严厉的音信顺序

罗克etMq监察和控制平台搭建

内需去github下载,下载链接rocketmq-console

下载后在rocketmq-console文件夹里,ctrl+shift,右键,在那处张开命令窗口,张开cmd窗口,首要要先搭建好maven景况

mvn clean package -Dmaven.test.skip=true

卷入达成以往,大家去target文件夹找到rocketmq-console-ng-1.0.0.jar然后

mkdir rocketmq-consolecd /usr/local/rocketmq-console

使用xftp上传rocketmq-console-ng-1.0.0.jar到/usr/local/rocketmq-console

nohup java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=127.0.0.1:9876 >/usr/local/rocketmq-console/run.log 2>&1 &

端口检查

netstat -anp|grep 12581

布局成功,展开

澳门官方赌场 2这里写图片描述

3、流量削锋,也是消息队列中的常用途景,通常在秒杀或团抢活动中运用大面积。
使用处景:秒杀活动,常常会因为流量过大,引致流量暴增,应用挂掉。为解决那几个主题材料,平日供给在选取前端参与音信队列。
3.1、能够调节移动的人口;
3.2、能够化解长期内高流量打散应用;
3.3、客户的伸手,服务器收到后,首先写入音信队列。假若音信队列长度抢先最大数目,则一向吐弃客户需求或跳转到错误页面;
3.4、秒杀业务依据音信队列中的必要音信,再做持续管理。

2.提供丰盛的音信拉取方式

编制程序完毕MQ实例

maven参加配置

<dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.0.10</version> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-all</artifactId> <version>3.0.10</version> <type>pom</type> </dependency>

音信队列消费者花费音讯实例

package com.mq.test;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;import java.util.List;public class MQConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "mq-group"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setInstanceName("consumer"); consumer.subscribe("TopicA-test", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("RocketMQ Consumer Started..."); }}

音讯队列生产者产生新闻实例

package com.mq.test;import java.util.Date;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;public class MQProducer { public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("mq-group");// producer.setNamesrvAddr("123.207.63.192:9876"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("producer"); producer.start(); try { for (int i = 0; i < 10; i++) { Thread.sleep; //MQ每隔一秒发送一条消息 Message msg = new Message("TopicA-test",// topic "TagA",// tag ("RocketMQ message"+i) .getBytes()// body ); SendResult sendResult = producer.send;//发送消息 } } catch (Exception e) { e.printStackTrace(); } producer.shutdown();//关闭消息生产者 }}

上面是来源于github wiki的上学例子

Filter互连网布局,以CPU财富换取宝贵的网卡流量能源

澳门官方赌场 3screenshot

伊始Broker时,增添以下配置,能够自行加载Filter Server进度

filterServerNums=1

Filter样品(Consumer仅负担将代码上传到Filter Server,由Filter Server编写翻译后实施)

package com.alibaba.rocketmq.example.filter;import com.alibaba.rocketmq.common.filter.MessageFilter;import com.alibaba.rocketmq.common.message.MessageExt;public class MessageFilterImpl implements MessageFilter { @Override public boolean match(MessageExt msg) { String property = msg.getUserProperty("SequenceId"); if (property != null) { int id = Integer.parseInt; if  == 0 && (id > 10)) { return true; } } return false; }}

Consumer例子

 public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4"); String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java"); consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl", filterCode); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); <br> consumer.start(); System.out.println("Consumer Started."); }

附录

罗克etMQ原理与安装教程

RocketMQ实例

阿里RocketMQ Quick Start

罗克etMQ集群安装

rocketMq监察和控制平台rocketmq-console搭建

4、日志管理

3.急速的订阅者水平扩张能力

5、新闻报道
音信报导是指,信息队列平时都置于了急忙的通讯机制,由此也得以用在纯的消息报纸发表。比方完毕点对点音信队列,或然闲聊室等。

4.实时的信息订阅机制

6、性能:

5.亿级音信聚成堆工夫

罗克etMQ单机也足以支撑亿级的音讯堆放技艺

二:安装RocketMQ

单机写入TPS单实例约7万条/秒,单机安顿3个Broker,能够跑到最高12万条/秒,音信大小十二个字节

下载源码

二、罗克etMQ网络铺排图

澳门官方赌场 4

率先大家从githup上收获罗克etMQ的源码,前段时间最新的版本为3.5.8,下载地址为: 或者 wget

互连网集群节点描述

1、Name Server 可集群安排,节点之间无其余消息同步。

2、Broker(新闻中间转播剧中人物,担任积累新闻,转载新闻卡塔尔 陈设相对复杂,Broker 分为Master 与Slave,三个Master 能够对应八个Slave,不过一个Slave 只好对应四个Master,Master 与Slave 的附和关系通过点名雷同的BrokerName,差别的BrokerId来定 义,BrokerId为0 表示Master,非0 表示Slave。Master 也足以配备七个。每一个Broker 与Name。

3、Producer 与Name Server 集群中的在这之中叁个节点(随机选拔)构建长连接,准时从Name Server 取Topic 路由消息,并向提供Topic 服务的Master 创建长连接,且按期向Master 发送心跳。Producer 完全无状态,可集群计划。

4、Consumer 与Name Server 集群中的个中壹个节点(随机筛选)组建长连接,定期从Name Server 取Topic 路由新闻,并向提供Topic 服务的Master、Slave 创设长连接,且准时向Master、Slave 发送心跳。Consumer不只能从Master 订阅音讯,也得以从Slave 订阅音讯,订阅法则由Broker 配置决定。

三、实际集群安顿操作步骤(接受多Master多Slave,异步复制集群格局展开安顿卡塔尔

1、下载计划包asp澳门明升平台娱乐,netandjava/9696958" target="_blank">alibaba-rocketmq-3.2.6.tar.gz

2、将下载的包放入并解压到Linux系统钦点目录中,在这里间作者以/home/目录为例,酌量两台服务器分别为

192.168.113.138、192.168.113.134 上面步骤就不区分是在哪台服务器举办操作了,步骤都以相通的

-- 解压

#tar -zxvfalibaba-rocketmq-3.2.6.tar.gz

澳门官方赌场,-- 校正配置文件由于我们选拔的是异步复制情势,所以须要改过2m-2s-async配置内容,--2m-2s-sync 是象征一起复制方式 ,2m-noslave是代表单击模式

#修改Master

#vim /home/alibaba-rocketmq/conf/2m-2s-async/broker-a.properties

#修改Salves

#vim /home/alibaba-rocketmq/conf/2m-2s-async/broker-a-s.properties

#Broker所属哪个集群,暗中同意【DefaultCluster】

brokerClusterName=DefaultCluster

#本机主机名

brokerName=hadoop1

#BrokerId,必得是大等于0的整数,0表示Master,>0表示Slave

brokerId=0

#除去文件时间点,暗中同意早晨4点

deleteWhen=04

#文本保留时间,暗中同意48钟头

fileReservedTime=48

#Broker的角色 - ASYNC_MASTETiguan 异步复制Master - SYNC_MASTE本田CR-V 同步复制

brokerRole=ASYNC_MASTER

#刷盘形式 - ASYNC_FLUSH 异步刷盘 - SYNC_FLUSH 同步刷盘

flushDiskType=ASYNC_FLUSH

#Name Server地址

namesrvAddr=192.168.113.137:9876;192.168.113.134:9876

#Broker对外劳务的监听端口,暗中同意【10911】

listenPort=10911

defaultTopicQueueNums=4

#是还是不是同意Broker自动创造Topic,提议线下开启,线上关闭,私下认可【true】

autoCreateTopicEnable=true

#是或不是同意Broker自动创设订阅组,提议线下开启,线上关闭,默许【true】

autoCreateSubscriptionGroup=true

mapedFileSizeCommitLog=1073741824

mapedFileSizeConsumeQueue=50000000

destroyMapedFileIntervalForcibly=120000

redeleteHangedFileInterval=120000

diskMaxUsedSpaceRatio=88

#新闻队列存款和储蓄的根目录,要求自定义目录路线

storePathRootDir=/usr/local/alibaba-rocketmq/data/store

#新闻队列存储的实在目录,必要自定义目录路线

storePathCommitLog=/usr/local/alibaba-rocketmq/data/store/commitlog

maxMessageSize=65536

flushCommitLogLeastPages=4

flushConsumeQueueLeastPages=2

flushCommitLogThoroughInterval=10000

flushConsumeQueueThoroughInterval=60000

checkTransactionMessageEnable=false

sendMessageThreadPoolNums=128

pullMessageThreadPoolNums=128

3、启动RocketMQ

3.1、在192.168.113.138启动Master、Slave服务(进入/home/alibaba-rocketmq/bin 目录)

#nohup sh mqnamesrv 1>/home/alibaba-rocketmq/log/ng.log 2>/home/alibaba-rocketmq/log/ng-error.log &

#nohup sh mqbroker -n 192.168.113.138:9876 -c /home/alibaba-rocketmq/conf/2m-2s-async/broker-a.properties >/home/alibaba-rocketmq/log/mq.log &

#nohup sh mqbroker -n 192.168.113.138:9876 -c /home/alibaba-rocketmq/conf/2m-2s-async/broker-a-s.properties >/home/alibaba-rocketmq/log/mq.log &

3.2、在192.168.113.134启动Master、Slave服务(进入 /home/alibaba-rocketmq/bin 目录)

#nohup sh mqnamesrv 1>/home/alibaba-rocketmq/log/ng.log 2>/home/alibaba-rocketmq/log/ng-error.log &

#nohup sh mqbroker -n 192.168.113.134:9876 -c /home/alibaba-rocketmq/conf/2m-2s-async/broker-a.properties >/home/alibaba-rocketmq/log/mq.log &

#nohup sh mqbroker -n 192.168.113.134:9876 -c /home/alibaba-rocketmq/conf/2m-2s-async/broker-a-s.properties >/home/alibaba-rocketmq/log/mq.log &

3.3、JPS检查

#jps

查看是或不是犹如下进度,假设有则服务运维成功:

BrokerStartup

NamesrvStartup

关停服务命令:

#sh mqshutdown namesrv
#sh mqshutdown broker

3.4、常用命令

--查看全部花费组group
#sh mqadmin consumerProgress -n 192.168.113.134:9876
--看内定花费组下的具备topic数据聚成堆情状
#sh mqadmin consumerProgress -n 192.168.113.134:9876 -g myProducer
翻开钦点报文新闻C0A8718A00002A9F000000000000F790 指的是报文在MQ队列中的MSG_ID
#sh mqadmin queryMsgById -n 192.168.113.138:9876 -i C0A8718A00002A9F000000000000F790

--查看Topic 列表音讯
#sh mqadmin topicList -n 192.168.113.138:9876
--查看topic新闻列表详细情况总括
#sh mqadmin topicstatus -n 192.168.113.138:9876 -t Topic
--查看集群音信
#sh mqadmin clusterList -n 192.168.113.138:9876

编写翻译源码