«

PHP 使用Swoole批量异步请求方法

刚子 发布于 阅读:229


读取一条redis的list记录,发送给Swoole协程异步请求处理,最多100个槽位,收到一个响应释放一个槽位。

<?php
Swoole\Runtime::enableCoroutine();

$redis = new Redis();
$redis->connect('REMOTE_REDIS_HOST', 6379);
$redis->auth('REDIS_PASS');

// 并发信号灯:同时最多 100 条 HTTP 在飞
$sem = new Channel(100);

go(function () use ($redis, $sem) {
    while (true) {
        $data = $redis->brPop(['recharge_verification'], 5);
        if (!$data) continue;

        [$queue, $raw] = $data;
        if (strpos($raw, '_') === false) continue;
        [$uniq] = explode('_', $raw, 2);

        // 占一个并发位,满了就阻塞
        $sem->push(true);

        // 启动一个独立协程去请求 & 收响应
        go(function () use ($uniq, $sem) {
            try {
                $cli = new Swoole\Coroutine\Http\Client('target.com', 80);
                $cli->post('/api', ['id' => $uniq]);
                $body = $cli->body;
                // 这里立即写库或写日志
                file_put_contents('response.log',
                    date('Y-m-d H:i:s')." | {$uniq} | {$body}\n",
                    FILE_APPEND | LOCK_EX
                );
            } finally {
                $sem->pop();   // 释放并发位
            }
        });
    }
});

如果请求的是同一个域名,可以使用HTTP/2 多路复用,效率更高。代码如下:

<?php
/**
 * http2_worker.php
 * 依赖:PHP + Swoole ≥ 4.8
 * 运行:php http2_worker.php
 */
declare(strict_types=1);

use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Swoole\Http2\Request;

const CONCURRENCY = 100;          // 同时未完成的请求上限
const REDIS_HOST  = 'REMOTE_HOST';
const REDIS_PORT  = 6379;
const REDIS_PASS  = 'REDIS_PASS';

// ---------- 初始化 ----------
Coroutine\run(function () {
    // 1. 连接远程 Redis
    $redis = new Redis();
    $redis->connect(REDIS_HOST, REDIS_PORT);
    if (REDIS_PASS !== '') $redis->auth(REDIS_PASS);

    // 2. 一条 HTTP/2 长连接(复用)
    $host = 'target.com';               // 你的统一域名
    $cli  = new \Swoole\Coroutine\Http2\Client($host, 443, true);
    if (!$cli->connect()) {
        exit("HTTP/2 connect failed\n");
    }

    // 3. 并发信号量
    $sem = new Channel(CONCURRENCY);

    // 4. 主循环:BRPOP → 发送
    go(function () use ($redis, $cli, $sem, $host) {
        while (true) {
            $data = $redis->brPop(['recharge_verification'], 5);
            if (!$data) continue;                   // 空或超时
            [$queue, $raw] = $data;
            if (strpos($raw, '_') === false) continue;
            [$uniq] = explode('_', $raw, 2);

            $sem->push(true);                       // 占并发位
            go(function () use ($cli, $uniq, $sem) {
                $req = new Request();
                $req->method = 'POST';
                $req->path   = '/api';              // 统一接口
                $req->data   = ['id' => $uniq];
                $streamId = $cli->send($req);

                // 等待这一 stream 的响应
                $resp = $cli->recv($streamId, 5);
                if ($resp && $resp->statusCode === 200) {
                    file_put_contents(
                        __DIR__ . '/response.log',
                        date('Y-m-d H:i:s') . " | {$uniq} | {$resp->data}\n",
                        FILE_APPEND | LOCK_EX
                    );
                }
                $sem->pop();                        // 释放并发位
            });
        }
    });
});