LangGraph로 만든 Agent의 응답을 Streaming으로 받기
- 목차
0. 결론
for msg, metadata in graph.stream(
input,
stream_mode="messages",
):
if msg.content and metadata["langgraph_node"] == "summarizer":
text_chunk = msg.content[0].get('text', '')
print(text_chunk, end='', flush=True)
`graph.stream(input, stream_mode="messages")`를 이용하여 가장 마지막 node(위 코드에선 `summarizer`)에 대해 답변을 출력하도록 한다.
1. LangGraph의 Agent
Agent는 LLM에게 tool을 쥐어주고, 알아서 판단해서 tool을 사용할지 말지를 정하고, 적절한 파라미터를 만들어서 tool을 실행하도록 하는 기술이다.
예를 들어, 공을 $H$m 높이에서 $v$m/s의 속도로 전방을 향해 던졌을 때 날아간 거리를 계산해주는 tool(함수)가 있을 때, LLM에게 `30m 높이에서 10m/s 속도로 공을 던지면 얼마나 날아가?`라는 질문을 하게 되면 LLM은 앞서 만들어놓은 tool에 `H=30`, `v=10`을 입력하여 답으로 받고 이를 가지고 답변을 하게 된다.
▼ Bedrock - Converse API tool 기능 이용한 예제
import math
def calc_projectile_distance(H, v, g=9.81):
"""
H: 초기 높이 (m)
v: 수평 속도 (m/s)
g: 중력 가속도 (기본값 9.81 m/s²)
반환값: 수평 거리 (m)
"""
# 공이 떨어지기까지 걸리는 시간
t = math.sqrt(2 * H / g)
# 수평으로 날아간 거리
distance = v * t
return distance
import json
import boto3
def generate_text(bedrock_client, model_id, tool_config, input_text):
"""Generates text using the supplied Amazon Bedrock model. If necessary,
the function handles tool use requests and sends the result to the model.
Args:
bedrock_client: The Boto3 Bedrock runtime client.
model_id (str): The Amazon Bedrock model ID.
tool_config (dict): The tool configuration.
input_text (str): The input text.
Returns:
Nothing.
"""
# Create the initial message from the user input.
messages = [{
"role": "user",
"content": [{"text": input_text}]
}]
response = bedrock_client.converse(
modelId=model_id,
messages=messages,
toolConfig=tool_config
)
output_message = response['output']['message']
messages.append(output_message)
stop_reason = response['stopReason']
if stop_reason == 'tool_use':
# Tool use requested. Call the tool and send the result to the model.
tool_requests = response['output']['message']['content']
for tool_request in tool_requests:
if 'toolUse' in tool_request:
tool = tool_request['toolUse']
print(f"Requesting tool {tool['name']}. Paramters: {tool['input']}")
if tool['name'] == 'calc_projectile_distance':
tool_result = {}
distance = calc_projectile_distance(**tool['input'])
tool_result = {
"toolUseId": tool['toolUseId'],
"content": [{"json": {"distance": distance}}]
}
tool_result_message = {
"role": "user",
"content": [
{
"toolResult": tool_result
}
]
}
messages.append(tool_result_message)
# Send the tool result to the model.
response = bedrock_client.converse(
modelId=model_id,
messages=messages,
toolConfig=tool_config
)
output_message = response['output']['message']
# print the final response from the model.
for content in output_message['content']:
print(json.dumps(content, indent=4, ensure_ascii=False))
model_id = "amazon.nova-pro-v1:0"
tool_config = {
"tools": [
{
"toolSpec": {
"name": "calc_projectile_distance",
"description": "공을 $H$m 높이에서 $v$m/s의 속도로 전방을 향해 던졌을 때 날아간 거리 계산",
"inputSchema": {
"json": {
"type": "object",
"properties": {
"H": {
"type": "float",
"description": "공을 던지는 곳의 높이"
},
"v": {
"type": "float",
"description": "공을 던지는 속도"
},
"g": {
"type": "float",
"description": "중력가속도, 기본값=9.81"
}
},
"required": [
"H", "v"
]
}
}
}
}
]
}
bedrock_client = boto3.client(service_name='bedrock-runtime', region_name='us-east-1')
input_text = '30m 높이에서 10m/s 속도로 공을 던지면 얼마나 날아가?'
try:
print(f"Question: {input_text}")
generate_text(bedrock_client, model_id, tool_config, input_text)
except Exception as err:
print(f"A client error occured: {err}")
else:
print(
f"Finished generating text with model {model_id}.")
Question: 30m 높이에서 10m/s 속도로 공을 던지면 얼마나 날아가?
Requesting tool calc_projectile_distance. Paramters: {'v': 10.0, 'H': 30.0}
{
"text": "30m 높이에서 10m/s 속도로 공을 던지면 공은 약 24.73m를 날아갑니다."
}
Finished generating text with model amazon.nova-pro-v1:0.
이런 Agent가 여러 툴을 쥐고 답변을 수행하게 하는 것을 돕는 또 다른 도구가 바로 LangGraph이다.
LangGraph Agent 예제는 다음 링크 참고
Multi-agent supervisor
Multi-agent supervisor
Home Guides Tutorials Agent Architectures Multi-Agent Systems Multi-agent supervisor The previous example routed messages automatically based on the output of the initial researcher agent. We can also choose to use an LLM to orchestrate the different agent
langchain-ai.github.io
2. 상황 설정
회사에서 X월 X일에 생산된 OO제품에 대한 정보와 그 당시의 근무자와 그의 정보를 챗봇 형태로 파악하기를 원한다고 생각해보자. (거기에 챗봇 홍보를 위해 뉴스 기사 기반 답변도 된다고 했다고 해보자.)
그러면 필요한 tool들은 다음과 같다.
1. 생산 제품 정보
2. 근무자 정보
3. 뉴스 검색
그리고 각 tool을 사용하는 Agent를 매칭시켜준다.
생산 제품 정보 ↔ `product_informer`
근무자 정보 ↔ `worker_informer`
뉴스 검색 ↔ `news_articles_searcher`
거기에 LangGraph의 예시대로 `supervisor`를 두어 어느 Agent에게 일을 시킬지 판단하도록 하게 하며
`summarzier`를 둬서 최종 답변을 얻어내는 구조로 만들고자 한다.

