Hyperf 实现简单的客服聊天系统
“终于不用再花钱买聊天软件啦”
最近在试着使用一些swoole框架,于是突发奇想写个客服聊天的小Demo
本项目使用Hyperf框架、WebSocket和Swoole共享内存实现了如下功能。
- 若当前有客服在线并且空闲,直接进入聊天
- 一名客服同时只能和一个客户聊天
- 客户若是超时未回复消息,自动退出聊天
- 若当前有客服在线并且都在聊天中,则分配一个排队人数最少的客服并且进入排队
- 若是当前无客服在线,直接返回'当前暂无客服上班',不进入排队
- 客户池以及客服池存入多线程共享内存,并且使用共享资源锁保
SWOOLE_MUTEX
保证多线程数据安全
以下是一个基本的代码实现,朋友们可以根据需要进行调整。例如使用mysql增加客服客户身份校验,以及聊天消息留存,这些在这里就不做示范。
安装Hyperf框架和依赖组件:
composer create-project hyperf/hyperf-skeleton chat_demo
cd chat_demo
composer require hyperf/websocket-server
这里需要注意的是,不分国内composer源并未更新库(如阿里源),所以无法安装最新的Hyperf,在这里我更换为华为源,这样默认安装的Hyperf是3.0版本
composer config --global repo.packagist composer https://mirrors.huaweicloud.com/repository/php/
Hyperf 安装选项一路回车即可,若是需要关联数据库,可在对应的选项打y
修改配置文件 config/autoload/server.php
中的 servers
,启用WebSocket服务器
'servers' => [
[
'name' => 'ws',
'type' => Server::SERVER_WEBSOCKET,
'host' => '0.0.0.0',
'port' => 9502,
'sock_type' => SWOOLE_SOCK_TCP,
'callbacks' => [
Event::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
Event::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
Event::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose']
]
],
],
在Hyperf的入口文件 bin/hyperf.php
中创建共享内存对象
use Swoole\Table;
// 创建共享内存实例
$table = new Table(2048);
$table->column('onlineStaff', Table::TYPE_STRING, 1024);
$table->column('customers', Table::TYPE_STRING, 1024);
$table->create();
// 将 Table 实例绑定到容器中,以便在 WebSocket 服务中使用
$container->set(Table::class, $table);
创建一个 WebSocketController
控制器来处理 WebSocket 连接和消息
<?php
namespace App\Controller;
use App\Service\ChatService;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Logger\LoggerFactory;
use Hyperf\WebSocketServer\Sender;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Log\LoggerInterface;
use Swoole\Http\Response;
use Swoole\WebSocket\Server;
use Swoole\WebSocket\Frame;
class WebSocketController implements OnMessageInterface, OnCloseInterface
{
/**
* 聊天服务
* @Inject
* @var ChatService
*/
private ChatService $chatService;
/**
* 服务容器
* @var ContainerInterface
*/
protected ContainerInterface $container;
/**
* 日志类
* @var LoggerInterface
*/
protected LoggerInterface $logger;
public function __construct(ContainerInterface $container, LoggerFactory $loggerFactory)
{
$this->container = $container;
$this->chatService = $container->get(ChatService::class);
$this->logger = $loggerFactory->get('log', 'default');
}
/**
* 消息推送监听
*
* @param Response|Server $server
* @param Frame $frame
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function onMessage($server, $frame): void
{
$data = json_decode($frame->data, true);
$action = $data['action'] ?? null;
if ($action === 'staffOnline') {
// 客服上线
$this->handleStaffOnline($frame->fd, $data['payload']['name'] ?? '客服');
} elseif ($action === 'customerConnect') {
// 客户请求建立连接
$this->handleCustomerConnect($frame->fd);
} elseif ($action === 'message') {
// 消息互发
$message = $data['payload']['message'] ?? '';
$this->handleMessage($frame->fd, $message);
}
}
/**
* 对象断开连接回调
*
* @param Response|Server $server
* @param int $fd
* @param int $reactorId
* @return void
* @throws \Psr\Container\ContainerExceptionInterface
* @throws \Psr\Container\NotFoundExceptionInterface
*/
public function onClose($server, int $fd, int $reactorId): void
{
// 处理客服或者客户断开连接的逻辑
$this->logger->info('断开连接触发:'. $fd. "|" . $reactorId);
$customer = $this->chatService->getCustomer($fd);
if ($customer) {
$this->chatService->removeCustomer($fd);
}
$staff = $this->chatService->getStaff($fd);
if ($staff) {
$this->chatService->staffOffline($fd);
}
}
/**
* 客户连接逻辑处理
*
* @param int $fd
* @return void
* @throws \Psr\Container\ContainerExceptionInterface
* @throws \Psr\Container\NotFoundExceptionInterface
*/
private function handleCustomerConnect(int $fd): void
{
// 在此可自行增加参数实现客户身份校验
$this->chatService->addCustomer($fd);
$availableData = $this->chatService->findAvailableStaff();
$this->logger->info('匹配结果:'. json_encode($availableData));
// selectedStaffFd 为null则说明无在线客服
if ($availableData['selectedStaffFd'] === null) {
$this->container->get(Sender::class)->push($fd, json_encode([
'action' => 'message',
'payload' => [
'message' => '当前暂无客服上班',
],
]));
$this->chatService->removeCustomer($fd);
return;
}
// minQueue 不为null则说明需要排队
if ($availableData['minQueue'] !== null) {
$this->chatService->setCustomerQueueUp($fd, $availableData['selectedStaffFd']);
$this->container->get(Sender::class)->push($fd, json_encode([
'action' => 'message',
'payload' => [
'message' => '当前暂无空闲客服,已为您自动分配排队较少客服,请等待排队',
'queue' => $availableData['minQueue'] + 1
],
]));
return;
}
$this->chatService->setStaffForCustomer($fd, $availableData['selectedStaffFd']);
$this->chatService->resetCustomerTimer($fd, 60000); // 设置超时为1分钟(60000毫秒)
$this->container->get(Sender::class)->push($fd, json_encode([
'action' => 'message',
'payload' => [
'message' => '您已连接到客服',
],
]));
}
/**
* 客服断开连接
*
* @param int $fd
* @param string $name
* @return void
*/
private function handleStaffOnline(int $fd, string $name): void
{
// 若是数据库的话,可将name替换成客服登录token之类的,实现登录校验
$this->chatService->staffOnline($fd, $name);
}
/**
* 消息处理逻辑
*
* @param int $fd
* @param string $message
* @return void
* @throws \Psr\Container\ContainerExceptionInterface
* @throws \Psr\Container\NotFoundExceptionInterface
*/
private function handleMessage(int $fd, string $message): void
{
$customer = $this->chatService->getCustomer($fd);
if ($customer && $customer['staff_fd']) {
$staff = $this->chatService->getStaff($customer['staff_fd']);
// 判断是否还在排队
if ($staff['customer_fd'] != $fd) {
$this->container->get(Sender::class)->push($fd, json_encode([
'action' => 'message',
'payload' => [
'message' => '当前正在排队中',
],
]));
} else {
// 如果是客户发来的消息,则转发给客服
$this->container->get(Sender::class)->push($customer['staff_fd'], json_encode([
'action' => 'message',
'payload' => [
'message' => $message,
'from' => 'customer',
],
]));
// 重置客户的超时计时器
$this->chatService->resetCustomerTimer($fd, 60000);
}
// 在此可实现聊天记录留存
// ...
} else {
$staff = $this->chatService->getStaff($fd);
if ($staff && $staff['customer_fd']) {
// 如果是客服发来的消息,则转发给客户
$this->container->get(Sender::class)->push($staff['customer_fd'], json_encode([
'action' => 'message',
'payload' => [
'message' => $message,
'from' => 'staff',
],
]));
}
// 在此可实现聊天记录留存
// ...
}
}
}
创建一个 ChatService
类来处理聊天逻辑
<?php
namespace App\Service;
use Hyperf\Logger\LoggerFactory;
use Hyperf\WebSocketServer\Sender;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Log\LoggerInterface;
use Swoole\Table;
class ChatService
{
/**
* 日志类
* @var LoggerInterface
*/
private LoggerInterface $logger;
/**
* swoole 共享内存
* @var Table
*/
private Table $table;
/**
* 服务容器
* @var ContainerInterface
*/
private ContainerInterface $container;
private \Swoole\Lock $lock;
public function __construct(ContainerInterface $container, LoggerFactory $loggerFactory, Table $table)
{
// 设置共享锁,用于保护多线程的共享资源读写安全
$this->lock = new \Swoole\Lock(SWOOLE_MUTEX);
$this->logger = $loggerFactory->get('log', 'default');
$this->table = $table;
$this->container = $container;
}
/**
* 客服上线
*
* @param int $fd
* @param string $name
* @return void
*/
public function staffOnline(int $fd, string $name)
{
$this->saveData('onlineStaff', $fd, [
'name' => $name,
'customer_fd' => null,
'queue' => []
]);
}
/**
* 客服下线
*
* @param int $fd
* @return void
* @throws \Psr\Container\ContainerExceptionInterface
* @throws \Psr\Container\NotFoundExceptionInterface
*/
public function staffOffline(int $fd): void
{
$staff = $this->findData('onlineStaff', $fd);
if ($staff) {
$this->removeAllCustomer($staff['customer_fd'], $staff['queue']);
$this->del('onlineStaff', $fd);
}
}
/**
* 新建客户
*
* @param int $fd
* @return void
*/
public function addCustomer(int $fd): void
{
$this->saveData('customers', $fd, [
'staff_fd' => null,
'timeout_timer' => null,
]);
$this->logger->info(json_encode($this->findData('customers')));
}
/**
* 移除客户
*
* @param int $fd
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function removeCustomer(int $fd): void
{
$customer = $this->findData('customers', $fd);
if ($customer && $customer['staff_fd']) {
$this->freeStaff($customer['staff_fd'], $fd);
}
if ($customer && $customer['timeout_timer']) {
// 清除计时器
swoole_timer_clear($customer['timeout_timer']);
}
$this->del('customers', $fd);
}
/**
* 移除所有跟下线客服关联的客户
*
* @param int $fd
* @param array $queue
* @return void
* @throws \Psr\Container\ContainerExceptionInterface
* @throws \Psr\Container\NotFoundExceptionInterface
*/
public function removeAllCustomer(int $fd, array $queue): void
{
$this->container->get(Sender::class)->push($fd, json_encode([
'action' => 'chat_close',
'payload' => [
'message' => '当前客服已下线,聊天结束',
],
]));
$this->del('customers', $fd);
foreach ($queue as $q_fd) {
$this->container->get(Sender::class)->push($q_fd, json_encode([
'action' => 'queue_close',
'payload' => [
'message' => '当前客服已下线,是否连接新的客服?',
],
]));
$this->del('customers', $q_fd);
}
}
/**
* 建立聊天通道
*
* @param int $customerFd
* @param int $staffFd
* @return void
*/
public function setStaffForCustomer(int $customerFd, int $staffFd): void
{
$this->bindStaffToCustomer($customerFd, $staffFd);
$staff = $this->findData('onlineStaff', $staffFd);
$staff['customer_fd'] = $customerFd;
$this->saveData('onlineStaff', $staffFd, $staff);
}
/**
* 客户加入排队
*
* @param int $customerFd
* @param int $staffFd
* @return void
*/
public function setCustomerQueueUp(int $customerFd, int $staffFd): void
{
$this->bindStaffToCustomer($customerFd, $staffFd);
$staff = $this->findData('onlineStaff', $staffFd);
$staff['queue'][] = $customerFd;
$this->saveData('onlineStaff', $staffFd, $staff);
}
/**
* 聊天结束释放客服,并且更新排队信息
*
* @param int $staffFd
* @param int $fd
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function freeStaff(int $staffFd, int $fd): void
{
$staff = $this->findData('onlineStaff', $staffFd);
if ($fd === $staff['customer_fd']) {
if (count($staff['queue']) > 0) {
$staff['customer_fd'] = $staff['queue'][0];
$this->container->get(Sender::class)->push($staff['customer_fd'], json_encode([
'action' => 'message',
'payload' => [
'message' => '排队结束',
],
]));
$this->resetCustomerTimer($staff['customer_fd'], 60000); // 设置超时为1分钟(60000毫秒)
$this->container->get(Sender::class)->push($staffFd, json_encode([
'action' => 'message',
'payload' => [
'message' => '新客户已接入,可以开始聊天了',
],
]));
unset($staff['queue'][0]);
$staff['queue'] = array_values($staff['queue']);
foreach ($staff['queue'] as $key => $customerFd) {
$this->container->get(Sender::class)->push($customerFd, json_encode([
'action' => 'message',
'payload' => [
'message' => '剩余排队人数: ' . ((int)$key + 1),
],
]));
}
} else {
$staff['customer_fd'] = null;
}
} else {
unset($staff['queue'][array_search($fd, $staff['queue'])]);
$staff['queue'] = array_values($staff['queue']);
foreach ($staff['queue'] as $key => $customerFd) {
$this->container->get(Sender::class)->push($customerFd, json_encode([
'action' => 'message',
'payload' => [
'message' => '剩余排队人数: ' . ((int)$key + 1),
],
]));
}
}
$this->saveData('onlineStaff', $staffFd, $staff);
}
/**
* 重置计时器
*
* @param int $fd
* @param int $timeout
* @return void
*/
public function resetCustomerTimer(int $fd, int $timeout): void
{
$customer = $this->findData('customers', $fd);
if ($customer) {
if ($customer['timeout_timer']) {
swoole_timer_clear($customer['timeout_timer']);
}
$customer['timeout_timer'] = swoole_timer_after($timeout, function () use ($fd) {
$this->removeCustomer($fd);
});
$this->saveData('customers', $fd, $customer);
}
}
/**
* 获取可聊天/可排队的客服
*
* @return array
*/
public function findAvailableStaff(): array
{
$minQueue = null;
$selectedStaffFd = null;
$onlineStaff = $this->findData('onlineStaff');
$freeStaff = [];
$queueStaff = [];
foreach ($onlineStaff as $fd => $staff) {
$staff['fd'] = $fd;
// 若有空闲客服直接进入聊天
if ($staff['customer_fd'] === null && count($staff['queue']) === 0) {
$freeStaff[] = $staff;
} else {
$queueStaff[] = $staff;
}
}
if (count($freeStaff) > 0) {
$selectedStaffFd = $freeStaff[0]['fd'];
}
if (count($freeStaff) === 0 && count($queueStaff) > 0) {
array_multisort(array_column($queueStaff, 'queue'), SORT_ASC, $queueStaff);
$selectedStaffFd = $queueStaff[0]['fd'];
$minQueue = count($queueStaff[0]['queue']);
}
return compact('minQueue', 'selectedStaffFd');
}
/**
* 获取客户信息
*
* @param int $fd
* @return array|null
*/
public function getCustomer(int $fd): ?array
{
return $this->findData('customers', $fd);
}
/**
* 获取客服信息
*
* @param int $fd
* @return array|null
*/
public function getStaff(int $fd): ?array
{
return $this->findData('onlineStaff', $fd);
}
/**
* 给客户分配客服
*
* @param int $customerFd
* @param int $staffFd
* @return void
*/
private function bindStaffToCustomer (int $customerFd, int $staffFd): void
{
$customer = $this->findData('customers', $customerFd);
$this->logger->info('查询客户数据'. json_encode($customer));
$customer['staff_fd'] = $staffFd;
$this->logger->info('修改客户数据'. json_encode($customer));
$this->saveData('customers', $customerFd, $customer);
}
/**
* 从共享内存查询数据
*
* @param string $key
* @param int|null $id
* @return mixed|null
*/
private function findData(string $key, int $id = null): mixed
{
$this->lock->lock();
$data = json_decode($this->table->get('user', $key), true);
$this->lock->unlock();
if ($id) {
return $data[$id] ?? null;
} else {
return $data ?? [];
}
}
/**
* 更新数据到共享内存
*
* @param string $key
* @param int $id
* @param array $body
* @return void
*/
private function saveData(string $key, int $id, array $body): void
{
$this->lock->lock();
$data = json_decode($this->table->get('user', $key), true);
$data[$id] = $body;
$this->table->set('user', [$key => json_encode($data)]);
$this->lock->unlock();
}
/**
* 从共享内存中删除数据
*
* @param string $key
* @param int $id
* @return void
*/
private function del(string $key, int $id): void
{
$this->lock->lock();
$data = json_decode($this->table->get('user', $key), true);
unset($data[$id]);
$this->table->set('user', [$key => json_encode($data)]);
$this->lock->unlock();
}
}
创建路由
Router::addServer('ws', function () {
Router::get('/', 'App\Controller\WebSocketController');
});
运行项目并且安装命令行请求工具
运行项目
php bin/hyperf.php start
安装webscoat
sudo apt update
sudo apt install curl
sudo apt install cargo
cargo install websocat
发起请求
websocat ws://localhost:9502
内容json
// 客服上线
{"action":"staffOnline","payload":{"name":"客服1"}}
// 客户连接
{"action":"customerConnect","payload":{}}
// 发送消息
{"action":"message","payload":{"message":"一条消息"}}
最终效果
最后附上demo代码仓库链接:https://github.com/dyjh/chat_demo