Ubuntu部署Rocketmq 并且使用gorpc搭建外网接口访问


1、官网及版本特性

官网:https://rocketmq.apache.org/
特性及版本发布信息:https://www.oschina.net/p/rocketmq

2、下载

https://dlcdn.apache.org/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip

$ cd /opt/programs/
$ sudo wget https://dlcdn.apache.org/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip
$ sudo unzip rocketmq-all-5.0.0-bin-release.zip
$ sudo mv rocketmq-all-5.0.0-bin-release rocketmq

4、修改JVM配置

RocketMQ 默认的虚拟机内存较大,启动 Broker 或者 NameServer 可能会因为内存不足而导致失败,所以需要编辑如下两个配置文件,修改 JVM 内存大小。

修改bin/runbroker.sh默认JVM 大小

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

修改bin/runserver.sh默认JVM 大小

choose_gc_options()
{
    # Example of JAVA_MAJOR_VERSION value : '1', '9', '10', '11', ...
    # '1' means releases befor Java 9
    JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -r -n 's/.* version "([0-9]*).*$/\1/p')
    if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then
      JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
      JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
      JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
      JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
    else
      JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
      JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
      JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"
    fi
}

5、创建快捷启动方式

在这里我用zsh

$ vim ~/.zshrc

alias mqsrvstart="nohup sh /opt/programs/rocketmq/bin/mqnamesrv &"
alias mqsrvstop="sh /opt/programs/rocketmq/bin/mqshutdown namesrv"
alias brokerstart="nohup sh /opt/programs/rocketmq/bin/mqbroker -n localhost:9876 &"
alias brokerstop="sh /opt/programs/rocketmq/bin/mqshutdown broker"

$ source ~/.zshrc

6、启动

# 首先启动Name Server
$ mqsrvstart

# 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

# 启动Broker
$ brokerstart

# 验证Broker是否启动成功,例如Broker的IP为:192.168.1.2,且名称为broker-a
$ tail -f ~/logs/rocketmqlogs/broker.log
The broker[VM-4-15-ubuntu, 10.0.4.9:10911] boot success. serializeType=JSON and name server is localhost:9876

# 关闭
$ brokerstop
$ mqsrvstop

7、链接以及外网简单测试(rpc)

代码demo地址:https://github.com/dyjh/rocketmq-rpc

服务端以及服务定义

package main

import (
    "go-rocketmq/delay"
    "go-rocketmq/simple"
    "log"
    "net"
    "net/rpc"
    "net/rpc/jsonrpc"
)

// Golang RPC 的实现需要 5 个步骤
// 1. 定义一个服务结构
// 2. 为这个服务结构定义几个服务方法,每个方法接受两个参数和返回 error 类型
// 3. 使用 rpc.RegisterName() 方法注册 「服务结构」 的实例
// 4. 监听套接字
// 5. 为每一个套接字调用 jsonrpc.ServerConn(conn) 方法

type RocketService struct {
}

// 定义 RocketService 所需要的参数,一般是两个,string 类型
type Args struct {
    Message string
}

// 2.
// 实现延时消息,需要两个参数
// 所有的 jsonrpc 方法只有两个参数,第一个参数用于接收所有参数,
// 第二个参数用于处理返回结果,是一个指针
// 所有的 jsonrpc 都只有一个返回值,error,用于指示是否发生错误
func (that *RocketService) Delay(args Args, reply *bool) error {
    *reply = delay.PushDelay(args.Message)
    return nil
}

// 实现即时推送服务
func (that *RocketService) Simple(args Args, reply *bool) error {
    *reply = simple.PushSimple(args.Message)
    return nil
}

// Hello
func (that *RocketService) Hello(request string, reply *string) error {
    *reply = "hello " + request
    return nil
}

func main() {
    // 3.
    rpc.RegisterName("Rocketmq", new(RocketService))
    // 4.
    sock, err := net.Listen("tcp", ":8082")
    log.Println("listen at :8082")
    if err != nil {
        log.Fatal("listen error:", err)
    }

    for {
        conn, err := sock.Accept()
        if err != nil {
            continue
        }
        // 5.
        go jsonrpc.ServeConn(conn)
    }

}

进入项目根目录开启服务

//启动rpc服务
$ go run main.go
//启动消费端服务
$ go run simple-consumer.go

2022/10/12 15:29:50 listen at :8083

本地使用telnet工具测试

$ sudo telnet xxx.xx.xx.xxx 8083
Trying xx.xx.xx.xxx...
Connected to xx.xx.xx.xxx.
Escape character is '^]'.
{"method": "Rocketmq.Simple","params": [{"Message": "123123123"}],"id": "0"}
{"id":"0","result":true,"error":null}

rpc服务端console

2022/10/12 16:51:59 listen at :8083
INFO[0019] the topic route info changed                  changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"VM-4-9-ubuntu\",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"VM-4-9-ubuntu\",\"brokerAddrs\":{\"0\":\"10.0.4.9:10911\"}}]}" changedFrom="<nil>" topic=SimpleTopic
2022-10-12 16:52:19: 消息: SendResult [sendStatus=0, msgIds=0A000409E94B000000003c44c1b80001, offsetMsgId=0A00040900002A9F00000000000004BA, queueOffset=3, messageQueue=MessageQueue [topic=SimpleTopic, brokerName=VM-4-9-ubuntu, queueId=1]]发送成功

rocket消费端console

2022-10-12 16:52:19 读取到一条消息,消息内容: 123123123
INFO[0240] Stats In One Minute, SUM: %d TPS:  AVGPT: %.2f  AVGPT=20889 SUM=20889 TPS=348.15 statsKey=SimpleTopic@test statsName=PULL_RT
INFO[0240] Stats In One Minute, SUM: %d TPS:  AVGPT: %.2f  AVGPT=0 SUM=0 TPS=0.00 statsKey=SimpleTopic@test statsName=CONSUME_RT
INFO[0240] Stats In One Minute, SUM: %d TPS:  AVGPT: %.2f  AVGPT=1 SUM=1 TPS=0.02 statsKey=SimpleTopic@test statsName=CONSUME_OK_TPS
INFO[0240] Stats In One Minute, SUM: %d TPS:  AVGPT: %.2f  AVGPT=1 SUM=1 TPS=0.02 statsKey=SimpleTopic@test statsName=PULL_TPS
INFO[0240] update offset to broker success               MessageQueue="MessageQueue [topic=SimpleTopic, brokerName=VM-4-9-ubuntu, queueId=0]" consumerGroup=test offset=0
  • 分享:
评论
还没有评论
    发表评论 说点什么
    蜀ICP备18035236号