3. 구현
from langgraph.graph import MessagesState
from typing import Literal
from typing_extensions import TypedDict
from langchain_aws import ChatBedrockConverse
from pydantic import BaseModel, Field
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
from datetime import datetime
from duckduckgo_search import DDGS
import requests
# The agent state is the input to each node in the graph
class AgentState(MessagesState):
# The 'next' field indicates where to route to next
next: str
search_count: int
node_count: int
members = ["worker_informer", "product_informer", "news_articles_searcher"]
options = members + ["FINISH"]
system_promptA = (
"You are a supervisor tasked with managing a conversation between the"
f" following workers: {members}. Given the following user request,"
" respond with the worker to act next. Each worker will perform a"
" task and respond with their results and status. When finished,"
" respond with FINISH."
" product_informer: He can inform defect rate, worker name with (product name and product date) or (product code)."
" worker_informer: He can inform worker's team, age, etc."
f" Today is {datetime.today().strftime('%Y-%m-%d')}(KST)."
)
system_promptB = (
"You are a supervisor tasked with managing a conversation between the"
f" following workers: {members[:-1]}. Given the following user request,"
" respond with the worker to act next. Each worker will perform a"
" task and respond with their results and status. When finished,"
" respond with FINISH."
f" Today is {datetime.today().strftime('%Y-%m-%d')}(KST)."
)
# Pydantic 모델 사용
class Router(BaseModel):
"""Worker to route to next. If no workers needed, route to FINISH."""
next: Literal[*options]
llm_selector = ChatBedrockConverse(
model='us.anthropic.claude-3-5-sonnet-20241022-v2:0',
temperature=0,
max_tokens=None,
region_name='us-east-1'
)
llm = ChatBedrockConverse(
model="amazon.nova-pro-v1:0",
temperature=0.9,
max_tokens=1024,
region_name='us-east-1'
)
우선 `AgentState`를 선언하여 각 Agent가 다음 노드로 넘길 값을 갖고 있게 한다.
그리고 멤버 설정과 프롬프트는 LangGraph 예시와 거의 동일하게 가져갔다.
다만 A, B로 나눈 것은 아직까지 LLM이 완벽하지 못해 검색을 계속 반복할 때가 있어, 검색 횟수 제한을 넘으면 검색을 선택지에서 제외하기 위함이다.
추가적으로 `supervisor`에 사용할 모델은 클로드 소넷 3.5를 했으며 일반적인 답변은 아마존 노바 pro로 했다. (클로드가 좀 더 똑똑하지만 비싸기 때문)
def supervisor_node(state: AgentState) -> AgentState:
if state['node_count'] > 10:
return {"next":"summarizer"}
if state['search_count'] < 3:
messages = [SystemMessage(content=[{"type":"text", "text":system_promptA}])] + state['messages']
else:
messages = [SystemMessage(content=[{"type":"text", "text":system_promptB}])] + state['messages']
while True:
try:
structured_llm = llm_selector.with_structured_output(Router)
response = structured_llm.invoke(messages)
next_ = response.next
if next_ == "FINISH":
next_ = "summarizer"
break
except llm_selector.client.exceptions.ModelErrorException as e:
print("ModelErrorException", str(e))
continue
return {"next": next_, "search_count":state["search_count"], "node_count":state['node_count']}
노드를 왔다갔다 한 횟수가 10을 넘기거나, 검색 횟수가 3을 넘기면 안 되게끔 설정을 해주었고,
한 번씩 Bedrock 자체의 문제인지 네트워크의 문제인지 모델 에러가 나는 경우가 있어 그럴 경우를 방지하기 위해 while 반복문으로 성공할 때까지 반복하게 설정해주었다.
무엇보다 핵심은 `llm_selector.with_structed_output(Router)`다. `Router` 안에는 `options`만이 들어가있어, `options`의 요소만 선택이 된다.
def fetch_page_content(url):
"""주어진 뉴스 기사 URL에서 본문 내용을 크롤링"""
try:
headers = {"User-Agent": "Mozilla/5.0"}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status() # HTTP 오류 발생 시 예외 처리
soup = BeautifulSoup(response.text, "html.parser")
# 본문 추출 (일반적인 뉴스/블로그 구조 적용)
paragraphs = soup.find_all("p")
content = "\n".join([p.get_text() for p in paragraphs])
return content.strip()
except requests.RequestException as e:
print(f"Error fetching {url}: {e}")
return None
def search_news_articles(queries, max_results=3):
"""DuckDuckGo에서 뉴스 기사만 검색 후 URL 가져오기
parameters
----------
queries(list): 검색어 리스트. ["query1", "query2", ...]
max_results(int): 검색 결과 갯수. default=3
returns
-------
articles(string): 검색한 기사 전체
"""
articles = []
for query in queries:
for not_use_keyword in ['최근', '최신']:
query = query.replace(not_use_keyword, '').strip()
results = DDGS().news(keywords=query, max_results=max_results)
for res in results:
url = res.get("url")
if url:
article_content = fetch_page_content(url) # 본문 크롤링
title = res.get("title")
date = res.get("date")
articles.append(f"쿼리: {query}\n제목: {title} ({url})\n날짜: {date}\n\n{article_content}")
return articles
def worker_information(name: str) -> str:
"""
이름 기반으로 근무자 정보를 검색
parameters
----------
name(str): 검색할 사람 이름
returns
-------
information(str)
"""
return f"""근무자 {name}의 정보
- 생년월일: 1980.08.20
- 소속팀: 과자생산팀
- 학력: 서울고등학교-서울대학교(본교)
- 현주소: 서울시 서초구 방배동"""
def product_information(product_name: str = '', product_date: str = '', product_code: str = '') -> str:
"""
제품 정보를 조회합니다.
Parameters:
----------
product_name (str): 제품명 (예: "홈런볼")
product_date (str): 생산일 (형식: YYYYMMDD, 예: "20250301")
product_code (str): 제품 코드 (예: "bad39f")
Returns:
--------
str: 제품 불량률 및 당시 근무자 이름 정보.
주의:
- product_name과 product_date는 함께 사용해야 합니다.
- 또는 product_code 하나만 사용해도 됩니다.
- 둘 다 비어 있으면 오류가 발생합니다.
"""
print(product_name, product_date, product_code)
if product_name != '' and product_date != '':
product_date = str(product_date)
year = product_date[:4]
month = product_date[4:6]
day = product_date[6:]
return f"""{year}년 {month}월 {day}일에 생산된 {product_name}의 생산 정보
- 불량률: 0.00%
- 근무자: 김롯데"""
elif product_code != '':
return f"""{product_code}에 해당하는 제품: 2025년 01월 30일에 생산된 라면의 정보
- 불량률: 0.00%
- 근무자: 이농심"""
else:
return f"""Paramters are wrong!!! product_name: {product_name}, product_date: {product_date}, product_code: {product_code}"""
다음으로는 뉴스 기사 검색, 근무자 정보, 제품 정보 조회하는 (가짜) 함수를 만들어준다. 원래는 DB와 연결해야하지만 예시니까 대충 넘어가자. 이 때, LLM이 함수를 파악할 수 있도록 docstring을 반드시 작성해주어야 한다.
search_system_msg = f"""Search in KOREAN and query must be string.
Today is {datetime.today().strftime('%Y-%m-%d')}(KST).
Search terms should be as specific as possible.
In particular, for dates, specify a specific date. DO NOT USE RECENT/최근/최신.
"""
research_agent = create_react_agent(
llm, tools=[search_news_articles], state_modifier=search_system_msg
)
def news_search_node(state: AgentState) -> AgentState:
result = research_agent.invoke(state)
response_content = result["messages"][-1].content[0]['text']
return {
"messages": HumanMessage(content=[{"type":"text", "text": response_content}], name="news_article_searcher"),
"search_count": state['search_count']+1,
"node_count": state['node_count']+1
}
worker_informer_agent = create_react_agent(llm, tools=[worker_information])
def worker_node(state: AgentState) -> AgentState:
result = worker_informer_agent.invoke(state)
response_content = result["messages"][-1].content[0]['text']
return {
"messages": HumanMessage(content=[{"type":"text", "text": response_content}], name="worker_informer"),
"search_count": state['search_count'],
"node_count": state['node_count']+1
}
product_informer_agent = create_react_agent(llm, tools=[product_information])
def product_node(state: AgentState) -> AgentState:
result = product_informer_agent.invoke(state)
response_content = result["messages"][-1].content[0]['text']
return {
"messages": HumanMessage(content=[{"type":"text", "text": response_content}], name="product_informer"),
"search_count": state['search_count'],
"node_count": state['node_count']+1
}
tool을 사용하는 agent들을 만들어준다.
`create_react_agent`를 만들어두면 `llm`이 주어진 여러 tool 중에서 필요한 걸 선택하고 올바른 파라미터까지 생산해낸다. 다만 검색어를 만드는 건 아직 무리인 건지, 자꾸 `최신`, `최근` 같은 단어를 넣어서 검색하고 있어 해당 부분을 제외시켜주었다. (프롬프트 엔지니어링으로 설정해 놓아도 안 되는 이상한 상황)
그리고 `state`를 다음 노드로 넘길 때 `node_count`나 `search_count`의 값을 높여서 전달한다.
참고로, agent에 여러 tool을 넣어두면 필요한 tool을 알아서 선택하고, 목표를 달성할 때까지 알아서 반복하기도 한다.
예를 들어 덧셈과 곱셈 함수를 tool로 주고, 3×2+1을 하라고 하면
1. 곱셈 선택 후 파라미터로 3과 2 입력, 6을 받음
2. 덧셈 선택 후 파라미터로 6과 1 입력, 7을 받음
이런 과정을 1개의 agent 내에서 거친다.
def summary_node(state: AgentState) -> AgentState:
"""
모든 정보를 종합하여 최종 답변을 생성하는 노드
"""
messages = [
("system", "모든 정보를 종합하여 유저 질문 또는 요구사항에 대한 최종 답변을 유저에게 전달합니다.")
] + state['messages']
result = llm.invoke(messages)
return result
마지막 답변 내놓는 agent는 간단하게 작성했다.
def build_graph():
builder = StateGraph(AgentState)
builder.add_node("supervisor", supervisor_node)
builder.add_node("news_articles_searcher", news_search_node)
builder.add_node("worker_informer", worker_node)
builder.add_node("product_informer", product_node)
builder.add_node("summarizer", summary_node)
builder.add_edge(START, "supervisor")
for member in members:
# supervisor에게 작업이 완료되었음을 항상 알려주기를 원한다.
builder.add_edge(member, "supervisor")
# supervisor가 FINISH를 반환하면 summary_node로 이동하도록 설정
builder.add_conditional_edges("supervisor", lambda state: state["next"])
builder.add_edge("summarizer", END)
# 마지막으로 진입점을 추가한다.
builder.add_edge(START, "supervisor")
graph = builder.compile()
return graph
graph = build_graph()
마지막으로 node를 추가하고, node끼리 `add_edge`를 통해 연결시켜준다.
- START → superviosr, 각 멤버 → superviosr, summarizer → END
다만, `supervisor`는 여러 노드에 한 번에 이어진 것이 아니라 여러 노드 중 하나만 연결되는 조건부 연결이기 때문에 `add_conditional_edges`를 사용하여 연결시켜준다.
이후 `graph.stream(input, stream_mode="messages")`를 통해 출력을 stream으로 받을 수 있다.
(`llm`을 선언할 때 stream 기능을 넣지 않았는데도 작동하는 원리는 코드를 까보면 알 수 있지 않을까?)
node = ''
for msg, metadata in graph.stream(
{"messages":[HumanMessage(content=[{"type":"text", "text":"2025년 3월 1일에 생산된 홈런볼에 대해서, 당시 근무자의 상세 정보를 파악해줘."}])],
"search_count":0,
"node_count":0},
stream_mode="messages",
):
new_node = metadata['langgraph_node']
if node != new_node:
print('\n\n\n==================================')
print(new_node.upper(), flush=True, end='\n\n')
node = new_node
if msg.content and isinstance(msg.content, list):
if node == 'supervisor':
text_chunk = msg.content[0].get('input', '')
else:
text_chunk = msg.content[0].get('text', '')
print(text_chunk, end='', flush=True)
elif msg.content and isinstance(msg.content, str):
print(msg.content)
print('==================================')
아래는 출력 결과물이다.
==================================
SUPERVISOR
{"next": "product_informer"}
==================================
AGENT
<thinking> To retrieve the detailed information about the worker who was on duty when the product "홈런볼" was produced on 2025-03-01, I need to use the `product_information` tool. I will provide both the product name and the production date to get the required information. </thinking>
홈런볼 20250301
==================================
TOOLS
2025년 03월 01일에 생산된 홈런볼의 생산 정보
- 불량률: 0.00%
- 근무자: 김롯데
==================================
==================================
AGENT
2025년 3월 1일에 생산된 홈런볼에 대한 정보를 확인한 결과, 당시 근무자는 김롯데였으며, 불량률은 0.00%로 나타났습니다.
==================================
PRODUCT_INFORMER
2025년 3월 1일에 생산된 홈런볼에 대한 정보를 확인한 결과, 당시 근무자는 김롯데였으며, 불량률은 0.00%로 나타났습니다.
==================================
SUPERVISOR
{"next": "worker_informer"}
==================================
AGENT
<thinking> 문제의 핵심은 2025년 3월 1일에 생산된 홈런볼을 제조한 근무자인 김롯데의 상세 정보를 파악하는 것입니다. 이를 위해 worker_information 도구를 사용하여 김롯데의 상세 정보를 검색해야 합니다. </thinking>
==================================
TOOLS
근무자 김롯데의 정보
- 생년월일: 1980.08.20
- 소속팀: 과자생산팀
- 학력: 서울고등학교-서울대학교(본교)
- 현주소: 서울시 서초구 방배동
==================================
==================================
AGENT
2025년 3월 1일에 생산된 홈런볼에 대해서, 당시 근무자인 김롯데의 상세 정보는 다음과 같습니다:
- 생년월일: 1980년 8월 20일
- 소속팀: 과자생산팀
- 학력: 서울고등학교, 서울대학교(본교)
- 현주소: 서울시 서초구 방배동
==================================
WORKER_INFORMER
2025년 3월 1일에 생산된 홈런볼에 대해서, 당시 근무자인 김롯데의 상세 정보는 다음과 같습니다:
- 생년월일: 1980년 8월 20일
- 소속팀: 과자생산팀
- 학력: 서울고등학교, 서울대학교(본교)
- 현주소: 서울시 서초구 방배동
==================================
SUPERVISOR
{"next": "FINISH"}
==================================
SUMMARIZER
2025년 3월 1일에 생산된 홈런볼에 대한 정보를 종합하여 다음과 같은 상세 정보를 제공할 수 있습니다:
### 홈런볼 생산 정보 (2025년 3월 1일)
- **불량률**: 0.00%
### 당시 근무자 정보
- **이름**: 김롯데
- **생년월일**: 1980년 8월 20일
- **소속팀**: 과자생산팀
- **학력**:
- 서울고등학교
- 서울대학교(본교)
- **현주소**: 서울시 서초구 방배동
이 정보는 2025년 3월 1일에 생산된 홈런볼에 관련된 데이터를 바탕으로 작성되었습니다. 만약 더 많은 정보가 필요하거나 다른 날짜의 데이터를 확인하고 싶다면, 추가적인 조회가 필요할 수 있습니다.
1. `supervisor`가 먼저 `product_informer`를 호출한다.
2. Agent가 생각을 하여(`<thinking> ~ </thinking>`) `product_information` 툴을 이용하여 20250301의 홈런볼 정보를 받아와야한다고 판단하고, 이후 툴을 호출하여 생산 정보를 가져온다.
3. `product_informer`가 툴 결과를 가지고 `supervisor`에게 전달한다.
4. `supervisor`가 `worker_informer`를 호출한다.
5. 2~4가 반복된다.
6. 정보를 모두 찾았다고 판단한 `supervisor`가 `FINISH`로 넘긴다.
7. 마지막으로 `summarizer`가 지금까지 얻은 정보를 토대로 사용자에게 글을 작성한다.
'데이터 분석 > LLM' 카테고리의 다른 글
| MCP Python SDK로 로컬 MCP 서버 구축하기 (0) | 2025.05.10 |
|---|---|
| Windows에서 Ollama + Open WebUI 이용하기 (Docker 이용) (0) | 2025.02.28 |
| LangChain을 이용한 RAG - (6) 여러 문서 RAG 태우기 (0) | 2024.08.05 |
| LangChain을 이용한 RAG - (5) 생성 편 (0) | 2024.07.22 |
| LangChain을 이용한 RAG - (4) 검색 편 (0) | 2024.07.15 |