Hyperf 实现简单的客服聊天系统


“终于不用再花钱买聊天软件啦”
最近在试着使用一些swoole框架,于是突发奇想写个客服聊天的小Demo

本项目使用Hyperf框架、WebSocket和Swoole共享内存实现了如下功能。

  1. 若当前有客服在线并且空闲,直接进入聊天
  2. 一名客服同时只能和一个客户聊天
  3. 客户若是超时未回复消息,自动退出聊天
  4. 若当前有客服在线并且都在聊天中,则分配一个排队人数最少的客服并且进入排队
  5. 若是当前无客服在线,直接返回'当前暂无客服上班',不进入排队
  6. 客户池以及客服池存入多线程共享内存,并且使用共享资源锁保 SWOOLE_MUTEX 保证多线程数据安全

以下是一个基本的代码实现,朋友们可以根据需要进行调整。例如使用mysql增加客服客户身份校验,以及聊天消息留存,这些在这里就不做示范。

安装Hyperf框架和依赖组件:

composer create-project hyperf/hyperf-skeleton chat_demo
cd chat_demo
composer require hyperf/websocket-server

这里需要注意的是,不分国内composer源并未更新库(如阿里源),所以无法安装最新的Hyperf,在这里我更换为华为源,这样默认安装的Hyperf是3.0版本

composer config --global repo.packagist composer https://mirrors.huaweicloud.com/repository/php/

Hyperf 安装选项一路回车即可,若是需要关联数据库,可在对应的选项打y

修改配置文件 config/autoload/server.php 中的 servers,启用WebSocket服务器

'servers' => [
        [
            'name' => 'ws',
            'type' => Server::SERVER_WEBSOCKET,
            'host' => '0.0.0.0',
            'port' => 9502,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                Event::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
                Event::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
                Event::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose']
            ]
        ],
    ],

在Hyperf的入口文件 bin/hyperf.php 中创建共享内存对象

use Swoole\Table;

// 创建共享内存实例
$table = new Table(2048);
$table->column('onlineStaff', Table::TYPE_STRING, 1024);
$table->column('customers', Table::TYPE_STRING, 1024);
$table->create();

// 将 Table 实例绑定到容器中,以便在 WebSocket 服务中使用
$container->set(Table::class, $table);

创建一个 WebSocketController 控制器来处理 WebSocket 连接和消息

<?php

namespace App\Controller;

use App\Service\ChatService;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Logger\LoggerFactory;
use Hyperf\WebSocketServer\Sender;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Log\LoggerInterface;
use Swoole\Http\Response;
use Swoole\WebSocket\Server;
use Swoole\WebSocket\Frame;

class WebSocketController implements OnMessageInterface, OnCloseInterface
{

    /**
     * 聊天服务
     * @Inject
     * @var ChatService
     */
    private ChatService $chatService;

    /**
     * 服务容器
     * @var ContainerInterface
     */
    protected ContainerInterface $container;

    /**
     * 日志类
     * @var LoggerInterface
     */
    protected LoggerInterface $logger;

    public function __construct(ContainerInterface $container, LoggerFactory $loggerFactory)
    {
        $this->container = $container;
        $this->chatService = $container->get(ChatService::class);
        $this->logger = $loggerFactory->get('log', 'default');
    }

    /**
     * 消息推送监听
     *
     * @param Response|Server $server
     * @param Frame $frame
     * @return void
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     */
    public function onMessage($server, $frame): void
    {
        $data = json_decode($frame->data, true);
        $action = $data['action'] ?? null;

        if ($action === 'staffOnline') {
            // 客服上线
            $this->handleStaffOnline($frame->fd, $data['payload']['name'] ?? '客服');
        } elseif ($action === 'customerConnect') {
            // 客户请求建立连接
            $this->handleCustomerConnect($frame->fd);
        } elseif ($action === 'message') {
            // 消息互发
            $message = $data['payload']['message'] ?? '';
            $this->handleMessage($frame->fd, $message);
        }
    }

