标签

Honeymoon - Thomas Ng

归档

近期文章

php流式请求DeepSeek接口,curl流式请求

调用

/**
     * 流式请求 DeepSeek API get请求
     * /v1/ai/stream
     */
    public function stream()
    {
        $params = request()->input();
        $validator = $this->validator::make($params, [
            'messages' => 'required',
        ], [
            'messages.required' => '消息的内容不能为空',
        ]);
        if ($validator->fails()) {
            return $this->error($validator->messages()->first());
        }

        if (empty($params['ai_model'])) {
            $params['ai_model'] = 'deepseek-chat';
        }
        if (!in_array($params['ai_model'], ['deepseek-chat', 'deepseek-reasoner'])) return $this->error('ai模型错误');

        $model = $params['ai_model'];  // 可选参数,默认值 'deepseek-chat',deepseek-reasoner

        //历史记录
        $model_type = ($params['ai_model'] == 'deepseek-chat') ? 1 : 2;
        $login_user_id = (int)Auth::id();
        if (!$login_user_id) return $this->error('请登录后操作!', 401);

        $messages_list = AiChat::select(['id', 'message', 'content'])
            ->where('user_id', $login_user_id)
            ->where('type', 1)
            ->where('model', $model_type)
            ->where('chat_id', $params['chat_id'])
            ->get();
        $messages = [];
        foreach ($messages_list as $value) {
            $messages[] = ['role' => 'user', 'content' => $value['message']];
            $messages[] = ['role' => 'assistant', 'content' => $value['content']];
        }
        // 获取前端传来的消息
        $messages[] = ['role' => 'user', 'content' => $params['messages']];

        // 创建流式响应
        $buffer = ''; // 提前定义
        $response = new StreamedResponse(function () use (&$buffer, $messages, $model, $params) {
            DeepSeekService::getInstance()->stream(
                $messages,
                function ($chunk) use (&$buffer) {
                    $buffer .= $chunk;
                    echo $chunk;
                    ob_flush();
                    flush();
                },
                $model,
                function () use (&$buffer, &$params) {
                    $insert_data = [
                        'chat_id' => $params['chat_id'],
                        'message' => $params['messages'],
                        'user_id' => (int)(Auth::id()),
                        'type' => 1,
                        'model' => ($params['ai_model'] == 'deepseek-chat') ? 1 : 2,
                        'content' => $buffer,
                        'result' => 'stream',
                    ];
                    AiChat::query()->insert($insert_data);
                }
            );
        });

        // 设置响应头
        $response->headers->set('Content-Type', 'text/event-stream');
        $response->headers->set('Cache-Control', 'no-cache');
        $response->headers->set('X-Accel-Buffering', 'no'); // 防止 NGINX 缓冲

        return $response;
    }

    /**
     * 流式请求 DeepSeek API post请求
     * /v1/ai/stream
     */
    public function streamV2()
    {
        $params_all = request()->input();
        $params = '';
        foreach ($params_all as $key => $value) {
            $params = json_decode($key, true);
        }
        if (!$params) return $this->error('参数错误');
        $validator = $this->validator::make($params, [
            'messages' => 'required',
        ], [
            'messages.required' => '消息的内容不能为空',
        ]);
        if ($validator->fails()) {
            return $this->error($validator->messages()->first());
        }
        // 获取前端传来的消息
        $messages = [
            ['role' => 'user', 'content' => $params['messages']]
        ];
        if (empty($params['ai_model'])) {
            $params['ai_model'] = 'deepseek-chat';
        }
        if (!in_array($params['ai_model'], ['deepseek-chat', 'deepseek-reasoner'])) return $this->error('ai模型错误');

        $model = $params['ai_model'];  // 可选参数,默认值 'deepseek-chat',deepseek-reasoner

        // 创建流式响应
        $response = new StreamedResponse(function () use ($messages, $model) {
            DeepSeekService::getInstance()->streamV2($messages, function ($chunk) {
                echo $chunk;
                flush();  // 刷新系统输出缓冲区
                $chunk = trim($chunk);
                if (($chunk == ': keep-alive') || ($chunk == 'data: [DONE]')) $text = '';
                $chunk_data = explode("\n\n", $chunk);
                foreach ($chunk_data as $key => $line) {
                    $text = '';
                    $line = trim($line);
                    if (str_starts_with($line, 'data: ')) {
                        $json = substr($line, 6);
                        if ($json === '[DONE]') break;
                        $payload = json_decode($json, true);
                        $text = '';
                        if (!empty($payload['choices'][0]['delta']['content'])) {
                            $text = $payload['choices'][0]['delta']['content'];
                        }
                        if (!empty($payload['choices'][0]['delta']['reasoning_content'])) {
                            $text = $payload['choices'][0]['delta']['reasoning_content'];
                        }
                    }
                    if (!$text) continue;
                    echo $text;
                    ob_flush();  // 刷新 PHP 输出缓冲区
                    flush();  // 刷新系统输出缓冲区
                }
            }, $model);
        });

        // 设置响应头
        $response->headers->set('Content-Type', 'text/event-stream');
        $response->headers->set('Cache-Control', 'no-cache');
        $response->headers->set('X-Accel-Buffering', 'no'); // 防止 NGINX 缓冲

        return $response;
    }

DeepSeekService.php

<?php

namespace App\Services\Ai;

use App\Services\BaseService;

class DeepSeekService extends BaseService
{
    const TOKEN = 'sk-421301a27465';

    private static $instance;

    public static function getInstance()
    {
        if (!self::$instance instanceof self) {
            self::$instance = new self();
        }
        return self::$instance;
    }

