跳至主要内容
處理 Asgard SSE 串流事件,實現即時打字機效果的完整範例

串流回應處理

Asgard API 使用 Server-Sent Events(SSE)串流回傳 AI 生成的文字。本範例展示如何解析各種 SSE 事件類型,並在前端實現即時打字機效果。

SSE 事件類型

事件類型說明建議處理方式
asgard.run.init請求初始化顯示載入狀態
asgard.message.delta訊息片段(打字機)累積並即時顯示文字
asgard.message.complete完整訊息更新最終訊息內容
asgard.run.done請求完成隱藏載入狀態

JavaScript EventSource 範例

EventSource 是瀏覽器原生支援 SSE 的 API,適合簡單的串流場景。

// 注意:EventSource 僅支援 GET 請求,Asgard API 使用 POST,
// 因此需要透過後端代理或改用 fetch + ReadableStream(見下方範例)。

// 若您的後端已包裝為 GET SSE 端點:
const source = new EventSource('/api/chat?channelId=channel-001&text=你好');

let fullText = '';

source.onmessage = (event) => {
try {
const data = JSON.parse(event.data);

switch (data.eventType) {
case 'asgard.run.init':
console.log('對話開始,runId:', data.fact.runInit?.runId);
document.getElementById('status').textContent = '思考中...';
break;

case 'asgard.message.delta':
fullText += data.fact.messageDelta.message.text;
document.getElementById('response').textContent = fullText;
break;

case 'asgard.message.complete':
fullText = data.fact.messageComplete.message.text;
document.getElementById('response').textContent = fullText;
break;

case 'asgard.run.done':
document.getElementById('status').textContent = '完成';
source.close();
break;
}
} catch (e) {
console.error('解析事件失敗', e);
}
};

source.onerror = (err) => {
console.error('SSE 連線錯誤', err);
source.close();
};

JavaScript fetch + ReadableStream 範例

使用 fetch 搭配 ReadableStream 可直接呼叫 Asgard POST API,是前端最常用的整合方式。

const BASE_URL = 'https://api.asgard-ai.com';
const NAMESPACE = 'your-namespace';
const BOT_PROVIDER = 'your-bot-provider';
const API_KEY = process.env.ASGARD_API_KEY;

async function streamMessage(channelId, text, onDelta, onComplete) {
const url = `${BASE_URL}/generic/ns/${NAMESPACE}/bot-provider/${BOT_PROVIDER}/message/sse`;

const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-API-KEY': API_KEY,
},
body: JSON.stringify({
customChannelId: channelId,
text,
action: 'NONE',
}),
});

if (!response.ok) {
throw new Error(`HTTP error: ${response.status}`);
}

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let fullText = '';

while (true) {
const { done, value } = await reader.read();
if (done) break;

buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');

// 保留最後一行(可能尚未完整)
buffer = lines.pop() || '';

for (const line of lines) {
if (!line.startsWith('data:')) continue;

const jsonStr = line.slice(5).trim();
if (!jsonStr) continue;

try {
const event = JSON.parse(jsonStr);

switch (event.eventType) {
case 'asgard.run.init':
console.log('串流開始');
break;

case 'asgard.message.delta': {
const delta = event.fact.messageDelta.message.text;
fullText += delta;
// 呼叫回呼函式更新 UI(打字機效果)
onDelta?.(delta, fullText);
break;
}

case 'asgard.message.complete':
// 使用 complete 事件的完整文字確保一致性
fullText = event.fact.messageComplete.message.text;
break;

case 'asgard.run.done':
onComplete?.(fullText);
return fullText;
}
} catch (e) {
// 忽略非 JSON 行
}
}
}

return fullText;
}

// 使用範例(搭配 DOM 操作實現打字機效果)
const outputEl = document.getElementById('output');

streamMessage(
'channel-001',
'請介紹一下 Asgard 平台的主要功能',
(delta, accumulated) => {
// 每收到一個 delta 片段就更新畫面
outputEl.textContent = accumulated;
},
(finalText) => {
console.log('串流結束,完整回應:', finalText);
}
);

Python sseclient 範例

import requests
import json
import os

# 安裝依賴:pip install requests sseclient-py
import sseclient

BASE_URL = "https://api.asgard-ai.com"
NAMESPACE = "your-namespace"
BOT_PROVIDER = "your-bot-provider"
API_KEY = os.environ.get("ASGARD_API_KEY")