    /**
     * 对象断开连接回调
     *
     * @param Response|Server $server
     * @param int $fd
     * @param int $reactorId
     * @return void
     * @throws \Psr\Container\ContainerExceptionInterface
     * @throws \Psr\Container\NotFoundExceptionInterface
     */
    public function onClose($server, int $fd, int $reactorId): void
    {
        // 处理客服或者客户断开连接的逻辑
        $this->logger->info('断开连接触发:'. $fd. "|" . $reactorId);
        $customer = $this->chatService->getCustomer($fd);
        if ($customer) {
            $this->chatService->removeCustomer($fd);
        }

        $staff = $this->chatService->getStaff($fd);
        if ($staff) {
            $this->chatService->staffOffline($fd);
        }
    }

    /**
     * 客户连接逻辑处理
     *
     * @param int $fd
     * @return void
     * @throws \Psr\Container\ContainerExceptionInterface
     * @throws \Psr\Container\NotFoundExceptionInterface
     */
    private function handleCustomerConnect(int $fd): void
    {
        // 在此可自行增加参数实现客户身份校验
        $this->chatService->addCustomer($fd);
        $availableData = $this->chatService->findAvailableStaff();
        $this->logger->info('匹配结果:'. json_encode($availableData));
        // selectedStaffFd 为null则说明无在线客服
        if ($availableData['selectedStaffFd'] === null) {
            $this->container->get(Sender::class)->push($fd, json_encode([
                'action' => 'message',
                'payload' => [
                    'message' => '当前暂无客服上班',
                ],
            ]));
            $this->chatService->removeCustomer($fd);
            return;
        }

        // minQueue 不为null则说明需要排队
        if ($availableData['minQueue'] !== null) {
            $this->chatService->setCustomerQueueUp($fd, $availableData['selectedStaffFd']);
            $this->container->get(Sender::class)->push($fd, json_encode([
                'action' => 'message',
                'payload' => [
                    'message' => '当前暂无空闲客服,已为您自动分配排队较少客服,请等待排队',
                    'queue' => $availableData['minQueue'] + 1
                ],
            ]));
            return;
        }

        $this->chatService->setStaffForCustomer($fd, $availableData['selectedStaffFd']);
        $this->chatService->resetCustomerTimer($fd, 60000); // 设置超时为1分钟(60000毫秒)

        $this->container->get(Sender::class)->push($fd, json_encode([
            'action' => 'message',
            'payload' => [
                'message' => '您已连接到客服',
            ],
        ]));
    }

    /**
     * 客服断开连接
     *
     * @param int $fd
     * @param string $name
     * @return void
     */
    private function handleStaffOnline(int $fd, string $name): void
    {
        // 若是数据库的话,可将name替换成客服登录token之类的,实现登录校验
        $this->chatService->staffOnline($fd, $name);
    }

    /**
     * 消息处理逻辑
     *
     * @param int $fd
     * @param string $message
     * @return void
     * @throws \Psr\Container\ContainerExceptionInterface
     * @throws \Psr\Container\NotFoundExceptionInterface
     */
    private function handleMessage(int $fd, string $message): void
    {
        $customer = $this->chatService->getCustomer($fd);
        if ($customer && $customer['staff_fd']) {
            $staff = $this->chatService->getStaff($customer['staff_fd']);
            // 判断是否还在排队
            if ($staff['customer_fd'] != $fd) {
                $this->container->get(Sender::class)->push($fd, json_encode([
                    'action' => 'message',
                    'payload' => [
                        'message' => '当前正在排队中',
                    ],
                ]));
            } else {
                // 如果是客户发来的消息,则转发给客服
                $this->container->get(Sender::class)->push($customer['staff_fd'], json_encode([
                    'action' => 'message',
                    'payload' => [
                        'message' => $message,
                        'from' => 'customer',
                    ],
                ]));

                // 重置客户的超时计时器
                $this->chatService->resetCustomerTimer($fd, 60000);
            }
            // 在此可实现聊天记录留存
            // ...
        } else {
            $staff = $this->chatService->getStaff($fd);
            if ($staff && $staff['customer_fd']) {
                // 如果是客服发来的消息,则转发给客户
                $this->container->get(Sender::class)->push($staff['customer_fd'], json_encode([
                    'action' => 'message',
                    'payload' => [
                        'message' => $message,
                        'from' => 'staff',
                    ],
                ]));
            }
            // 在此可实现聊天记录留存
            // ...
        }
    }

}

