SSE 异步编程 响应时间过长的问题

 2024-05-23    0 条评论    545 浏览 SSE

1.前端访问后端的AI消息接口,后端记录并且异步流式返回,发现时间很久

	[DisplayName("添加消息")]
    [HttpPost("AddMessage")]
    /// <summary>
    /// Persists the incoming user message, streams the model's reply to the caller as
    /// Server-Sent Events, and finally persists the accumulated reply as an "AI" message.
    /// </summary>
    /// <param name="message">The user message read from the request body.</param>
    /// <param name="isFirstMessage">
    /// Caller's hint that this message opens a conversation; forced to <c>false</c>
    /// when <paramref name="message"/> already carries a positive conversation id.
    /// </param>
    public async Task AddMessage([FromBody] AIMessage message, bool isFirstMessage)
    {
        // A message that already belongs to a conversation cannot be the first one.
        if (message.ConversationId > 0)
        {
            isFirstMessage = false;
        }

        message.UserId = _userManager.UserId;
        long conversationId = await _aimessageService.AddMessageAsync(message, isFirstMessage);

        // NOTE(review): the result is never used; the call is kept in case the conversion
        // validates the message or has side effects — confirm and remove if it does not.
        LiandanluExclusiveModelRequest args = ConvertToExclusiveModelRequest(message);

        var response = _httpContextAccessor.HttpContext.Response;
        response.ContentType = "text/event-stream";
        // Indexer assignment instead of Add(): Add() throws when the header is already present.
        response.Headers["Cache-Control"] = "no-cache";
        response.Headers["Connection"] = "keep-alive";

        var aiResponseContent = new StringBuilder();

        try
        {
            // Asynchronous disposal: a synchronous Dispose() flushes the response body
            // synchronously, which the server may reject when synchronous I/O is disallowed.
            await using (var writer = new StreamWriter(response.Body))
            {
                await BaiLianTurboGenerate(message.MessageText, writer, aiResponseContent, conversationId);
            }
        }
        catch (Exception ex)
        {
            // Streaming is best-effort: whatever was received so far is still saved below,
            // but the failure must not disappear silently.
            // NOTE(review): replace with the project's logger if one is available.
            Console.Error.WriteLine($"An error occurred while streaming the response: {ex}");
        }

        if (aiResponseContent.Length > 0)
        {
            var aiMessage = new AIMessage
            {
                // Use the id returned when the user message was saved (the same id that is
                // streamed to the client); message.ConversationId may still be unset for
                // the first message of a conversation.
                ConversationId = conversationId,
                UserId = null,
                SenderType = "AI",
                MessageText = aiResponseContent.ToString()
            };
            await _aimessageService.AddMessageAsync(aiMessage, false);
        }
    }
	
	[NonAction]
    [HttpPost("blmemoLong")]
    /// <summary>
    /// Calls the DashScope OpenAI-compatible chat-completions endpoint in streaming mode and
    /// relays each content delta to <paramref name="writer"/> as a Server-Sent Events frame.
    /// </summary>
    /// <param name="Prompt">User prompt sent as the "user" message.</param>
    /// <param name="writer">Writer over the outgoing response; flushed after every frame.</param>
    /// <param name="aiResponseContent">Receives the concatenation of all relayed deltas.</param>
    /// <param name="ConversationId">Conversation id echoed in every relayed frame.</param>
    /// <exception cref="HttpRequestException">The upstream call returned a non-success status.</exception>
    public async Task BaiLianLongGenerate(string Prompt, StreamWriter writer, StringBuilder aiResponseContent, long ConversationId)
    {
        var httpClient = _httpClientFactory.CreateClient();
        var requestUrl = "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions";

        var requestBody = new
        {
            model = _aitokenOptions.BaiLianModel,
            stream = true,
            messages = new[]
            {
                new { role = "system", content = _aitokenOptions.BaiLianPrompt },
                new { role = "user", content = Prompt }
            }
        };

        using (var request = new HttpRequestMessage(HttpMethod.Post, requestUrl))
        {
            request.Content = new StringContent(JsonConvert.SerializeObject(requestBody), Encoding.UTF8, "application/json");
            request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _aitokenOptions.BaiLianAPIKey);

            // ResponseHeadersRead returns as soon as the headers arrive; the default
            // (ResponseContentRead, used by PostAsync) buffers the entire body first,
            // which defeats streaming.
            using (var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead))
            {
                response.EnsureSuccessStatusCode();

                using (var stream = await response.Content.ReadAsStreamAsync())
                using (var reader = new StreamReader(stream, Encoding.UTF8, true, 8192))
                {
                    string line;
                    while ((line = await reader.ReadLineAsync()) != null)
                    {
                        // Only "data:" lines carry a payload; blank separators and other
                        // SSE fields are skipped instead of being fed to the JSON parser.
                        if (!line.StartsWith("data:", StringComparison.Ordinal))
                        {
                            continue;
                        }

                        var payload = line.Substring(5).Trim();
                        if (payload.Length == 0)
                        {
                            continue;
                        }

                        // OpenAI-compatible streams are expected to end with this sentinel,
                        // which is not JSON.
                        if (payload == "[DONE]")
                        {
                            break;
                        }

                        BaiLianLongResponse lineData;
                        try
                        {
                            lineData = JsonConvert.DeserializeObject<BaiLianLongResponse>(payload);
                        }
                        catch (JsonException ex)
                        {
                            // One malformed frame should not abort the whole stream.
                            Console.WriteLine($"JSON parse exception: {ex}");
                            continue;
                        }

                        if (lineData == null || lineData.Choices == null || lineData.Choices.Length == 0)
                        {
                            continue;
                        }

                        var choice = lineData.Choices[0];
                        if (choice.Delta != null && choice.Delta.Content != null)
                        {
                            var data = choice.Delta.Content;
                            var responseData = new
                            {
                                ConversationId = ConversationId,
                                data = data
                            };

                            // Relay immediately; no artificial delay (the former
                            // Thread.Sleep blocked a thread-pool thread per chunk).
                            await writer.WriteAsync($"data: {JsonConvert.SerializeObject(responseData)}\n\n");
                            await writer.FlushAsync();

                            aiResponseContent.Append(data);
                        }

                        if (choice.FinishReason == "stop")
                        {
                            break;
                        }
                    }
                }
            }
        }
    }

