I was wondering if someone could give me a hand figuring out an issue I'm having with an SSE stream.
Basically, I'm trying to fully integrate streaming and pub/sub into an AI chat web app that I'm building with Next.js 15. The flow I have set up publishes my message to a request topic via my backend (built with FastAPI); a separate server then streams the response JSON objects to a response topic, which my FastAPI backend picks up via a subscription and relays to my front end as server-sent events (SSE). I receive these events in my Next.js app using the following function (distilled down to an MRE):
export async function* streamResponse(responseUrl: string): AsyncGenerator<{
  content: string;
  type: string;
  metadata: Record<string, any>;
}> {
  try {
    const response = await fetch(responseUrl);
    if (!response.ok || !response.body) {
      throw new Error(`Bad response: ${response.status}`);
    }
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = "";
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      // Process complete SSE events (split by double newlines)
      const events = buffer.split("\n\n");
      buffer = events.pop() || ""; // Save incomplete chunks
      for (const event of events) {
        const lines = event.split("\n");
        let dataStr = "";
        for (const line of lines) {
          // Only consider lines starting with "data:"
          if (line.startsWith("data:")) {
            dataStr += line.slice("data:".length).trim();
          }
        }
        if (!dataStr || dataStr === "[DONE]") continue;
        try {
          const parsed = JSON.parse(dataStr);
          yield {
            content: parsed.content,
            type: parsed.type,
            metadata: parsed.metadata,
          };
        } catch (error) {
          console.error("Error parsing event:", error, "Raw data:", dataStr);
          yield {
            content: `Error: ${error}`,
            type: "error",
            metadata: {},
          };
        }
      }
    }
    // Process any remaining buffer
    if (buffer.trim()) {
      try {
        const parsed = JSON.parse(buffer);
        yield {
          content: parsed.content || "",
          type: parsed.type || "content",
          metadata: parsed.metadata || {},
        };
      } catch (error) {
        console.error("Error parsing final chunk:", error);
      }
    }
  } catch (error) {
    console.error("Streaming error:", error);
    yield {
      content: `Stream error: ${error}`,
      type: "error",
      metadata: {},
    };
  }
}
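For reference, here is a self-contained sketch of the same buffering/parsing logic fed synthetic chunks instead of a fetch body, so it can be exercised without a server. The payload shape (`content`/`type`/`metadata`) is my assumption based on the fields the generator yields:

```typescript
// Standalone sketch of the chunk-buffering SSE parser above.
// Payloads are made up; the point is that an event split across
// two network chunks is still reassembled and parsed correctly.
type SSEEvent = { content: string; type: string; metadata: Record<string, any> };

function parseSSEChunks(chunks: string[]): SSEEvent[] {
  const out: SSEEvent[] = [];
  let buffer = "";
  for (const chunk of chunks) {
    buffer += chunk;
    const events = buffer.split("\n\n");
    buffer = events.pop() || ""; // keep the incomplete tail for the next chunk
    for (const event of events) {
      let dataStr = "";
      for (const line of event.split("\n")) {
        if (line.startsWith("data:")) {
          dataStr += line.slice("data:".length).trim();
        }
      }
      if (!dataStr || dataStr === "[DONE]") continue;
      out.push(JSON.parse(dataStr));
    }
  }
  return out;
}

// A JSON event split mid-object across two chunks still parses once complete:
const events = parseSSEChunks([
  'data: {"content":"Hel","type":"token",',
  '"metadata":{}}\n\ndata: [DONE]\n\n',
]);
// events[0].content === "Hel"
```

One caveat worth knowing: per the SSE spec, multi-line `data:` fields are supposed to be joined with `\n`, so trimming and concatenating each line (as above) only works if every event's JSON is serialized on a single line.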
This function is then called in the page where I actually stream the response values to the user, like so:
"use client";

export default function Chat() {
  const messagesEndRef = useRef<HTMLDivElement>(null);
  // Local state to accumulate the streaming text
  const [streamedText, setStreamedText] = useState("");
  const [responseStreaming, setResponseStreaming] = useState(false);
  const [responseLoading, setResponseLoading] = useState(false);
  const [messages, setMessages] = useState([]);

  // mock vars
  const id = "1234";
  let responseUrl = "https://api.example.com/response?correlation_id=5678";

  // Stream the response from the backend
  const getStreamedResponse = async () => {
    let fullResponse = "";
    try {
      const stream = streamResponse(responseUrl);
      setResponseStreaming(true);
      for await (const event of stream) {
        if (event.type === "token") {
          setStreamedText((prev) => prev + event.content);
          fullResponse += event.content;
        } else if (event.type === "end") {
          break;
        } else if (event.type === "error") {
          console.error("Streaming error:", event.content);
          setStreamedText("An error occurred while processing the response.");
          break;
        }
      }
    } catch (error) {
      console.error("Streaming error:", error);
    } finally {
      console.log("Final Message (Local Var): ", fullResponse);
      console.log("Final Message (State Var): ", streamedText);
      setResponseLoading(false);
      setResponseStreaming(false);
      responseUrl = "";
      setStreamedText("");
    }
  };

  // Effect to load session and trigger streaming when a new message is sent.
  useEffect(() => {
    if (responseUrl !== "") {
      getStreamedResponse();
    }
  }, [responseUrl]);

  // Auto-scroll to the bottom when messages update
  useEffect(() => {
    if (messagesEndRef.current) {
      messagesEndRef.current.scrollIntoView({ behavior: "smooth" });
    }
  }, [messages, responseStreaming, streamedText]);

  // Cleanup effect: abort ongoing requests and reset context values on unmount
  useEffect(() => {
    const abortController = new AbortController();
    return () => {
      abortController.abort();
      responseUrl = "";
      setResponseLoading(false);
      setResponseStreaming(false);
      setStreamedText("");
    };
  }, []);

  return (
    <div className="flex flex-col min-w-0 h-dvh bg-background">
      <div className="flex flex-col min-w-0 gap-6 flex-1 overflow-y-scroll pt-4">
        {messages.length > 0 &&
          messages.map(
            (message: {
              sender: string;
              message: string;
              created_at: string;
            }) => (
              <PreviewMessage
                key={`${message.sender}-${message.created_at}`}
                message={message}
              />
            ),
          )}
        {responseStreaming && streamedText !== "" && (
          <PreviewMessage
            message={{
              sender: "bot",
              message: streamedText,
              created_at: new Date().toISOString(),
            }}
          />
        )}
        {responseStreaming && streamedText === "" && <ThinkingMessage />}
        <div
          ref={messagesEndRef}
          className="shrink-0 min-w-[24px] min-h-[24px]"
        />
      </div>
      <div className="flex mx-auto px-4 bg-background pb-4 md:pb-6 gap-2 w-full md:max-w-3xl">
        <Input />
      </div>
    </div>
  );
}
The issue I'm having is that the fullResponse variable in getStreamedResponse() is sometimes empty, even though the stream correctly gathers and renders the collected messages from responseUrl. That is, every time I run this function, the state variable streamedText correctly accumulates the message and displays it on the front end. But when I console.log() the full collected response, whether from the state variable or the local variable, I sometimes see the full response and other times an empty string. I can't for the life of me figure out why this happens: the text is collected and streamed to the front end correctly in both cases, yet the accumulated chunks come up empty at seemingly random points (for the same or different messages streamed back from the server). Any insight or help would REALLY be appreciated, thanks in advance!
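One detail I noticed while debugging that may or may not be related: the streamedText logged in the finally block is the value captured when that render's closure was created, not the latest state, so that particular log being empty could just be closure capture (it wouldn't explain the local fullResponse, though). A minimal, React-free sketch of the capture behavior, with hypothetical names:

```typescript
// Each "render" creates a fresh handler that closes over that render's
// snapshot of the state value; a long-running handler created on an
// earlier render keeps seeing the earlier value.
function render(streamedText: string) {
  return function logFinal(): string {
    return streamedText; // frozen at the value from this render
  };
}

const firstRenderHandler = render(""); // state was "" on the first render
// Later renders would close over "full response", but the streaming loop
// still runs inside the handler from the first render:
const logged = firstRenderHandler();
// logged === "" even though newer state exists elsewhere
```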
EventSource?