1. centrifugo
文档地址 :https://centrifugal.github.io/centrifugo/
Centrifugo 是一种与语言无关的实时消息服务器。语言无关意味着您的应用程序在前端或后端使用哪种编程语言无关紧要 - Centrifugo 可以与任何一种语言协同工作。
实时消息是在事件发生后几乎立即传递给应用程序用户的消息 - 想想实时评论,聊天,实时图表,动态计数器和游戏。
Centrifugo 现在支持几种实时消息传输:
带有 JSON 或二进制 Protobuf 协议的 Websocket SockJS - 尝试首先建立 Websocket 连接的库,然后在 Websocket 连接出现问题时自动回退到 HTTP 传输(服务器发送事件,XHR 流,XHR 轮询等)
项目的动机¶ Centrifugo 最初诞生的目的是帮助用语言或框架编写的应用程序,而不需要内置的 concurency 支持来引入实时更新。例如,Django,Flask,Yii,Laravel,Ruby on Rails 等框架对使用许多持久连接的支持很差。Centrifugo 旨在帮助解决这个问题,并继续用您最喜欢的语言和最喜欢的框架编写后端。它还具有一些功能(性能,可伸缩性,连接管理,重新连接时的消息恢复等),即使您使用异步并发语言编写后端,也可以简化您作为开发人员的生活。
概念¶ Centrifugo 作为独立服务器运行,负责处理来自应用程序用户的持久连接。您的应用程序后端和前端可以用任何编程语言编写。您的客户端使用应用程序后端提供的连接令牌(JWT)从前端连接 Centrifugo 并订阅频道。一旦发生某些事件,您的应用程序后端就可以使用 Centrifugo API 将包含事件的消息发布到通道中。该消息将发送给当前在频道上订阅的所有客户。实际上 Centrifugo 是一个面向用户的 PUB / SUB 服务器。
2. 安装
支持
- RPM && DEB => https://packagecloud.io/FZambia/centrifugo
- Docker => https://hub.docker.com/r/centrifugo/centrifugo/
- 源码构建 => https://github.com/centrifugal/centrifugo
- 二进制包安装 => https://github.com/centrifugal/centrifugo/releases
2.1. 安装方式 (源码构建)
git clone https://github.com/centrifugal/centrifugo.git
cd centrifugo
2.2. 环境
go version go1.11.4 darwin/amd64
2.3. go mod
go mod init
// go mod init (name) ## name 自己定义 这里使用空
2.4. 下载依赖
go mod vendor
go mod download
过程中需要* F *墙*
2.5. go build
生成 二进制文件
# https://centrifugal.github.io/centrifugo/server/install/
2.6. 配置文件
# https://centrifugal.github.io/centrifugo/server/configuration/
{
"admin":true, # 开启web
"secret": "secret", # api 生成token
"api_key":"api_key", # api 使用授权的apikey
"admin_password": "web密码",
"admin_secret": "admin->secret",
"client_channel_limit":2, #设置单个客户端可以拥有的最大不同通道订阅数。
"channel_max_length":255, #设置通道名称的最大长度。 默认255
"client_anonymous":true, #匿名(具有空用户ID),只能订阅anonymous启用了选项的通道。
"namespaces": [ ## 命名空间
{
"name": "abbb",
"anonymous": false,
"publish": true,
"join_leave": true,
"presence": true,
"presence_stats": true,
"history_size": 0,
"history_lifetime": 0,
"history_recover": false
},
{
"name": "public", # channel 使用 【public:test】 那么使用 此配置项
"anonymous": true, # 支持 匿名 【空用户ID ->生产的token】
"publish": false, # 客户端 是否支持 发送消息
"join_leave": false, # 启用/禁用发送加入(离开)消息。默认情况下false
"presence": true, # 支持 频道的状态信息
"presence_stats": true, # 统计客户端信息
# 历史配置一起使用
"history_size": 10, #支持历史消息 条数
"history_lifetime": 60, #支持历史消息限制时间 *秒
"history_recover": true # 客户端 意外断开 历史数据恢复
}
]
}
2.7. Nginx 配置
upstream centrifugo {
ip_hash;
server 127.0.0.1:8000;
}
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
server {
server_name 0.0.0.0;
listen 8086;
#listen 443;
#ssl on;
#ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
#ssl_ciphers AES128-SHA:AES256-SHA:RC4-SHA:DES-CBC3-SHA:RC4-MD5;
#ssl_certificate /etc/nginx/ssl/wildcard.example.com.crt;
#ssl_certificate_key /etc/nginx/ssl/wildcard.example.com.key;
#ssl_session_cache shared:SSL:10m;ssl_session_timeout 10m;
default_type application/octet-stream;
sendfile on;
tcp_nopush on;
tcp_nodelay on;
gzip on;
gzip_min_length 1000;
gzip_proxied any;
# Only retry if there was a communication error, not a timeout
# on the Tornado server (to avoid propagating "queries of death"
# to all frontends)
proxy_next_upstream error;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Scheme $scheme;
proxy_set_header Host $http_host;
location /connection {
proxy_pass http://centrifugo;
proxy_buffering off;
keepalive_timeout 65;
proxy_read_timeout 60s;
proxy_http_version 1.1;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Scheme $scheme;
proxy_set_header Host $http_host;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
}
location / {
if ($request_method = 'OPTIONS') {
add_header Access-Control-Allow-Origin $http_origin;
add_header Access-Control-Allow-Methods $http_access_control_request_method;
add_header Access-Control-Allow-Credentials true;
add_header Access-Control-Allow-Headers $http_access_control_request_headers;
add_header Access-Control-Max-Age 1728000;
return 204;
}
proxy_pass http://centrifugo;
}
}
2.8. PHP Server Api
composer require sl4mmer/phpcent
https://github.com/centrifugal/phpcent
https://github.com/oleh-ozimok/php-centrifugo/tree/v1.0
https://github.com/oleh-ozimok/php-centrifugo
2.9. 服务封装
<?php
namespace App\Services;
use App\Common\Functions;
use Illuminate\Foundation\Bus\DispatchesJobs;
/**
* 离心机推送websocket
* Class BlackHoleService
* @package App\Services
*/
class BlackHoleService
{
//DispatchesJobs
use DispatchesJobs;
private $Centrifugo;
public function __construct(){
$this->Centrifugo = new \phpcent\Client(env('CENT_URL',"http://host:8000/api"),env('CENT_API_KEY','apikey'),env('CENT_SECRET','secret'));
}
/**
* 推送数据
* @param $channel
* @param $data
* @return array
*/
public function pubData($channel,$data)
{
try{
if(empty($channel)){
throw new \Exception("channel is nil.");
}
if(empty($data) ){
throw new \Exception("data is nil.");
}elseif(!is_array($data)){
throw new \Exception("data Must be an array .");
}
$res = $this->Centrifugo->publish("{$channel}", $data);
Functions::SysLog('BlackHoleService','== pubData SUC ==',[$channel,$data,$res]);
return [true,$res];
}catch (\Exception $e){
Functions::SysLog('BlackHoleService','== pubData Exception ==',[[$channel,$data],$e->getMessage(),$e->getLine(),$e->getCode()]);
return [false,$e->getMessage()];
}
}
/**
* 发送广播 $channels 空格 间隔
* @param $channels
* @param $data
* @return array
*/
public function pubBroadcast($channels, $data)
{
try{
if(empty($channel)){
throw new \Exception("channels is nil.");
}
if(empty($data) ){
throw new \Exception("data is nil.");
}elseif(!is_array($data)){
throw new \Exception("data Must be an array .");
}
$res = $this->Centrifugo->broadcast($channels, $data);
Functions::SysLog('BlackHoleService','== pubBroadcast SUC ==',[$channels,$data,$res]);
return [true,$res];
}catch (\Exception $e){
Functions::SysLog('BlackHoleService','== pubBroadcast Exception ==',[$e->getMessage(),$e->getLine(),$e->getCode()]);
return [false,$e->getMessage()];
}
}
/**
* 获取授权token 默认 userId 空支持 anonymous
* @param string $userId
* @param int $exp
* @param array $info
* @return array
*/
public function getToken($userId='',$exp=0,$info=[])
{
try{
$token = $this->Centrifugo->generateConnectionToken($userId,$exp,$info);
return [true,$token];
}catch (\Exception $e){
Functions::SysLog('BlackHoleService','== GetToken Exception ==',[[$userId,$exp,$info],$e->getMessage(),$e->getLine(),$e->getCode()]);
return [false,$e->getMessage()];
}
}
/**
* 获取通道
* @return mixed
*/
public function getChannels(){
try{
$response = $this->Centrifugo->channels();
return [true,Functions::object_array($response->result->channels)];
}catch (\Exception $e){
Functions::SysLog('BlackHoleService','== getChannels Exception ==',[$e->getMessage(),$e->getLine(),$e->getCode()]);
return [false,$e->getMessage()];
}
}
/**
* 获取通道下的历史数据
* @param $channel
* @return array
*/
public function getHistory($channel){
try{
$response = $this->Centrifugo->history($channel);
return [true,Functions::object_array($response->result->publications)];
}catch (\Exception $e){
Functions::SysLog('BlackHoleService','== getHistory Exception ==',[$e->getMessage(),$e->getLine(),$e->getCode()]);
return [false,$e->getMessage()];
}
}
/**
* 获取通道下的客户端
* @param $channel
* @return array
*/
public function getClientsByChannel($channel){
try{
$response = $this->Centrifugo->presence($channel);
return [true,Functions::object_array($response->result->presence)];
}catch (\Exception $e){
Functions::SysLog('BlackHoleService','== getClientsByChannel Exception ==',[$e->getMessage(),$e->getLine(),$e->getCode()]);
return [false,$e->getMessage()];
}
}
/**
* 获取通道的统计信息
* @param $channel
* @return array
*/
public function getStatsByChannel($channel){
try{
$response = $this->Centrifugo->presence_stats($channel);
return [true,Functions::object_array($response->result)];
}catch (\Exception $e){
Functions::SysLog('BlackHoleService','== getStatsByChannel Exception ==',[$e->getMessage(),$e->getLine(),$e->getCode()]);
return [false,$e->getMessage()];
}
}
}