def stream_message(channel_id: str, text: str):
url = f"{BASE_URL}/generic/ns/{NAMESPACE}/bot-provider/{BOT_PROVIDER}/message/sse"

headers = {
"Content-Type": "application/json",
"X-API-KEY": API_KEY,
"Accept": "text/event-stream",
}

payload = {
"customChannelId": channel_id,
"text": text,
"action": "NONE",
}

full_text = ""

with requests.post(url, headers=headers, json=payload, stream=True) as response:
response.raise_for_status()
client = sseclient.SSEClient(response)

for event in client.events():
if not event.data:
continue

try:
data = json.loads(event.data)
event_type = data.get("eventType")

if event_type == "asgard.run.init":
print("串流開始...")

elif event_type == "asgard.message.delta":
delta = data["fact"]["messageDelta"]["message"]["text"]
full_text += delta
# 即時輸出(打字機效果)
print(delta, end="", flush=True)

elif event_type == "asgard.message.complete":
full_text = data["fact"]["messageComplete"]["message"]["text"]

elif event_type == "asgard.run.done":
print("\n串流結束")
break

except json.JSONDecodeError:
pass

return full_text


if __name__ == "__main__":
result = stream_message("channel-001", "請介紹一下 Asgard 平台的主要功能")
print(f"\n完整回應:\n{result}")

React Hook 串流模式

以下是一個封裝串流邏輯的自訂 React Hook,可直接在 React 元件中使用。

import { useState, useCallback, useRef } from 'react';

const BASE_URL = 'https://api.asgard-ai.com';
const NAMESPACE = 'your-namespace';
const BOT_PROVIDER = 'your-bot-provider';

/**
* 自訂 Hook:處理 Asgard SSE 串流回應
*/
function useAsgardStream(apiKey) {
const [response, setResponse] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const [error, setError] = useState(null);
const abortControllerRef = useRef(null);

const sendMessage = useCallback(
async (channelId, text) => {
// 取消上一個進行中的請求
abortControllerRef.current?.abort();
abortControllerRef.current = new AbortController();

setIsStreaming(true);
setResponse('');
setError(null);

const url = `${BASE_URL}/generic/ns/${NAMESPACE}/bot-provider/${BOT_PROVIDER}/message/sse`;

try {
const res = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-API-KEY': apiKey,
},
body: JSON.stringify({
customChannelId: channelId,
text,
action: 'NONE',
}),
signal: abortControllerRef.current.signal,
});

if (!res.ok) throw new Error(`HTTP error: ${res.status}`);

const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
const { done, value } = await reader.read();
if (done) break;

buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';

for (const line of lines) {
if (!line.startsWith('data:')) continue;
const jsonStr = line.slice(5).trim();
if (!jsonStr) continue;

try {
const event = JSON.parse(jsonStr);

if (event.eventType === 'asgard.message.delta') {
const delta = event.fact.messageDelta.message.text;
setResponse((prev) => prev + delta);
}

if (event.eventType === 'asgard.run.done') {
setIsStreaming(false);
return;
}
} catch (_) {}
}
}
} catch (err) {
if (err.name !== 'AbortError') {
setError(err.message);
}
} finally {
setIsStreaming(false);
}
},
[apiKey]
);

const stop = useCallback(() => {
abortControllerRef.current?.abort();
setIsStreaming(false);
}, []);

return { response, isStreaming, error, sendMessage, stop };
}

// 使用範例元件
function ChatWidget({ apiKey }) {
const [input, setInput] = useState('');
const { response, isStreaming, error, sendMessage, stop } = useAsgardStream(apiKey);

const handleSubmit = (e) => {
e.preventDefault();
if (!input.trim()) return;
sendMessage('channel-001', input);
setInput('');
};

return (
<div>
<div className="response-box">
{response || (isStreaming ? '思考中...' : '等待輸入')}
{isStreaming && <span className="cursor">|</span>}
</div>

{error && <div className="error">錯誤:{error}</div>}

<form onSubmit={handleSubmit}>
<input
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="輸入訊息..."
disabled={isStreaming}
/>
<button type="submit" disabled={isStreaming || !input.trim()}>
發送
</button>
{isStreaming && (
<button type="button" onClick={stop}>
停止
</button>
)}
</form>
</div>
);
}

下一步