Ricardo的博客

我本逍遥一散人

基于Kratos实现的一个分布式IM(待完善)

构建高性能分布式即时通讯系统:基于 Kratos 的实现

在当今互联网时代,实时通信应用(IM)在社交、协作和商业领域中扮演着至关重要的角色。本文将详细介绍一个基于 Kratos 框架实现的分布式即时通讯系统(IM),该系统利用了多种先进的技术组件,如 Kong、Consul、RocketMQ、Redis、MongoDB、MySQL 以及 WebSocket,实现高性能、可扩展和可靠的即时通信服务。

🔗 代码仓库地址https://github.com/dyjh/im-service

1. 项目功能实现

该即时通讯系统旨在提供以下核心功能:

  • 实时消息传输:支持用户之间的实时文本消息交流。
  • 群组管理:用户可以加入或创建聊天群组,实现多人聊天。
  • 消息持久化:所有消息将被存储到 MongoDB 中,支持历史消息查询。
  • 高可用性和可扩展性:通过分布式架构和负载均衡,确保系统的高可用性和可扩展性。
  • 服务发现与负载均衡:使用 Consul 和 Kong 实现动态服务发现与负载均衡。
  • 消息分发:利用 RocketMQ 作为分布式消息队列,实现高效的消息分发。
  • 用户会话管理:通过 Redis 存储用户会话信息,实现快速的用户信息查询和管理。
  • gRPC 接口:提供多种 gRPC 接口,支持用户绑定、群组管理、消息发送与历史记录获取等操作。

2. 目录构成

项目的目录结构设计合理,模块划分清晰,便于开发和维护。以下是项目的主要目录结构:

.
├── Dockerfile  
├── LICENSE
├── Makefile  
├── deploy // 环境快速搭建
├── README.md
├── api // 存放 proto 文件及生成的 Go 代码
│   └── chat
│       └── service
│           └── v1
│               ├── error_reason.pb.go
│               ├── error_reason.proto
│               ├── ws.pb.go
│               ├── ws.proto
│               └── ws_grpc.pb.go
├── app
│   └── chat
│       └── service
│           ├── cmd  // 项目启动入口
│           │   └── server
│           │       ├── handler // WebSocket 处理器
│           │       ├── main.go
│           │       ├── wire.go  // 使用 Wire 进行依赖注入
│           │       └── wire_gen.go
│           ├── configs  // 配置文件
│           │  ├── config.yaml
│           │  └── registry.yaml
│         ├── internal  // 内部业务逻辑
│           │   ├── biz   // 业务逻辑层
│           │   │   ├── biz.go
│           │   │   └── ws.go
│           │   ├── conf  // 配置结构定义
│           │   │   ├── conf.pb.go
│           │   │   └── conf.proto
│           │   ├── model  // 数据库模型
│           │   ├── data  // 数据访问层
│           │   │   ├── data.go
│           │   │   └── ws.go
│           │   ├── server  // HTTP 和 gRPC 服务器配置
│           │   │   ├── grpc.go
│           │   │   ├── http.go
│           │   │   └── server.go
│           │   └── service  // 服务层实现
│           │       ├── greeter.go
│           │       └── service.go
│           ├── Dockerfile  
│           ├── Makefile  
│           └── utils // 工具函数
├── generate.go
├── go.mod
├── go.sum
└── third_party  // 第三方依赖的 proto 文件
    ├── README.md
    ├── google
    │   └── api
    │       ├── annotations.proto
    │       ├── http.proto
    │       └── httpbody.proto
    └── validate
        ├── README.md
        └── validate.proto

3. 功能运作原理及技术栈