创建一个 ChatService 类来处理聊天逻辑

<?php

namespace App\Service;

use Hyperf\Logger\LoggerFactory;
use Hyperf\WebSocketServer\Sender;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Log\LoggerInterface;
use Swoole\Table;

class ChatService
{

    /**
     * 日志类
     * @var LoggerInterface
     */
    private LoggerInterface $logger;

    /**
     * swoole 共享内存
     * @var Table
     */
    private Table $table;

    /**
     * 服务容器
     * @var ContainerInterface
     */
    private ContainerInterface $container;

    private \Swoole\Lock $lock;

    public function __construct(ContainerInterface $container, LoggerFactory $loggerFactory, Table $table)
    {
        // 设置共享锁,用于保护多线程的共享资源读写安全
        $this->lock = new \Swoole\Lock(SWOOLE_MUTEX);
        $this->logger = $loggerFactory->get('log', 'default');
        $this->table = $table;
        $this->container = $container;
    }

    /**
     * 客服上线
     *
     * @param int $fd
     * @param string $name
     * @return void
     */
    public function staffOnline(int $fd, string $name)
    {
        $this->saveData('onlineStaff', $fd, [
            'name' => $name,
            'customer_fd' => null,
            'queue' => []
        ]);
    }

    /**
     * 客服下线
     *
     * @param int $fd
     * @return void
     * @throws \Psr\Container\ContainerExceptionInterface
     * @throws \Psr\Container\NotFoundExceptionInterface
     */
    public function staffOffline(int $fd): void
    {
        $staff = $this->findData('onlineStaff', $fd);
        if ($staff) {
            $this->removeAllCustomer($staff['customer_fd'], $staff['queue']);
            $this->del('onlineStaff', $fd);
        }
    }

    /**
     * 新建客户
     *
     * @param int $fd
     * @return void
     */
    public function addCustomer(int $fd): void
    {
        $this->saveData('customers', $fd, [
            'staff_fd' => null,
            'timeout_timer' => null,
        ]);
        $this->logger->info(json_encode($this->findData('customers')));
    }

    /**
     * 移除客户
     *
     * @param int $fd
     * @return void
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     */
    public function removeCustomer(int $fd): void
    {
        $customer = $this->findData('customers', $fd);
        if ($customer && $customer['staff_fd']) {
            $this->freeStaff($customer['staff_fd'], $fd);
        }
        if ($customer && $customer['timeout_timer']) {
            // 清除计时器
            swoole_timer_clear($customer['timeout_timer']);
        }
        $this->del('customers', $fd);
    }

    /**
     * 移除所有跟下线客服关联的客户
     *
     * @param int $fd
     * @param array $queue
     * @return void
     * @throws \Psr\Container\ContainerExceptionInterface
     * @throws \Psr\Container\NotFoundExceptionInterface
     */
    public function removeAllCustomer(int $fd, array $queue): void
    {
        $this->container->get(Sender::class)->push($fd, json_encode([
            'action' => 'chat_close',
            'payload' => [
                'message' => '当前客服已下线,聊天结束',
            ],
        ]));
        $this->del('customers', $fd);
        foreach ($queue as $q_fd) {
            $this->container->get(Sender::class)->push($q_fd, json_encode([
                'action' => 'queue_close',
                'payload' => [
                    'message' => '当前客服已下线,是否连接新的客服?',
                ],
            ]));
            $this->del('customers', $q_fd);
        }
    }

