1. 多消费者模式如何保证消息顺序执行

  • 应用场景：用户订单的变更流程 [创建 -> 更新 -> 删除]
  • 假设有 3 条消息 data1[create]、data2[update]、data3[delete]，需要按顺序执行
  • 假设有 3 个消费者同时消费，消息的处理顺序可能错乱，例如 data1 -> data3 -> data2，从而造成数据错乱
  • 示例代码 PHP

1.1.1. 测试数据

// 分别代表 data 1-3

{
  "reqId":"0a7c458c-d619-af31-3ffb-f499995eacd5",
  "user_id":1002, 
  "order_id":2302393013,
  "data":{"status":1},
  "q_time":1563978617
}

{
  "reqId":"6be0c2e3-3514-7d24-0ab6-1e61949a7833",
  "user_id":1002, 
  "order_id":2302393013,
  "data":{"status":2},
  "q_time":1563978618
}

{
  "reqId":"6843cb1b-ebce-ab10-991c-88a55cd7112d",
  "user_id":1002, 
  "order_id":2302393013,
  "data":{"status":3},
  "q_time":1563978619
}

分析 :

  • user_id + order_id 该条消息的用户和订单 唯一值
  • reqId 消息唯一ID
  • q_time 消息时间

1.1.2. 多消费 或者 API-> 组装消息

  • 为了同时保证消息的顺序执行和处理消息的吞吐量，先按 user_id + order_id 对消息分组并写入 Redis，再把分组 key 投递到下级队列：
 // Pull the routing fields out of the incoming message.
 $reqId    = $data['reqId'];
 $order_id = $data['order_id'];
 $user_id  = $data['user_id'];
 $q_time   = $data['q_time'];

 // Every message of the same user + order shares one group key.
 $job_name       = "xxx_job";
 $message_key    = "{$user_id}_{$order_id}";
 $hash_order_key = "{$job_name}:{$message_key}";
 $allow_key      = "{$job_name}:allow:{$message_key}";

 // Payloads live in a hash keyed by reqId; the sorted set "<key>_s" indexes
 // the reqIds by message time so they can be read back in order.
 $redis->hset($hash_order_key, $reqId, json_encode($data));
 $redis->zadd($hash_order_key . '_s', $q_time, $reqId);

 // Dispatch the group to the downstream queue. setnx only succeeds when the
 // key does not exist yet, so a group is dispatched once until it is released.
 $res = $redis->setnx($allow_key, 1);
 if ($res) {
     // Give the marker a TTL so a consumer that dies before deleting it
     // cannot block this group forever.
     $redis->expire($allow_key, 3600);
     // Push $message_key to the downstream queue.
     $service->xxxJobDistribution($message_key);
 }
 // Alternatively, a separate process can scan the xxx_job:* keys and dispatch them in batches.
  • zadd 使用消息时间戳 q_time 作为权重值（score），保证同组消息按时间顺序取出
  • 同一个账户的同一个订单共用一个分组 key，组内顺序明确
  • setnx 只有在 key 不存在时才会设置成功，保证同一分组在下级的多个消费者中同一时间只被投递一次

1.1.3. 下级队列 xxxjob 消费

 $job_name       = "xxx_job";
 $hash_order_key = "{$job_name}:{$message_key}";

 // Read the reqIds of this group in ascending score order.
 $reqIds = $redis->zrange($hash_order_key . '_s', 0, -1);
 // HMGET needs at least one field, so skip the call when the group is empty.
 $MsgLists = empty($reqIds) ? [] : $redis->hmget($hash_order_key, $reqIds);

 foreach ($MsgLists as $k => $val) {
     // do any ...

     // Remove the message from both structures once it has been handled.
     $redis->hdel($hash_order_key, $reqIds[$k]); // true or false
     $redis->zrem("{$hash_order_key}_s", $reqIds[$k]);
 }
 // Release the dispatch marker so the group can be dispatched again.
 $redis->del(["{$job_name}:allow:{$message_key}"]);
  • 消费完成后删除锁（allow key），该分组才能被再次投递
  • 业务处理注意使用 try catch，失败时回滚数据

1.1.4. DEMO

<?php
/**
 * Created by PhpStorm.
 * User: alonexy
 * Date: 19/7/25
 * Time: 18:23
 */

namespace Services;


use App\Common\Functions;

/**
 * Groups queued job messages in Redis and replays each group in score order.
 *
 * Per group, message payloads are stored in a hash (field = request id) and
 * the request ids are indexed in a sorted set named "<hash key>_s". A separate
 * "<prefix><job>:lock:<group>" key marks a group as being processed.
 *
 * The Redis client is used through the Predis-style API
 * (`transaction($options, callable)` with an explicit `multi()`).
 */
class JobSequenceService
{
    /** @var mixed Redis client handed to the constructor. */
    public $redis;

    /** @var string Prefix prepended to every key this service touches. */
    public $prifix;

    /** @var callable|null Handler invoked once per message by GroupDatasHandle(). */
    private $HandleFunc = null;

    /**
     * @param mixed  $redis  Redis client
     * @param string $prifix Key prefix
     */
    public function __construct($redis, $prifix = 'job_sequence:')
    {
        $this->redis  = $redis;
        $this->prifix = $prifix;
    }

