基于Kratos实现的一个分布式IM(待完善)
构建高性能分布式即时通讯系统:基于 Kratos 的实现
在当今互联网时代,实时通信应用(IM)在社交、协作和商业领域中扮演着至关重要的角色。本文将详细介绍一个基于 Kratos 框架实现的分布式即时通讯系统(IM),该系统利用了多种先进的技术组件,如 Kong、Consul、RocketMQ、Redis、MongoDB、MySQL 以及 WebSocket,实现高性能、可扩展和可靠的即时通信服务。
🔗 代码仓库地址:https://github.com/dyjh/im-service
1. 项目功能实现
该即时通讯系统旨在提供以下核心功能:
- 实时消息传输:支持用户之间的实时文本消息交流。
- 群组管理:用户可以加入或创建聊天群组,实现多人聊天。
- 消息持久化:所有消息将被存储到 MongoDB 中,支持历史消息查询。
- 高可用性和可扩展性:通过分布式架构和负载均衡,确保系统的高可用性和可扩展性。
- 服务发现与负载均衡:使用 Consul 和 Kong 实现动态服务发现与负载均衡。
- 消息分发:利用 RocketMQ 作为分布式消息队列,实现高效的消息分发。
- 用户会话管理:通过 Redis 存储用户会话信息,实现快速的用户信息查询和管理。
- gRPC 接口:提供多种 gRPC 接口,支持用户绑定、群组管理、消息发送与历史记录获取等操作。
2. 目录构成
项目的目录结构设计合理,模块划分清晰,便于开发和维护。以下是项目的主要目录结构:
.
├── Dockerfile
├── LICENSE
├── Makefile
├── deploy // 环境快速搭建
├── README.md
├── api // 存放 proto 文件及生成的 Go 代码
│ └── chat
│ └── service
│ └── v1
│ ├── error_reason.pb.go
│ ├── error_reason.proto
│ ├── ws.pb.go
│ ├── ws.proto
│ └── ws_grpc.pb.go
├── app
│ └── chat
│ └── service
│ ├── cmd // 项目启动入口
│ │ └── server
│ │ ├── handler // WebSocket 处理器
│ │ ├── main.go
│ │ ├── wire.go // 使用 Wire 进行依赖注入
│ │ └── wire_gen.go
│ ├── configs // 配置文件
│ │ ├── config.yaml
│ │ └── registry.yaml
│ ├── internal // 内部业务逻辑
│ │ ├── biz // 业务逻辑层
│ │ │ ├── biz.go
│ │ │ └── ws.go
│ │ ├── conf // 配置结构定义
│ │ │ ├── conf.pb.go
│ │ │ └── conf.proto
│ │ ├── model // 数据库模型
│ │ ├── data // 数据访问层
│ │ │ ├── data.go
│ │ │ └── ws.go
│ │ ├── server // HTTP 和 gRPC 服务器配置
│ │ │ ├── grpc.go
│ │ │ ├── http.go
│ │ │ └── server.go
│ │ └── service // 服务层实现
│ │ ├── greeter.go
│ │ └── service.go
│ ├── Dockerfile
│ ├── Makefile
│ └── utils // 工具函数
├── generate.go
├── go.mod
├── go.sum
└── third_party // 第三方依赖的 proto 文件
├── README.md
├── google
│ └── api
│ ├── annotations.proto
│ ├── http.proto
│ └── httpbody.proto
└── validate
├── README.md
└── validate.proto
3. 功能运作原理及技术栈
核心技术组件
- Kratos:Go 语言的微服务框架,用于快速构建高性能的服务。
- Kong:作为 API 网关,负责请求路由、负载均衡和安全管理。
- Consul:服务发现与配置管理工具,与 Kong 集成,实现动态负载均衡。
- RocketMQ:分布式消息队列,用于节点间的消息分发。
- Redis:高性能的键值存储,用于存储用户会话信息和缓存。
- MongoDB:NoSQL 数据库,用于存储聊天消息的历史记录。
- MySQL:关系型数据库,用于存储用户和群组的结构化数据。
- WebSocket:实现实时双向通信,支持实时消息传输。
- gRPC:高性能的远程过程调用(RPC)框架,用于服务间通信。
分布式架构与工作原理
系统架构图
组件职责
-
Kong + Consul:
- Kong 作为网关平台,负责接收所有外部请求,并进行路由转发。
- Consul 集成于 Kong,实现服务的动态发现与负载均衡,确保请求均匀分布到各个服务节点。
-
RocketMQ:
- 作为消息分发器,负责不同节点间的消息传递。
- 所有节点订阅固定的 topic
chatMessage
,以不同的Tag做节点区分,确保消息的有序传递。
-
Redis:
- 存储当前 WebSocket 服务节点的信息,包括 IP 和端口。
- 用于快速查询用户所属的 RocketMQ tag,实现消息的精准路由。
-
MongoDB:
- 负责持久化存储聊天消息历史,支持高效的查询和检索。
-
MySQL:
- 存储用户信息、群组信息及其关联关系。
-
WebSocket:
- 实现客户端与服务器之间的实时双向通信,确保消息的即时传输。
消息处理流程
-
用户连接:
- 用户通过 WebSocket 连接到某一台 WebSocket 服务节点。
- 连接建立后,服务节点会生成一个唯一的 RocketMQ tag,并将用户的 WebSocket 服务节点信息(IP 和端口)存入 Redis。
-
消息发送:
- 当用户发送消息时,服务器根据 Redis 中存储的信息,确定目标用户所在的节点及其对应的 RocketMQ tag。
- 消息通过 RocketMQ 发送到对应的节点。
-
消息分发:
- 目标节点接收到消息后,将其推送到相应的 WebSocket 连接,确保消息能够实时到达用户。
-
消息持久化:
- 消息被存储到 MongoDB 的文档中,确保历史消息的可靠存储和后续查询。
-
gRPC 服务:
- 提供了多个 gRPC 接口,如用户绑定、群组管理、消息发送与历史记录获取等,支持客户端的各种操作需求。
4. 项目结构
项目采用模块化设计,分为多个层级,确保代码的清晰和可维护性。
主要模块
- api:存放所有的 proto 文件及生成的 Go 代码,定义了服务的接口和数据结构。
- app/chat/service:核心业务逻辑所在,包括服务启动、配置管理、内部业务逻辑、数据访问和工具函数。
- deploy:包含快速搭建环境的配置文件和脚本,支持快速部署 Kong、Consul 和 RocketMQ。
- third_party:存放第三方依赖的 proto 文件,如 Google API 和验证规则。
关键目录说明
- cmd/server:项目的启动入口,包含 WebSocket 处理器和服务器配置。
- internal/biz:业务逻辑层,处理具体的业务需求。
- internal/data:数据访问层,封装了与数据库的交互逻辑。
- internal/server:配置和启动 HTTP 和 gRPC 服务器。
- internal/service:实现了 API 定义的服务层,负责处理数据转换和业务协同。
- utils:包含一些通用的工具函数,供项目各模块使用。
5. 项目部分代码解析
以下是项目中 WebSocket 处理器的示例代码,展示了消息接收、处理和分发的关键流程。
WebSocket 处理器
func (h *Handler) WsHandler(ctx *gin.Context) {
// 升级成 WebSocket 协议
conn, err := (&websocket.Upgrader{
ReadBufferSize: int(wsConf.WriteReadBufferSize),
WriteBufferSize: int(wsConf.WriteReadBufferSize),
CheckOrigin: func(r *http.Request) bool { // 解决跨域问题
return true
}}).
Upgrade(ctx.Writer, ctx.Request, nil)
if err != nil {
h.log.Error(consts.WebsocketUpgradeFailMsg)
return
}
// 创建一个用户客户端会话实例
newClient := &Client{
Uid: utils.GetClientUid(),
Socket: conn,
Send: make(chan ReplyMsg),
state: 1,
ReadDeadline: time.Duration(wsConf.ReadDeadline) * time.Second,
WriteDeadline: time.Duration(wsConf.WriteDeadline) * time.Second,
}
// 用户会话注册到用户管理上
h.ClientManager.Register <- newClient
// 启动消息读取和写入协程
go newClient.read(ctx, h)
go newClient.write(h)
// 启动心跳服务
newClient.HeartBeat()
}
主要功能点:
- WebSocket 升级:将 HTTP 连接升级为 WebSocket 连接,实现实时通信。
- 客户端管理:创建并管理客户端会话,确保每个连接的唯一性和可管理性。
- 消息读取与写入:通过 Goroutine 实现并发的消息读取和写入,确保高效的消息处理。
- 心跳机制:定期发送心跳包,检测连接的有效性,防止连接断开。
客户端管理器
客户端管理器负责维护所有活跃的客户端连接,处理注册与注销请求。
type ClientManager struct {
Clients map[string]*Client
MapUserIdToClientId map[uint]string
Mutex sync.Mutex
Reply chan *Client
Register chan *Client
Unregister chan *Client
RedisClient *redis.Client
MongoDb *mongo.Database
Mysql *gorm.DB
}
管理流程:
- 注册:当新的客户端连接时,将其添加到
Clients
映射中,并通过 Redis 更新其会话信息。 - 注销:当客户端断开连接时,从
Clients
中移除,并清理相关的会话数据。
6. 项目部署
项目提供了 deploy
目录,内含快速搭建所需的配置文件和脚本,可以快速搭建 Kong、Consul 和 RocketMQ 等服务。以下是快速搭建步骤:
快速部署 Kong + Consul
-
进入 Kong 部署目录:
cd deploy/kong
-
启动 Kong 和 Consul:
docker-compose -f docker-compose.yaml up -d
快速部署 RocketMQ
-
进入 RocketMQ 部署目录:
cd deploy/rocketmq
-
启动 RocketMQ:
docker-compose -f docker-compose.yaml up -d
-
创建 RocketMQ Topic:
RocketMQ 启动后,需要手动创建一个
chatMessage
的 topic。# 进入 RocketMQ 容器 docker exec -it rocketmq bash # 创建 topic mqadmin updateTopic -n localhost:9876 -t chatMessage
启动即时通讯服务
-
克隆仓库并进入项目目录:
git clone https://github.com/dyjh/im-service.git cd im-service
-
安装依赖并生成代码:
make generate make build
-
启动服务:
./im-service
7. 总结
本文详细介绍了一个基于 Kratos 框架实现的分布式即时通讯系统,涵盖了项目的功能实现、目录结构、技术栈、架构设计、关键代码解析以及部署步骤。通过使用 Kong 与 Consul 实现高效的请求路由与负载均衡,RocketMQ 保证消息的可靠传递,Redis 与 MongoDB 提供快速和持久的数据存储,WebSocket 实现实时通信,使得整个系统具备高可用性、可扩展性和实时性。
如果您对该项目感兴趣,欢迎访问 GitHub 仓库 获取更多信息和代码详情。