如上,本意是阿里云百炼的接口每返回一条,我的接口就响应一条。可是在实际的返回中发现,虽然也是流式返回,但是返回的时间很长,好像是等待上游所有的流式数据都返回之后,才再进行流式返回~~

2.打上断点发现

var response = await httpClient.PostAsync(requestUrl, requestContent); 这一步耗时很久

然后在查阅资料后发现,在httpClient处理SSE的时候,需要设置

	HttpCompletionOption.ResponseHeadersRead 
	表示请求在接收到响应头之后就立即返回,而不是等读取完响应的所有内容后才返回;

chatGPT给到的答复是

HttpCompletionOption.ResponseHeadersRead HttpCompletionOption 是一个枚举类型,用于配置 HttpClient 在完成 HTTP 操作时的行为。该枚举有两个值:

HttpCompletionOption.ResponseContentRead(默认): 当调用 HttpClient.SendAsync 时,请求在读取完响应的所有内容后才会返回。所以异步请求将等待整个响应数据的读取完成。

HttpCompletionOption.ResponseHeadersRead: 当调用 HttpClient.SendAsync 时,请求在接收到响应的头部后立即返回,而不是等待整个响应内容的读取。这意味着在接收到响应头后,可以立即开始处理响应流,在某些流式数据的场景(如 SSE)中,这个选项非常有用。

var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);

使用 HttpCompletionOption.ResponseHeadersRead 的代码将更早开始处理响应,尤其是在响应主体内容非常大的时候,这能大大减少等待时间并提高效率。