    /**
     * 建立聊天通道
     *
     * @param int $customerFd
     * @param int $staffFd
     * @return void
     */
    public function setStaffForCustomer(int $customerFd, int $staffFd): void
    {
        $this->bindStaffToCustomer($customerFd, $staffFd);

        $staff = $this->findData('onlineStaff', $staffFd);
        $staff['customer_fd'] = $customerFd;
        $this->saveData('onlineStaff', $staffFd, $staff);
    }

    /**
     * 客户加入排队
     *
     * @param int $customerFd
     * @param int $staffFd
     * @return void
     */
    public function setCustomerQueueUp(int $customerFd, int $staffFd): void
    {
        $this->bindStaffToCustomer($customerFd, $staffFd);

        $staff = $this->findData('onlineStaff', $staffFd);
        $staff['queue'][] = $customerFd;
        $this->saveData('onlineStaff', $staffFd, $staff);
    }

    /**
     * 聊天结束释放客服,并且更新排队信息
     *
     * @param int $staffFd
     * @param int $fd
     * @return void
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     */
    public function freeStaff(int $staffFd, int $fd): void
    {
        $staff = $this->findData('onlineStaff', $staffFd);
        if ($fd === $staff['customer_fd']) {
            if (count($staff['queue']) > 0) {
                $staff['customer_fd'] = $staff['queue'][0];

                $this->container->get(Sender::class)->push($staff['customer_fd'], json_encode([
                    'action' => 'message',
                    'payload' => [
                        'message' => '排队结束',
                    ],
                ]));
                $this->resetCustomerTimer($staff['customer_fd'], 60000); // 设置超时为1分钟(60000毫秒)

                $this->container->get(Sender::class)->push($staffFd, json_encode([
                    'action' => 'message',
                    'payload' => [
                        'message' => '新客户已接入,可以开始聊天了',
                    ],
                ]));

                unset($staff['queue'][0]);
                $staff['queue'] = array_values($staff['queue']);
                foreach ($staff['queue'] as $key => $customerFd) {
                    $this->container->get(Sender::class)->push($customerFd, json_encode([
                        'action' => 'message',
                        'payload' => [
                            'message' => '剩余排队人数: ' . ((int)$key + 1),
                        ],
                    ]));
                }
            } else {
                $staff['customer_fd'] = null;
            }
        } else {
            unset($staff['queue'][array_search($fd, $staff['queue'])]);
            $staff['queue'] = array_values($staff['queue']);
            foreach ($staff['queue'] as $key => $customerFd) {
                $this->container->get(Sender::class)->push($customerFd, json_encode([
                    'action' => 'message',
                    'payload' => [
                        'message' => '剩余排队人数: ' . ((int)$key + 1),
                    ],
                ]));
            }
        }

        $this->saveData('onlineStaff', $staffFd, $staff);
    }

    /**
     * 重置计时器
     *
     * @param int $fd
     * @param int $timeout
     * @return void
     */
    public function resetCustomerTimer(int $fd, int $timeout): void
    {
        $customer = $this->findData('customers', $fd);
        if ($customer) {
            if ($customer['timeout_timer']) {
                swoole_timer_clear($customer['timeout_timer']);
            }
            $customer['timeout_timer'] = swoole_timer_after($timeout, function () use ($fd) {
                $this->removeCustomer($fd);
            });
            $this->saveData('customers', $fd, $customer);
        }
    }