    /**
     * Store one message under its group, ordered by score.
     *
     * @param array  $jobData   Message payload
     * @param string $reqIdKey  Key in $jobData holding the unique message id
     * @param string $scoreKey  Key in $jobData holding the ordering score
     * @param array  $groupKeys Keys in $jobData whose values identify the group
     * @param string $jobName   Job name used in the Redis key
     * @return array [true, message key] on success, [false, error message] on failure
     */
    public function DataGroupJobSplicing(array $jobData, $reqIdKey, $scoreKey, array $groupKeys, $jobName)
    {
        // Reject incomplete payloads up front instead of storing a message
        // under a null id/score or a truncated group key.
        foreach (array_merge([$reqIdKey, $scoreKey], $groupKeys) as $requiredKey) {
            if (!isset($jobData[$requiredKey])) {
                return [false, "missing job data key: {$requiredKey}"];
            }
        }

        $groupValues = [];
        foreach ($groupKeys as $gk) {
            $groupValues[] = $jobData[$gk];
        }
        $message_key    = implode('_', $groupValues);
        $hash_order_key = $this->prifix . "{$jobName}:{$message_key}";

        try {
            $this->addJobData($hash_order_key, $jobData[$reqIdKey], $jobData, $jobData[$scoreKey]);
            return [true, $message_key];
        } catch (\Exception $e) {
            // \RedisException belongs to the phpredis extension; the
            // transaction API used here throws other exception classes, so
            // catch broadly to honour the [false, message] contract.
            return [false, $e->getMessage()];
        }
    }

    /**
     * Write the payload and its ordering entry in one MULTI/EXEC transaction.
     *
     * @param string $hash_order_key Group hash key
     * @param string $reqId          Unique message id
     * @param array  $jobData        Message payload (stored JSON-encoded)
     * @param mixed  $score          Sorted-set score
     * @return mixed Result of ZCARD on the group's sorted set
     */
    private function addJobData($hash_order_key, $reqId, $jobData, $score)
    {
        $options = [
            'cas'   => true,
            'retry' => 2,
        ];
        $this->redis->transaction(
            $options,
            function ($tx) use ($hash_order_key, $reqId, $jobData, $score) {
                $tx->multi();   // With CAS, MULTI *must* be explicitly invoked.
                $tx->hset($hash_order_key, $reqId, json_encode($jobData));
                $tx->zadd($hash_order_key . '_s', $score, $reqId);
            }
        );

        return $this->redis->zcard($hash_order_key . '_s');
    }

    /**
     * Remove one message from the group hash and sorted set in one transaction.
     *
     * @param string $hash_order_key Group hash key
     * @param string $value          Request id to remove
     * @return mixed Result of the transaction call
     */
    private function delJobData($hash_order_key, $value)
    {
        $options = [
            'cas'   => true,
            'retry' => 2,
        ];

        return $this->redis->transaction(
            $options,
            function ($tx) use ($hash_order_key, $value) {
                $tx->multi();   // With CAS, MULTI *must* be explicitly invoked.
                $tx->hdel($hash_order_key, $value);
                $tx->zrem("{$hash_order_key}_s", $value);
            }
        );
    }

    /**
     * Register the callable that processes each message.
     *
     * @param callable $function Receives the stored (JSON-encoded) message
     */
    public function SetGroupDataHandleFun($function)
    {
        $this->HandleFunc = $function;
    }

    /**
     * Process every message of one group in score order.
     *
     * Each message is removed right after its handler returns. The group lock
     * is released on every exit path (empty group, missing handler, handler
     * failure, success), so a failed run does not block the group until the
     * lock TTL expires.
     *
     * @param string $jobName
     * @param string $messageKey
     * @return array Request ids that were processed ([] when the group is empty)
     * @throws \Exception When no handler is set or the handler throws
     */
    public function GroupDatasHandle($jobName, $messageKey)
    {
        $hash_order_key = $this->prifix . "{$jobName}:{$messageKey}";
        $processed      = [];

        try {
            $reqIds = $this->redis->zrange($hash_order_key . '_s', 0, -1);
            if (empty($reqIds)) {
                return [];
            }
            $MsgLists = $this->redis->hmget($hash_order_key, $reqIds);
            if (is_null($this->HandleFunc)) {
                throw new \Exception("SetHandleFun is nil");
            }
            foreach ($MsgLists as $k => $val) {
                try {
                    call_user_func_array($this->HandleFunc, [&$val]);
                } catch (\Exception $e) {
                    // Same exception class and message as before, but keep
                    // the original exception as "previous".
                    throw new \Exception($e->getMessage(), 0, $e);
                }
                $this->delJobData($hash_order_key, $reqIds[$k]);
                $processed[] = $reqIds[$k];
            }

            return $processed;
        } finally {
            $this->unlock($jobName, $messageKey);
        }
    }