    /**
     * @notes: 请求 DeepSeek API 获取 completions
     * @param string $ai_content 用户输入内容
     * @param string $model 使用的模型,默认为 'deepseek-reasoner'
     * @return array 返回处理结果,包含 code, msg, data
     * @author: OwenZhang
     * @time: 2025/4/1 10:41
     */
    public function completions($ai_content, $model = 'deepseek-chat')
    {
        $url = 'https://api.deepseek.com/chat/completions';
        $authorizationToken = self::TOKEN;

        // 构建请求数据
        $data = [
            'messages' => $ai_content,
            'model' => $model
        ];

        // 初始化 cURL
        $curl = curl_init();

        curl_setopt_array($curl, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_ENCODING => '',
            CURLOPT_MAXREDIRS => 10,
            CURLOPT_TIMEOUT => 0,
            CURLOPT_FOLLOWLOCATION => true,
            CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
            CURLOPT_CUSTOMREQUEST => 'POST',
            CURLOPT_POSTFIELDS => json_encode($data), // 使用 json_encode 编码请求体
            CURLOPT_HTTPHEADER => [
                'Content-Type: application/json',
                'Accept: application/json',
                'Authorization: Bearer ' . $authorizationToken
            ],
        ]);

        $response = curl_exec($curl);
        if ($response === false) {
            $error = curl_error($curl);
            $errno = curl_errno($curl);
            curl_close($curl);
            return ['code' => 400, 'msg' => "cURL Error ($errno): $error", 'data' => []];
        }

        curl_close($curl);

        return ['code' => 200, 'msg' => '', 'data' => $response];
    }

    /**
     * 流式发送 AI 请求
     * @param array $messages 消息数组
     * @param callable|null $onData 每段数据到达时回调 function($chunk) {}
     * @param string $model 使用的模型名称
     * @param callable|null $onDone 所有数据发送完成后的回调 function() {}
     */
    public function stream(array $messages, ?callable $onData = null, string $model = 'deepseek-chat', ?callable $onDone = null): void
    {
        if (empty($messages)) {
            echo "消息不能为空。\n";
            return;
        }

        $url = 'https://api.deepseek.com/chat/completions';

        $postData = [
            'model' => $model,
            'messages' => $messages,
            'stream' => true
        ];

        $headers = [
            'Content-Type: application/json',
            'Accept: text/event-stream',
            'Authorization: Bearer ' . self::TOKEN
        ];

        $calledDone = false; // 防止多次调用 onDone

        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_CONNECTTIMEOUT => 10,
            CURLOPT_TCP_KEEPALIVE => 1,
            CURLOPT_POST => true,
            CURLOPT_HTTPHEADER => $headers,
            CURLOPT_POSTFIELDS => json_encode($postData),
            CURLOPT_TIMEOUT => 0,
            CURLOPT_WRITEFUNCTION => function ($ch, $data) use ($onData, $onDone, &$calledDone) {
                foreach (explode("\n\n", $data) as $line) {
                    $line = trim($line);
                    if (str_starts_with($line, 'data: ')) {
                        $json = substr($line, 6);
                        if ($json === '[DONE]') {
                            if ($onDone && !$calledDone) {
                                $calledDone = true;
                                call_user_func($onDone);
                            }
                            return strlen($data);
                        }

                        $payload = json_decode($json, true);
                        $text = '';
                        if (!empty($payload['choices'][0]['delta'])) {
                            $delta = $payload['choices'][0]['delta'];
                            if (empty($delta['content']) && empty($delta['reasoning_content'])) {
                                continue;
                            }
                            $text = $delta['content'] ?? $delta['reasoning_content'] ?? '';
                        }

                        if ($onData) {
                            $onData($text);
                        } else {
                            echo $text;
                            flush();
                        }
                    }
                }

                return strlen($data);
            },
        ]);

        curl_exec($ch);

        if (curl_errno($ch)) {
            echo "[错误] " . curl_error($ch) . "\n";
        }

        curl_close($ch);

        // 再次兜底,防止 curl_exec 提前返回没触发 onDone
        if ($onDone && !$calledDone) {
            $calledDone = true;
            call_user_func($onDone);
        }
    }

    public function streamV2(array $messages, ?callable $onData = null, string $model = 'deepseek-chat'): void
    {
        $payload = json_encode([
            'model' => $model,
            'messages' => $messages,
            'stream' => true,
        ]);

        $headers = [
            'Content-Type: application/json',
            'Authorization: Bearer ' . self::TOKEN,
        ];

        $url = 'https://api.deepseek.com/chat/completions';

        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
        curl_setopt($ch, CURLOPT_POST, true);
        curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, false);
        curl_setopt($ch, CURLOPT_WRITEFUNCTION, function ($ch, $data) use ($onData) {
            if ($onData) {
                call_user_func($onData, $data);
            }
            return strlen($data);
        });
        curl_setopt($ch, CURLOPT_TIMEOUT, 300);
        curl_setopt($ch, CURLOPT_FOLLOWLOCATION, true);
        curl_setopt($ch, CURLOPT_HEADER, false);

        // VERY IMPORTANT: enable streaming
        curl_setopt($ch, CURLOPT_NOPROGRESS, false);
        curl_setopt($ch, CURLOPT_BUFFERSIZE, 128);
        curl_setopt($ch, CURLOPT_TCP_NODELAY, true);

        curl_exec($ch);

        if (curl_errno($ch)) {
            echo 'Curl error: ' . curl_error($ch);
        }

        curl_close($ch);
    }

}
Tags: