Ubuntu部署Rocketmq 并且使用gorpc搭建外网接口访问
1、官网及版本特性
官网:https://rocketmq.apache.org/
特性及版本发布信息:https://www.oschina.net/p/rocketmq
2、下载
https://dlcdn.apache.org/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip
$ cd /opt/programs/
$ sudo wget https://dlcdn.apache.org/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip
$ sudo unzip rocketmq-all-5.0.0-bin-release.zip
$ sudo mv rocketmq-all-5.0.0-bin-release rocketmq
4、修改JVM配置
RocketMQ 默认的虚拟机内存较大,启动 Broker 或者 NameServer 可能会因为内存不足而导致失败,所以需要编辑如下两个配置文件,修改 JVM 内存大小。
修改bin/runbroker.sh默认JVM 大小
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
修改bin/runserver.sh默认JVM 大小
choose_gc_options()
{
# Example of JAVA_MAJOR_VERSION value : '1', '9', '10', '11', ...
# '1' means releases befor Java 9
JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -r -n 's/.* version "([0-9]*).*$/\1/p')
if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
else
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"
fi
}
5、创建快捷启动方式
在这里我用zsh
$ vim ~/.zshrc
alias mqsrvstart="nohup sh /opt/programs/rocketmq/bin/mqnamesrv &"
alias mqsrvstop="sh /opt/programs/rocketmq/bin/mqshutdown namesrv"
alias brokerstart="nohup sh /opt/programs/rocketmq/bin/mqbroker -n localhost:9876 &"
alias brokerstop="sh /opt/programs/rocketmq/bin/mqshutdown broker"
$ source ~/.zshrc
6、启动
# 首先启动Name Server
$ mqsrvstart
# 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
# 启动Broker
$ brokerstart
# 验证Broker是否启动成功,例如Broker的IP为:192.168.1.2,且名称为broker-a
$ tail -f ~/logs/rocketmqlogs/broker.log
The broker[VM-4-15-ubuntu, 10.0.4.9:10911] boot success. serializeType=JSON and name server is localhost:9876
# 关闭
$ brokerstop
$ mqsrvstop
7、链接以及外网简单测试(rpc)
代码demo地址:https://github.com/dyjh/rocketmq-rpc
服务端以及服务定义
package main
import (
"go-rocketmq/delay"
"go-rocketmq/simple"
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
)
// Golang RPC 的实现需要 5 个步骤
// 1. 定义一个服务结构
// 2. 为这个服务结构定义几个服务方法,每个方法接受两个参数和返回 error 类型
// 3. 使用 rpc.RegisterName() 方法注册 「服务结构」 的实例
// 4. 监听套接字
// 5. 为每一个套接字调用 jsonrpc.ServerConn(conn) 方法
type RocketService struct {
}
// 定义 RocketService 所需要的参数,一般是两个,string 类型
type Args struct {
Message string
}
// 2.
// 实现延时消息,需要两个参数
// 所有的 jsonrpc 方法只有两个参数,第一个参数用于接收所有参数,
// 第二个参数用于处理返回结果,是一个指针
// 所有的 jsonrpc 都只有一个返回值,error,用于指示是否发生错误
func (that *RocketService) Delay(args Args, reply *bool) error {
*reply = delay.PushDelay(args.Message)
return nil
}
// 实现即时推送服务
func (that *RocketService) Simple(args Args, reply *bool) error {
*reply = simple.PushSimple(args.Message)
return nil
}
// Hello
func (that *RocketService) Hello(request string, reply *string) error {
*reply = "hello " + request
return nil
}
func main() {
// 3.
rpc.RegisterName("Rocketmq", new(RocketService))
// 4.
sock, err := net.Listen("tcp", ":8082")
log.Println("listen at :8082")
if err != nil {
log.Fatal("listen error:", err)
}
for {
conn, err := sock.Accept()
if err != nil {
continue
}
// 5.
go jsonrpc.ServeConn(conn)
}
}
进入项目根目录开启服务
//启动rpc服务
$ go run main.go
//启动消费端服务
$ go run simple-consumer.go
2022/10/12 15:29:50 listen at :8083
本地使用telnet工具测试
$ sudo telnet xxx.xx.xx.xxx 8083
Trying xx.xx.xx.xxx...
Connected to xx.xx.xx.xxx.
Escape character is '^]'.
{"method": "Rocketmq.Simple","params": [{"Message": "123123123"}],"id": "0"}
{"id":"0","result":true,"error":null}
rpc服务端console
2022/10/12 16:51:59 listen at :8083
INFO[0019] the topic route info changed changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"VM-4-9-ubuntu\",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"VM-4-9-ubuntu\",\"brokerAddrs\":{\"0\":\"10.0.4.9:10911\"}}]}" changedFrom="<nil>" topic=SimpleTopic
2022-10-12 16:52:19: 消息: SendResult [sendStatus=0, msgIds=0A000409E94B000000003c44c1b80001, offsetMsgId=0A00040900002A9F00000000000004BA, queueOffset=3, messageQueue=MessageQueue [topic=SimpleTopic, brokerName=VM-4-9-ubuntu, queueId=1]]发送成功
rocket消费端console
2022-10-12 16:52:19 读取到一条消息,消息内容: 123123123
INFO[0240] Stats In One Minute, SUM: %d TPS: AVGPT: %.2f AVGPT=20889 SUM=20889 TPS=348.15 statsKey=SimpleTopic@test statsName=PULL_RT
INFO[0240] Stats In One Minute, SUM: %d TPS: AVGPT: %.2f AVGPT=0 SUM=0 TPS=0.00 statsKey=SimpleTopic@test statsName=CONSUME_RT
INFO[0240] Stats In One Minute, SUM: %d TPS: AVGPT: %.2f AVGPT=1 SUM=1 TPS=0.02 statsKey=SimpleTopic@test statsName=CONSUME_OK_TPS
INFO[0240] Stats In One Minute, SUM: %d TPS: AVGPT: %.2f AVGPT=1 SUM=1 TPS=0.02 statsKey=SimpleTopic@test statsName=PULL_TPS
INFO[0240] update offset to broker success MessageQueue="MessageQueue [topic=SimpleTopic, brokerName=VM-4-9-ubuntu, queueId=0]" consumerGroup=test offset=0