核心技术组件

  • Kratos:Go 语言的微服务框架,用于快速构建高性能的服务。
  • Kong:作为 API 网关,负责请求路由、负载均衡和安全管理。
  • Consul:服务发现与配置管理工具,与 Kong 集成,实现动态负载均衡。
  • RocketMQ:分布式消息队列,用于节点间的消息分发。
  • Redis:高性能的键值存储,用于存储用户会话信息和缓存。
  • MongoDB:NoSQL 数据库,用于存储聊天消息的历史记录。
  • MySQL:关系型数据库,用于存储用户和群组的结构化数据。
  • WebSocket:实现实时双向通信,支持实时消息传输。
  • gRPC:高性能的远程过程调用(RPC)框架,用于服务间通信。

分布式架构与工作原理

系统架构图

wxjt 1 - 基于Kratos实现的一个分布式IM(待完善)

组件职责

  1. Kong + Consul

    • Kong 作为网关平台,负责接收所有外部请求,并进行路由转发。
    • Consul 集成于 Kong,实现服务的动态发现与负载均衡,确保请求均匀分布到各个服务节点。
  2. RocketMQ

    • 作为消息分发器,负责不同节点间的消息传递。
    • 所有节点订阅固定的 topic chatMessage,以不同的Tag做节点区分,确保消息的有序传递。
  3. Redis

    • 存储当前 WebSocket 服务节点的信息,包括 IP 和端口。
    • 用于快速查询用户所属的 RocketMQ tag,实现消息的精准路由。
  4. MongoDB

    • 负责持久化存储聊天消息历史,支持高效的查询和检索。
  5. MySQL

    • 存储用户信息、群组信息及其关联关系。
  6. WebSocket

    • 实现客户端与服务器之间的实时双向通信,确保消息的即时传输。

消息处理流程

  1. 用户连接

    • 用户通过 WebSocket 连接到某一台 WebSocket 服务节点。
    • 连接建立后,服务节点会生成一个唯一的 RocketMQ tag,并将用户的 WebSocket 服务节点信息(IP 和端口)存入 Redis。
  2. 消息发送

    • 当用户发送消息时,服务器根据 Redis 中存储的信息,确定目标用户所在的节点及其对应的 RocketMQ tag。
    • 消息通过 RocketMQ 发送到对应的节点。
  3. 消息分发

    • 目标节点接收到消息后,将其推送到相应的 WebSocket 连接,确保消息能够实时到达用户。
  4. 消息持久化

    • 消息被存储到 MongoDB 的文档中,确保历史消息的可靠存储和后续查询。
  5. gRPC 服务

    • 提供了多个 gRPC 接口,如用户绑定、群组管理、消息发送与历史记录获取等,支持客户端的各种操作需求。

4. 项目结构

项目采用模块化设计,分为多个层级,确保代码的清晰和可维护性。

主要模块

  • api:存放所有的 proto 文件及生成的 Go 代码,定义了服务的接口和数据结构。
  • app/chat/service:核心业务逻辑所在,包括服务启动、配置管理、内部业务逻辑、数据访问和工具函数。
  • deploy:包含快速搭建环境的配置文件和脚本,支持快速部署 Kong、Consul 和 RocketMQ。
  • third_party:存放第三方依赖的 proto 文件,如 Google API 和验证规则。

关键目录说明

  • cmd/server:项目的启动入口,包含 WebSocket 处理器和服务器配置。
  • internal/biz:业务逻辑层,处理具体的业务需求。
  • internal/data:数据访问层,封装了与数据库的交互逻辑。
  • internal/server:配置和启动 HTTP 和 gRPC 服务器。
  • internal/service:实现了 API 定义的服务层,负责处理数据转换和业务协同。
  • utils:包含一些通用的工具函数,供项目各模块使用。

5. 项目部分代码解析

以下是项目中 WebSocket 处理器的示例代码,展示了消息接收、处理和分发的关键流程。

WebSocket 处理器