还有一点,在异步编程的时候,await 默认会捕获同步上下文(continueOnCapturedContext 默认为 true),可以通过下面的调用让后续代码不再回到捕获的上下文中执行:

	.ConfigureAwait(false)

所以,在 SendAsync 中传入 HttpCompletionOption.ResponseHeadersRead、并在 await 上使用 .ConfigureAwait(false) 之后,响应速度和直接请求接口一样了

3.以下是修改后的代码

[NonAction]
    [HttpPost("blmemoturbo")]
    /// <summary>
    /// Calls the DashScope text-generation endpoint with SSE enabled and relays each
    /// incremental message chunk to <paramref name="writer"/> as a Server-Sent Events frame.
    /// </summary>
    /// <param name="Prompt">User prompt sent as the "user" message.</param>
    /// <param name="writer">Writer over the outgoing response; flushed after every frame.</param>
    /// <param name="aiResponseContent">Receives the concatenation of all relayed chunks.</param>
    /// <param name="ConversationId">Conversation id echoed in every relayed frame.</param>
    /// <exception cref="HttpRequestException">The upstream call returned a non-success status.</exception>
    public async Task BaiLianTurboGenerate(string Prompt, StreamWriter writer, StringBuilder aiResponseContent, long ConversationId)
    {
        var httpClient = _httpClientFactory.CreateClient();
        var requestUrl = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation";

        var requestBody = new
        {
            model = "qwen-turbo",
            input = new
            {
                messages = new[]
                {
                    new { role = "system", content = _aitokenOptions.BaiLianPrompt },
                    new { role = "user", content = Prompt }
                }
            },
            parameters = new
            {
                result_format = "message",
                incremental_output = true
            }
        };

        using (var request = new HttpRequestMessage(HttpMethod.Post, requestUrl))
        {
            request.Content = new StringContent(JsonConvert.SerializeObject(requestBody), Encoding.UTF8, "application/json");
            // Headers are set on the request itself rather than on the client's defaults.
            request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _aitokenOptions.BaiLianAPIKey);
            request.Headers.Add("X-DashScope-SSE", "enable");

            // ResponseHeadersRead returns once the headers arrive so the body can be
            // consumed as it streams. The response must then be disposed explicitly,
            // otherwise an early exit from the loop keeps the connection occupied.
            using (var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false))
            {
                response.EnsureSuccessStatusCode();

                using (var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
                using (var reader = new StreamReader(stream, Encoding.UTF8, true, 8192))
                {
                    string line;
                    while ((line = await reader.ReadLineAsync().ConfigureAwait(false)) != null)
                    {
                        // Only "data:" lines carry a JSON payload; everything else is skipped.
                        if (!line.StartsWith("data:", StringComparison.Ordinal))
                        {
                            continue;
                        }

                        var payload = line.Substring(5).Trim();
                        if (string.IsNullOrEmpty(payload))
                        {
                            continue;
                        }

                        try
                        {
                            var lineData = JsonConvert.DeserializeObject<BaiLianTurboResponse>(payload);
                            if (lineData?.Output?.Choices == null || lineData.Output.Choices.Length == 0)
                            {
                                continue;
                            }

                            var choice = lineData.Output.Choices[0];

                            if (choice.Message?.Content != null)
                            {
                                var data = choice.Message.Content;

                                var responseData = new
                                {
                                    ConversationId = ConversationId,
                                    data = data
                                };

                                await writer.WriteAsync($"data: {JsonConvert.SerializeObject(responseData)}\n\n");
                                await writer.FlushAsync();

                                aiResponseContent.Append(data);
                            }

                            if (choice.FinishReason == "stop")
                            {
                                break;
                            }
                        }
                        catch (JsonException ex)
                        {
                            // One malformed frame should not abort the whole stream.
                            Console.WriteLine($"JSON parse exception: {ex}");
                        }
                    }
                }
            }
        }
    }

4.至此响应慢的问题已经解决