    /**
     * List the group keys of a job that have pending data and are not locked.
     *
     * NOTE(review): relies on the Redis KEYS command, which scans the whole
     * keyspace; consider SCAN for large datasets.
     *
     * @param string $jobName
     * @return array Message keys (group identifiers)
     */
    public function getJobGroupKeys($jobName)
    {
        $ks = $this->redis->keys($this->prifix . "$jobName:*");

        // Only the sorted-set keys ("..._s") identify a group, so anchor the
        // suffix at the end. The start is left unanchored in case the client
        // prepends its own key prefix to returned names.
        $pattern = '/' . preg_quote($this->prifix . $jobName, '/') . ':(.*)_s$/D';

        $msssageKeys = [];
        foreach ($ks as $k) {
            if (preg_match($pattern, $k, $ma) !== 1) {
                continue;
            }
            $mKey = $ma[1];
            // Lock keys share the "<prefix><job>:" namespace; never report them as groups.
            if (strpos($mKey, 'lock:') === 0) {
                continue;
            }
            if (!$this->is_lock($jobName, $mKey)) {
                $msssageKeys[] = $mKey;
            }
        }

        return $msssageKeys;
    }

    /**
     * Build the Redis key used as the processing lock of a group.
     *
     * @param string $jobName
     * @param string $messageKey
     * @return string
     */
    private function lockKey($jobName, $messageKey)
    {
        return $this->prifix . "{$jobName}:lock:{$messageKey}";
    }

    /**
     * Tell whether the group lock key exists.
     *
     * @param string $jobName
     * @param string $messageKey
     * @return mixed Result of EXISTS on the lock key
     */
    public function is_lock($jobName, $messageKey)
    {
        return $this->redis->exists($this->lockKey($jobName, $messageKey));
    }

    /**
     * Try to take the processing lock of a group (expires after 3600 s).
     *
     * NOTE(review): SETNX followed by EXPIRE is not atomic; if the process
     * dies between the two calls the lock never expires. A single
     * SET ... EX ... NX would close that gap but changes the return value.
     *
     * @param string $jobName
     * @param string $messageKey
     * @return mixed Result of SETNX (truthy when the lock was acquired)
     */
    public function lock($jobName, $messageKey)
    {
        $key = $this->lockKey($jobName, $messageKey);
        $res = $this->redis->setnx($key, 1);
        if ($res) {
            $this->redis->expire($key, 3600);
        }
        return $res;
    }

    /**
     * Delete the processing lock of a group.
     *
     * @param string $jobName
     * @param string $messageKey
     * @return mixed Result of DEL on the lock key
     */
    public function unlock($jobName, $messageKey)
    {
        return $this->redis->del([$this->lockKey($jobName, $messageKey)]);
    }

    /**
     * Build a request id: a per-day Redis counter, "_", then Functions::uuids().
     *
     * @return string
     */
    public function getReqId()
    {
        $date = date('Y-m-d');
        return $this->redis->incr($this->prifix . "job_req_id:{$date}") . '_' . Functions::uuids();
    }
}

1.1.5. 使用

       // Three messages for the same user/order; reqTime defines their order.
       $jobData1 = [
           'reqId'    => '0a7c458c-d619-af31-3ffb-f499995eacd5',
           'user_id'  => '1002',
           'order_id' => '232323',
           'data'     => [
               'status' => 1
           ],
           'reqTime'  => 1563978617
       ];
       $jobData2 = [
           'reqId'    => '000001-d619-af31-3ffb-f499995eacd5',
           'user_id'  => '1002',
           'order_id' => '232323',
           'data'     => [
               'status' => 2
           ],
           'reqTime'  => 1563978618
       ];
       $jobData3 = [
           'reqId'    => '000002-d619-af31-3ffb-f499995eacd5',
           'user_id'  => '1002',
           'order_id' => '232323',
           'data'     => [
               'status' => 3
           ],
           'reqTime'  => 1563978619
       ];
       $redis   = \RedisDB::connection('default');
       $service = new JobSequenceService($redis);

       // Insert the messages out of order on purpose; the sorted set restores the order.
       $service->DataGroupJobSplicing($jobData1, 'reqId', 'reqTime', ['user_id', 'order_id'], 'xxxjob');
       $service->DataGroupJobSplicing($jobData3, 'reqId', 'reqTime', ['user_id', 'order_id'], 'xxxjob');
       $service->DataGroupJobSplicing($jobData2, 'reqId', 'reqTime', ['user_id', 'order_id'], 'xxxjob');

       // Handler invoked once per stored message, in reqTime order.
       $service->SetGroupDataHandleFun(function ($data) {
           try {
               $orderData = \GuzzleHttp\json_decode($data, 1);
               unset($orderData['reqId']);
               unset($orderData['reqTime']);
               dump($orderData);
               sleep(10);
               // Throw an exception here to simulate a processing failure.
           } catch (\Exception $e) {
               // rollback data
               throw new \Exception($e->getMessage(), 0, $e);
           }
       });

       // Consume the group only when the lock was actually acquired; otherwise
       // another consumer is already processing it and ordering would break.
       if ($service->lock('xxxjob', '1002_232323')) {
           dump($service->GroupDatasHandle('xxxjob', '1002_232323'));
       }

       dd($service->getJobGroupKeys('xxxjob'));

results matching ""

    No results matching ""