    /**
     * 获取可聊天/可排队的客服
     *
     * @return array
     */
    public function findAvailableStaff(): array
    {
        $minQueue = null;
        $selectedStaffFd = null;

        $onlineStaff = $this->findData('onlineStaff');
        $freeStaff = [];
        $queueStaff = [];
        foreach ($onlineStaff as $fd => $staff) {
            $staff['fd'] = $fd;
            // 若有空闲客服直接进入聊天
            if ($staff['customer_fd'] === null && count($staff['queue']) === 0) {
                $freeStaff[] = $staff;
            } else {
                $queueStaff[] = $staff;
            }
        }
        if (count($freeStaff) > 0) {
            $selectedStaffFd = $freeStaff[0]['fd'];
        }

        if (count($freeStaff) === 0 && count($queueStaff) > 0) {
            array_multisort(array_column($queueStaff, 'queue'), SORT_ASC, $queueStaff);
            $selectedStaffFd = $queueStaff[0]['fd'];
            $minQueue = count($queueStaff[0]['queue']);
        }
        return compact('minQueue', 'selectedStaffFd');
    }

    /**
     * 获取客户信息
     *
     * @param int $fd
     * @return array|null
     */
    public function getCustomer(int $fd): ?array
    {
        return $this->findData('customers', $fd);
    }

    /**
     * 获取客服信息
     *
     * @param int $fd
     * @return array|null
     */
    public function getStaff(int $fd): ?array
    {
        return $this->findData('onlineStaff', $fd);
    }

    /**
     * 给客户分配客服
     *
     * @param int $customerFd
     * @param int $staffFd
     * @return void
     */
    private function bindStaffToCustomer (int $customerFd, int $staffFd): void
    {
        $customer = $this->findData('customers', $customerFd);
        $this->logger->info('查询客户数据'. json_encode($customer));
        $customer['staff_fd'] = $staffFd;
        $this->logger->info('修改客户数据'. json_encode($customer));
        $this->saveData('customers', $customerFd, $customer);
    }

    /**
     * 从共享内存查询数据
     *
     * @param string $key
     * @param int|null $id
     * @return mixed|null
     */
    private function findData(string $key, int $id = null): mixed
    {
        $this->lock->lock();
        $data = json_decode($this->table->get('user', $key), true);
        $this->lock->unlock();
        if ($id) {
            return $data[$id] ?? null;
        } else {
            return $data ?? [];
        }
    }

    /**
     * 更新数据到共享内存
     *
     * @param string $key
     * @param int $id
     * @param array $body
     * @return void
     */
    private function saveData(string $key, int $id, array $body): void
    {
        $this->lock->lock();
        $data = json_decode($this->table->get('user', $key), true);
        $data[$id] = $body;
        $this->table->set('user', [$key => json_encode($data)]);
        $this->lock->unlock();
    }

    /**
     * 从共享内存中删除数据
     *
     * @param string $key
     * @param int $id
     * @return void
     */
    private function del(string $key, int $id): void
    {
        $this->lock->lock();
        $data = json_decode($this->table->get('user', $key), true);
        unset($data[$id]);
        $this->table->set('user', [$key => json_encode($data)]);
        $this->lock->unlock();
    }
}

创建路由

Router::addServer('ws', function () {
    Router::get('/', 'App\Controller\WebSocketController');
});

运行项目并且安装命令行请求工具

运行项目

php bin/hyperf.php start

安装webscoat

sudo apt update
sudo apt install curl
sudo apt install cargo
cargo install websocat

发起请求

websocat ws://localhost:9502

内容json

// 客服上线
{"action":"staffOnline","payload":{"name":"客服1"}}
// 客户连接
{"action":"customerConnect","payload":{}}
// 发送消息
{"action":"message","payload":{"message":"一条消息"}}

最终效果

wei xin jie tu 20230402183701 - Hyperf 实现简单的客服聊天系统
wei xin jie tu 20230402183714 - Hyperf 实现简单的客服聊天系统

最后附上demo代码仓库链接:https://github.com/dyjh/chat_demo

  • 分享:
评论
还没有评论
    发表评论 说点什么
    蜀ICP备18035236号