func (h *Handler) WsHandler(ctx *gin.Context) {

    // 升级成 WebSocket 协议
    conn, err := (&websocket.Upgrader{
        ReadBufferSize:  int(wsConf.WriteReadBufferSize),
        WriteBufferSize: int(wsConf.WriteReadBufferSize),
        CheckOrigin: func(r *http.Request) bool { // 解决跨域问题
            return true
        }}).
        Upgrade(ctx.Writer, ctx.Request, nil)

    if err != nil {
        h.log.Error(consts.WebsocketUpgradeFailMsg)
        return
    }

    // 创建一个用户客户端会话实例
    newClient := &Client{
        Uid:           utils.GetClientUid(),
        Socket:        conn,
        Send:          make(chan ReplyMsg),
        state:         1,
        ReadDeadline:  time.Duration(wsConf.ReadDeadline) * time.Second,
        WriteDeadline: time.Duration(wsConf.WriteDeadline) * time.Second,
    }
    // 用户会话注册到用户管理上
    h.ClientManager.Register <- newClient

    // 启动消息读取和写入协程
    go newClient.read(ctx, h)
    go newClient.write(h)

    // 启动心跳服务
    newClient.HeartBeat()
}

主要功能点:

  • WebSocket 升级:将 HTTP 连接升级为 WebSocket 连接,实现实时通信。
  • 客户端管理:创建并管理客户端会话,确保每个连接的唯一性和可管理性。
  • 消息读取与写入:通过 Goroutine 实现并发的消息读取和写入,确保高效的消息处理。
  • 心跳机制:定期发送心跳包,检测连接的有效性,防止连接断开。

客户端管理器

客户端管理器负责维护所有活跃的客户端连接,处理注册与注销请求。

type ClientManager struct {
    Clients             map[string]*Client
    MapUserIdToClientId map[uint]string
    Mutex               sync.Mutex
    Reply               chan *Client
    Register            chan *Client
    Unregister          chan *Client
    RedisClient         *redis.Client
    MongoDb             *mongo.Database
    Mysql               *gorm.DB
}

管理流程:

  • 注册:当新的客户端连接时,将其添加到 Clients 映射中,并通过 Redis 更新其会话信息。
  • 注销:当客户端断开连接时,从 Clients 中移除,并清理相关的会话数据。

6. 项目部署

项目提供了 deploy 目录,内含快速搭建所需的配置文件和脚本,可以快速搭建 Kong、Consul 和 RocketMQ 等服务。以下是快速搭建步骤:

快速部署 Kong + Consul

  1. 进入 Kong 部署目录

    cd deploy/kong
  2. 启动 Kong 和 Consul

    docker-compose -f docker-compose.yaml up -d

快速部署 RocketMQ

  1. 进入 RocketMQ 部署目录

    cd deploy/rocketmq
  2. 启动 RocketMQ

    docker-compose -f docker-compose.yaml up -d
  3. 创建 RocketMQ Topic

    RocketMQ 启动后,需要手动创建一个 chatMessage 的 topic。

    # 进入 RocketMQ 容器
    docker exec -it rocketmq bash
    
    # 创建 topic
    mqadmin updateTopic -n localhost:9876 -t chatMessage

启动即时通讯服务

  1. 克隆仓库并进入项目目录

    git clone https://github.com/dyjh/im-service.git
    cd im-service
  2. 安装依赖并生成代码

    make generate
    make build
  3. 启动服务

    ./im-service

7. 总结

本文详细介绍了一个基于 Kratos 框架实现的分布式即时通讯系统,涵盖了项目的功能实现、目录结构、技术栈、架构设计、关键代码解析以及部署步骤。通过使用 Kong 与 Consul 实现高效的请求路由与负载均衡,RocketMQ 保证消息的可靠传递,Redis 与 MongoDB 提供快速和持久的数据存储,WebSocket 实现实时通信,使得整个系统具备高可用性、可扩展性和实时性。

如果您对该项目感兴趣,欢迎访问 GitHub 仓库 获取更多信息和代码详情。

Docker 部署 kong 并且实现 grpc 兼容

上一篇
评论
发表评论 说点什么
还没有评论
19
0