WebSocket API Gateway로 streaming 응답 받기 (feat. bedrock)
- 목차
배경
LLM을 이용한 서비스, 예를 들면 지식 기반의 답변을 하는 챗봇을 개발하고자 할 때 API가 필요할 수 있다. 문제는 자주 사용하는 REST API는 streaming 응답을 지원하지 않기 때문에 다른 방법으로 개발할 필요가 있다. 다음과 같은 방법들이 있다.
1. 서버를 통한 API 생성
EC2와 같은 클라우드든 온프레미스 환경이든 해당 서버에서 flask나 fastapi 같은 걸로 API를 구축하여 배포하는 것이다.
다만 사용자가 적을 경우 비용이 많이 든다는 단점과 어차피 서버를 띄운다면 API를 굳이 만들어야 하나? 하는 의문이 생기게 된다.
2. Lambda의 함수 URL을 이용한 streaming
Lambda의 기능 중 API Gateway 연결 없이 API처럼 사용할 수 있게 하는 함수 URL이 있다. 호출 모드를 RESPONSE_STREAM으로 설정하면 stream으로 답변을 받을 수 있는 구조다.
https://docs.aws.amazon.com/ko_kr/lambda/latest/dg/configuration-response-streaming.html
다만, 2025년 3월 현재 Node.js 지원되므로 python 개발자에게는 익숙하지 않을 수 있다.
3. API Gateway의 WebSocket으로 API 생성
https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-websocket-api.html
WebSocket API는 클라이언트와 서버 간 실시간 양방향 통신을 가능하게 하는 프로토콜이다. 일반적인 HTTP 요청과 달리 연결을 끊지 않는 한 지속적으로 데이터를 주고 받을 수 있다. 이런 점을 이용하여 streaming 응답을 받을 수 있게 된다.
구현
1. Lambda 함수 생성
import json
import boto3
bedrock_client = boto3.client('bedrock-runtime', region_name='us-east-1')
def lambda_handler(event, context):
connection_id = event['requestContext']['connectionId']
route_key = event['requestContext']['routeKey']
if route_key == '$connect':
return {'statusCode': 200, 'body': 'Connected'}
elif route_key == '$disconnect':
return {'statusCode': 200, 'body': 'Disconnected'}
elif route_key == 'sendmessage':
# 메시지 추출
if isinstance(event['body'], str):
body = json.loads(event['body'])
else:
body = event['body']
message = body.get('message', '')
inf_params = {"max_new_tokens": 512, "temperature": 0.9}
request_body = {
"schemaVersion": "messages-v1",
"messages": [{"role": "user", "content": [{"text": message}]}],
"inferenceConfig": inf_params,
}
# WebSocket 응답 전송 API 클라이언트 생성
api_client = boto3.client(
'apigatewaymanagementapi',
endpoint_url=f"https://{event['requestContext']['domainName']}/{event['requestContext']['stage']}"
)
try:
# Bedrock API 스트리밍 호출
response = bedrock_client.invoke_model_with_response_stream(
modelId='amazon.nova-pro-v1:0',
body=json.dumps(request_body)
)
stream = response.get("body")
if stream:
for event in stream:
chunk = event.get("chunk")
if chunk:
chunk_json = json.loads(chunk.get("bytes").decode())
content_block_delta = chunk_json.get("contentBlockDelta")
if content_block_delta:
delta_text = content_block_delta.get("delta", {}).get("text", "")
if delta_text:
# WebSocket으로 delta text 전송
api_client.post_to_connection(
ConnectionId=connection_id,
Data=json.dumps({"response": delta_text})
)
except Exception as e:
print("Error:", str(e))
return {'statusCode': 500, 'body': 'Error processing response'}
return {'statusCode': 200, 'body': 'Message sent'}
else:
return {'statusCode': 400, 'body': 'Unhandled route'}
(1) 전체적인 흐름
- 클라이언트가 WebSocket에 연결하면 `$connect` 핸들링
- 클라이언트가 메시지를 보내면 sendmessage 핸들링: Bedrock API를 호출하여 AI 응답을 스트리밍 방식으로 받고 WebSocket을 통해 클라이언트로 전송
- 연결이 끊어지면 `$disconnect` 핸들링
(2) 연결된 상태에서 streaming으로 응답 보내기
# WebSocket 응답 전송 API 클라이언트 생성
api_client = boto3.client(
'apigatewaymanagementapi',
endpoint_url=f"https://{event['requestContext']['domainName']}/{event['requestContext']['stage']}"
)
...
api_client.post_to_connection(
ConnectionId=connection_id,
Data=json.dumps({"response": delta_text})
)
핵심은 위 부분이라고 생각한다.
2. API Gateway 생성
(1) WebSocket API 구축을 선택한다.
(2) API 이름과 라우팅을 입력한다.
원하는 API 이름을 지정하며, 라우팅은 예시와 동일하게 `request.body.action`을 입력한다.
(3) 경로 추가
`$connect`, `$disconnect`를 추가하고 메시지를 전달할 API를 위해 사용자 지정 경로도 추가해준다.
(4) Lambda 연결
앞서 생성한 Lambda를 모든 경로와 연결해준다.
(5) 구축 완료
스테이지 이름은 `production`, `prod` 등 본인 입맛에 따라 설정한 뒤 WebSocket API를 생성 완료하자.
3. WebSocket 테스트 in Python
구축한 API의 스테이지에 들어가 `wss`로 시작하는 WebSocket URL을 복사하여 아래 코드에 심어주자.
import websocket
import json
import threading
# 일정 시간 동안 응답이 없으면 연결을 종료하는 타이머
disconnect_timer = None
DISCONNECT_TIMEOUT = 5 # 5초 동안 추가 응답이 없으면 연결 종료
def reset_disconnect_timer(ws):
"""타이머를 리셋하여 일정 시간 후 자동으로 연결 종료"""
global disconnect_timer
if disconnect_timer:
disconnect_timer.cancel() # 기존 타이머 취소
disconnect_timer = threading.Timer(DISCONNECT_TIMEOUT, lambda: ws.close())
disconnect_timer.start()
def on_message(ws, message):
"""서버로부터 메시지를 수신했을 때 실행되는 콜백 함수"""
try:
data = json.loads(message)
response_text = data.get("response", "")
print(response_text, end='')
# 응답이 있으면 타이머 리셋
reset_disconnect_timer(ws)
except json.JSONDecodeError:
print("Invalid JSON received:", message)
def on_open(ws):
"""WebSocket 연결이 성공했을 때 실행되는 콜백 함수"""
print("Connected to WebSocket API Gateway")
payload = json.dumps({
"action": "sendmessage",
"message": "Your Message"
})
ws.send(payload)
# 연결이 열리면 응답이 없을 경우 자동 종료하는 타이머 시작
reset_disconnect_timer(ws)
def on_close(ws, close_status_code, close_msg):
"""WebSocket 연결이 종료되었을 때 실행되는 콜백 함수"""
print("\nDisconnected")
def on_error(ws, error):
"""WebSocket 에러 발생 시 실행되는 콜백 함수"""
print("Error:", error)
# WebSocket 연결 URL
ws_url = "wss://your-api-id.execute-api.ap-northeast-2.amazonaws.com/production/"
# WebSocket 클라이언트 실행
ws = websocket.WebSocketApp(
ws_url,
on_open=on_open,
on_message=on_message,
on_close=on_close,
on_error=on_error
)
ws.run_forever()
이러면 streaming되어 응답이 오게 되고, 5초간 추가 응답이 없으면 연결을 종료하게 된다.
4. 간단한 Vue.js 만들기
`my-vue-app` > `src` > `components`에 `WebSocketComponent.vue`를 새로 생성하고 아래 코드를 복붙한다. (코드는 AI가 작성해주었다.)
▼ 펼치기
<template>
<div class="chat-container">
<h1>WebSocket AI 채팅</h1>
<div class="chat-box">
<div v-for="(msg, index) in chatHistory" :key="index" :class="msg.sender">
<div :class="msg.sender === 'user' ? 'user-message' : 'ai-message'">
<span v-html="computedMarkdown(msg.text)"></span>
</div>
</div>
</div>
<div class="input-container">
<input v-model="message" placeholder="메시지를 입력하세요" @keyup.enter="sendMessage" />
<button @click="sendMessage">전송</button>
</div>
</div>
</template>
<script>
import { marked } from "marked";
import hljs from "highlight.js";
import "highlight.js/styles/github.css";
marked.setOptions({
highlight: function (code, lang) {
const language = hljs.getLanguage(lang) ? lang : "plaintext";
return hljs.highlight(code, { language }).value;
},
breaks: true,
gfm: true,
});
export default {
data() {
return {
socket: null,
message: "",
chatHistory: [],
aiResponseBuffer: "",
isReceiving: false,
reconnectTimeout: null,
pingTimeout: null // 🔹 9분 후 ping 전송 타이머
};
},
computed: {
computedMarkdown() {
return (text) => {
const rawHTML = marked(text);
this.$nextTick(() => {
document.querySelectorAll("pre code").forEach((block) => {
if (!block.classList.contains("hljs")) {
hljs.highlightElement(block);
}
});
});
return rawHTML;
};
}
},
methods: {
connectWebSocket() {
const endpoint = "wss://your-api-id.execute-api.ap-northeast-2.amazonaws.com/prod";
this.socket = new WebSocket(endpoint);
this.socket.onopen = () => {
console.log(`✅ WebSocket 연결 성공- ${new Date().toISOString()}`);
clearTimeout(this.reconnectTimeout);
};
this.socket.onmessage = this.handleWebSocketMessage;
this.socket.onclose = this.handleSocketClose;
this.socket.onerror = (error) => {
console.error("⚠ WebSocket 오류 발생:", error);
};
},
handleSocketClose() {
console.log(`❌ WebSocket 연결 종료됨 - ${new Date().toISOString()}`);
this.isReceiving = false;
clearTimeout(this.pingTimeout); // 🔹 ping 타이머 정리
// 🔄 자동 재연결 (3초 후)
this.reconnectTimeout = setTimeout(() => {
console.log(`🔄 WebSocket 재연결 시도...- ${new Date().toISOString()}`);
this.connectWebSocket();
}, 3000);
},
handleWebSocketMessage(event) {
try {
const data = JSON.parse(event.data);
if (data.response) {
this.aiResponseBuffer += data.response;
if (!this.isReceiving) {
this.isReceiving = true;
this.chatHistory.push({ sender: "ai", text: "" });
}
// 마지막 AI 메시지 업데이트
this.chatHistory[this.chatHistory.length - 1].text = this.aiResponseBuffer;
}
} catch (error) {
console.error("📛 WebSocket 메시지 파싱 오류:", error, new Date().toISOString());
}
},
sendMessage() {
if (!this.socket || this.socket.readyState !== WebSocket.OPEN) {
console.error(`🚫 WebSocket 연결이 열려 있지 않습니다.- ${new Date().toISOString()}`);
return;
}
const payload = { action: "sendmessage", message: this.message };
this.chatHistory.push({ sender: "user", text: this.message });
this.socket.send(JSON.stringify(payload));
this.message = "";
this.aiResponseBuffer = "";
this.isReceiving = false;
// 🔹 새로운 메시지가 전송되었으므로 기존 ping 타이머를 초기화하고 9분 후에 새로운 ping 설정
this.resetPingTimeout();
},
resetPingTimeout() {
clearTimeout(this.pingTimeout); // 기존 타이머 제거
this.pingTimeout = setTimeout(() => {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
console.log(`🔄 5분 후 WebSocket ping 전송... - ${new Date().toISOString()}`);
this.socket.send(JSON.stringify({ action: "ping" }));
// 🔥 핑 전송 후 다시 타이머 설정 (주기적 반복)
this.resetPingTimeout();
}
}, 5 * 60 * 1000);
}
},
mounted() {
this.connectWebSocket();
},
beforeUnmount() { // ✅ Vue 3에서 사용해야 하는 lifecycle hook
if (this.socket) {
this.socket.close();
}
clearTimeout(this.reconnectTimeout);
clearTimeout(this.pingTimeout); // 🔹 ping 타이머 제거
}
};
</script>
<style scoped>
.chat-container {
width: 100%;
max-width: 1200px;
margin: 0 auto;
text-align: center;
}
.chat-box {
width: 100%;
height: 600px;
overflow-y: auto;
border: 1px solid #ddd;
padding: 10px;
background-color: #f9f9f9;
}
.user {
text-align: right;
}
.ai {
text-align: left;
}
/* ✅ 사용자 메시지 스타일 */
.user-message {
display: inline-block;
max-width: 80%;
background-color: #d9eaff;
color: black;
padding: 10px;
border-radius: 10px;
margin: 5px 0;
text-align: left;
word-wrap: break-word;
}
/* ✅ AI 메시지 스타일 */
.ai-message {
display: inline-block;
max-width: 80%;
background-color: #dff2bf;
color: black;
padding: 10px;
border-radius: 10px;
margin: 5px 0;
text-align: left;
word-wrap: break-word;
}
/* ✅ 코드 블록 스타일 */
.ai-message pre {
background-color: #282c34;
color: #abb2bf;
padding: 10px;
border-radius: 5px;
overflow-x: auto;
font-family: monospace;
}
/* ✅ 입력창 스타일 */
.input-container {
display: flex;
margin-top: 10px;
}
input {
flex: 1;
padding: 10px;
border: 1px solid #ccc;
}
button {
padding: 10px;
background-color: #4CAF50;
color: white;
border: none;
cursor: pointer;
}
</style>
`my-vue-app` > `src` > `App.vue` 부분도 수정해준다.
<template>
<div id="app">
<WebSocketComponent />
</div>
</template>
<script>
import WebSocketComponent from './components/WebSocketComponent.vue';
export default {
name: 'App',
components: {
WebSocketComponent
}
};
</script>
소수 판별 함수를 작성해달라고 요청했더니 streaming으로 응답을 받아 잘 작성해주는 모습을 보여준다.
하지만 대화 내용을 저장하고 있지 않기 때문에 무슨 대화를 하고 있었는지 알지 못하는 모습을 보인다.
5. 대화 내용을 이어지게 하기
직접 대화 내용이 이어지도록 구현할 수도 있지만, 여기서는 `langchain`의 `RunnableWithMessageHistory`를 이용하였다. 메모리에 대화 내용을 저장해놓고 자동으로 LLM에 던지는 방법이다. 이 때 대화 내용이 너무 길어지면 모델이 오류를 뱉거나 과다한 요금이 나가므로 어느 정도 자르는 걸 추천하는데 토큰 단위로 자르기, 대화 내역 요약해서 저장하기 등 다양한 방식이 있지만 list 길이로 자르도록 했다.
import json
import boto3
from langchain_aws.chat_models.bedrock_converse import ChatBedrockConverse
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import AIMessage, HumanMessage, trim_messages
from pydantic import BaseModel, Field
# In-memory chat message history implementation
class InMemoryHistory(BaseChatMessageHistory, BaseModel):
messages: list = Field(default_factory=list)
def add_message(self, message):
self.messages.append(message)
def clear(self):
self.messages = []
# Store chat histories
chat_histories = {}
def get_chat_history(session_id):
"""세션 ID에 따른 대화 기록을 가져오고, 필요하면 새로 생성"""
if session_id not in chat_histories:
chat_histories[session_id] = InMemoryHistory()
history = chat_histories[session_id]
# 대화 내역 정리 (최근 10개 메시지만 유지)
history.messages = trim_messages(
messages=history.messages,
token_counter=len, # 메시지 개수를 기준으로 조정 (토큰이 아닌 개수 기반)
max_tokens=10, # 최근 10개 대화만 유지
strategy="last", # 최신 대화 유지
start_on="human", # HumanMessage가 먼저 오도록 설정
include_system=False, # 시스템 메시지는 유지하지 않음 (필요하면 True로 변경)
allow_partial=False, # 부분적 메시지 삭제 허용 안 함
)
return history
# LangChain Bedrock 모델 설정
llm = ChatBedrockConverse(
model="amazon.nova-lite-v1:0",
temperature=0.9,
max_tokens=512,
region_name="us-east-1",
)
# RunnableWithMessageHistory 설정
def get_session_history(session_id):
return get_chat_history(session_id)
runnable = RunnableWithMessageHistory(
runnable=llm,
get_session_history=get_session_history,
)
def lambda_handler(event, context):
connection_id = event['requestContext']['connectionId']
route_key = event['requestContext']['routeKey']
session_id = event['requestContext'].get('sessionId', connection_id) # 세션 ID 설정
if route_key == '$connect':
return {'statusCode': 200, 'body': 'Connected'}
elif route_key == '$disconnect':
return {'statusCode': 200, 'body': 'Disconnected'}
elif route_key == 'sendmessage':
# 메시지 추출
if isinstance(event['body'], str):
body = json.loads(event['body'])
else:
body = event['body']
message = body.get('message', '')
# WebSocket 응답 전송 API 클라이언트 생성
api_client = boto3.client(
'apigatewaymanagementapi',
endpoint_url=f"https://{event['requestContext']['domainName']}/{event['requestContext']['stage']}"
)
try:
# 대화 기록 가져오기 및 메시지 추가
history = get_chat_history(session_id)
history.add_message(HumanMessage(content=message))
# LangChain을 사용하여 Bedrock 스트리밍 응답 받기
response = runnable.stream(
{"messages": history.messages},
config={"configurable": {"session_id": session_id}} # 세션 ID를 실제 사용
)
ai_response_content = ''
for chunk in response:
if chunk.content:
text_chunk = chunk.content[0].get('text', '')
ai_response_content += text_chunk
# WebSocket을 통해 클라이언트에 실시간 전송
api_client.post_to_connection(
ConnectionId=connection_id,
Data=json.dumps({"response": text_chunk})
)
# AI 응답을 대화 기록에 추가
history.add_message(AIMessage(content=ai_response_content))
except Exception as e:
print("Error:", str(e))
return {'statusCode': 500, 'body': 'Error processing response'}
return {'statusCode': 200, 'body': 'Message sent'}
else:
return {'statusCode': 400, 'body': 'Unhandled route'}
그러면 똑같이 소수 판별 함수를 작성해달라고 했을 때, 그 이후 대화가 이어지는 것을 확인할 수 있다.
연결 지속 시간
Amazon API Gateway 할당량 및 중요 정보 - Amazon API Gateway
최대 2시간까지 연결되며 유휴 연결 제한 시간은 10분이다.
즉, 10분 동안 비활성 상태(데이터 전송 없음)이면 자동으로 연결을 종료하며, 데이터를 지속적으로 전송했어도 2시간 후에는 연결을 무조건 종료하는 것이다.
실제로 아무것도 하지 않고 10분이 지나 다시 대화를 해보았을 때 연결이 끊겼다 다시 연결되면서 이전 대화를 기억하지 못하는 모습을 보인다.
Lambda의 제한 시간과 관계 없이 WebSocket은 계속 유지된다. 실제로 제한 시간을 30초로 잡았으나 1분 뒤 다시 대화를 해도 대화가 이어지는 것을 확인할 수 있었다.
이를 방지하기 위한 방안으로는
1) 일정 시간마다 Ping을 날린다.
이렇게 하면 유저 입장에서 10분동안 아무것도 하지 않아도 WebSocket이 끊어지지는 않는다.
setInterval(() => {
if (socket.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify({ action: "ping" }));
}
}, 60000); // 1분마다 ping 전송
2) DB를 활용한다.
세션이 초기화되어도 바뀌지 않는 고유 식별자 ID 등을 활용하여 session ID와 함께 DB에 저장해두고 사용한다.
세션이 종료되면 지금까지의 대화를 저장하고, 세션이 연결되면 이전 대화를 가져오도록 코드를 작성한다.
userId | connectionId | lastActiveTime | conversationHistory |
user_12345 | abc123xyz890 | 2025-03-01 12:30:50 | ... |
user_67890 | def456uvw123 | 2025-03-01 12:51:38 | ... |
예를 들어 위와 같이 DB에 저장해놓고 userId가 `user_12345`이면 이전 대화를 가져오도록 구성할 수도 있다.
AWS의 예제에서는 DynamoDB를 추천하고 있다.
다만 이 경우 대화 초기화 기능 또는 일정 시간 경과 후 DB에서 대화 내용을 삭제하는 등 스케줄링 관리도 해줘야 하는 번거로움이 있다.
(1) 유저가 2시간 이상 연속으로 사용할 서비스가 아니다.
(2) DB 비용이 부담된다.
(3) 대화 내용이나 사용 이력을 가지고 분석 등에 활용하지 않을 것이다.
위 3개 조건을 모두 만족하면 핑만 추가하는 게 좋지 않을까 싶다.
'AWS' 카테고리의 다른 글
[AWS Summit Seoul 2024] LLM의 프롬프트 엔지니어링 (1) | 2024.06.03 |
---|---|
AWS Lambda와 API Gateway를 통해 입력 받은 이미지 처리하기 (0) | 2024.05.02 |
AWS Lambda만을 이용해서 날씨알림봇 만들기 (0) | 2024.04.22 |
AWS Lambda, API Gateway로 대화형 질답봇 만들기 (Feat. 야구) (0) | 2024.04.02 |
AWS Lambda와 selenium을 이용해서 날씨알림봇 만들기 (0) | 2024.03.07 |