«

PHP 使用Swoole批量异步请求 异步获取结果的方法

刚子 发布于 阅读:109


这个脚本会:
检查同目录下是否存在 url.txt 文件。
如果文件不存在,暂停一秒后重新读取。
如果文件存在,读取所有订单号,删除文件,并拼接到指定 URL 后提交异步 GET 请求。
并发控制为 100 个请求。
所有请求提交后,主协程立即返回,不等待所有请求完成。
异步获取请求结果,并将结果追加写入日志文件,内容包含自定义请求 ID 和返回原始内容。
完成后重新触发循环,重新读取 url.txt。
实测请求1000个URL,9秒后全部URL请求完毕,14秒后全部结果返回完毕。

<?php
use Swoole\Coroutine\Http\Client;
use Swoole\Coroutine\Channel;
use Swoole\Runtime;

// 启用协程支持
Runtime::enableCoroutine(true);

// 配置常量
const CONFIG = [
    'timeout'       => 10,        // 请求超时(秒)
    'log_file'      => __DIR__ . '/request.log',
    'base_url'      => 'https://www.baidu.com/2.php?id=',
    'concurrency'   => 100,       // 最大并发数
    'loop_delay'    => 1000000,   // 文件不存在时等待时间(微秒,1秒)
    'file_path'     => __DIR__ . '/url.txt'
];

/**
 * 处理订单号文件并发起请求
 */
function processOrderFile()
{
    $filePath = CONFIG['file_path'];

    // 检查文件是否存在
    if (!file_exists($filePath)) {
        echo "[" . date('Y-m-d H:i:s') . "] 文件不存在,等待1秒...\n";
        usleep(CONFIG['loop_delay']);
        return;
    }

    // 原子化读取并删除文件(避免并发写入冲突)
    $orders = [];
    $fp = fopen($filePath, 'r');
    if ($fp && flock($fp, LOCK_EX | LOCK_NB)) { // 非阻塞独占锁
        // 读取所有订单号
        while (($line = fgets($fp)) !== false) {
            $orderNo = trim($line);
            if (!empty($orderNo)) {
                $orders[] = $orderNo;
            }
        }
        flock($fp, LOCK_UN); // 释放锁
        fclose($fp);
        unlink($filePath);   // 读取完成后删除文件
    } else {
        echo "[" . date('Y-m-d H:i:s') . "] 文件被占用,跳过本轮处理\n";
        usleep(100000); // 短暂等待避免频繁尝试
        return;
    }

    if (empty($orders)) {
        echo "[" . date('Y-m-d H:i:s') . "] 文件为空,继续循环\n";
        return;
    }

    $orderCount = count($orders);
    echo "[" . date('Y-m-d H:i:s') . "] 读取到 {$orderCount} 个订单,开始处理\n";

    // 初始化并发控制和结果收集通道
    $semaphore = new Channel(CONFIG['concurrency']); // 并发信号量
    $resultChan = new Channel($orderCount);          // 结果收集通道
    $completed = 0;                                  // 完成计数

    // 发起所有请求(非阻塞)
    foreach ($orders as $orderNo) {
        // 信号量控制并发
        $semaphore->push(true);

        // 创建请求协程
        go(function () use ($orderNo, $semaphore, $resultChan, &$completed, $orderCount) {
            $result = [
                'id'    => $orderNo,
                'body'  => '',
                'error' => ''
            ];

            try {
                // 构建请求URL
                $url = CONFIG['base_url'] . urlencode($orderNo);
                $parsed = parse_url($url);

                if (empty($parsed['host'])) {
                    throw new Exception("无效的URL: {$url}");
                }

                // 发起HTTP请求
                $client = new Client($parsed['host'], 443, true);
                $client->set([
                    'timeout'         => CONFIG['timeout'] * 1000,
                    'connect_timeout' => 3 * 1000
                ]);

                $path = $parsed['path'] ?? '/';
                if (!empty($parsed['query'])) {
                    $path .= '?' . $parsed['query'];
                }

                $success = $client->get($path);
                $result['body'] = $client->body ?? '';

                if (!$success) {
                    $result['error'] = "请求失败 [{$client->errCode}]: {$client->errMsg}";
                }

                $client->close();
            } catch (Exception $e) {
                $result['error'] = "异常: {$e->getMessage()}";
            } finally {
                // 推送结果并释放信号量
                $resultChan->push($result);
                $semaphore->pop();

                // 所有请求完成后关闭结果通道
                if (++$completed >= $orderCount) {
                    $resultChan->close();
                }
            }
        });
    }

    // 异步处理结果日志(不阻塞主循环)
    go(function () use ($resultChan) {
        while (($res = $resultChan->pop()) !== false) {
            $log = "[" . date('Y-m-d H:i:s') . "]\n";
            $log .= "请求ID: {$res['id']}\n";
            $log .= empty($res['error']) ? "状态: 成功\n" : "状态: 失败 - {$res['error']}\n";
            $log .= "响应内容: {$res['body']}\n";
            $log .= "----------------------------------------\n\n";

            // 追加写入日志(使用FILE_APPEND确保原子性)
            file_put_contents(CONFIG['log_file'], $log, FILE_APPEND | LOCK_EX);
        }
        echo "[" . date('Y-m-d H:i:s') . "] 本轮结果处理完成\n";
    });

    // 无需等待结果,直接返回继续循环
    echo "[" . date('Y-m-d H:i:s') . "] 所有请求已提交,继续下一轮检查\n";
}

// 主协程:启动死循环
go(function () {
    echo "[" . date('Y-m-d H:i:s') . "] 程序启动,开始监控文件...\n";
    while (true) {
        processOrderFile();
    }
});

// 启动事件循环(CLI环境下Swoole需要)
Swoole\Event::wait();