AI

ADK(Agent Development Kit) 스터디 2주차 (4) Runner 에 대해서 (내가 만든 ADK Agent를 API로 사용하고 싶다!)

25G 2025. 7. 18. 19:36

괜찮은 셈플 Travel Concierge 

https://github.com/google/adk-samples/tree/main/python/agents/travel-concierge

 

adk-samples/python/agents/travel-concierge at main · google/adk-samples

A collection of sample agents built with Agent Development (ADK) - google/adk-samples

github.com

위 샘플에 웬만한 케이스는 다 들어있는것 같아 파보면 실제 유스케이스에도 많이 사용할만한 내용들이 많이 담겨 있는 것 같다.


ADK 런타임

ADK 에이전트에 런타임 이라는 개념은 자동차와 엔진같은 역할을 한다.

런타임은 여러 에이전트들을 실제 사용자의 요청에 맞춰 조화롭게 작동시키는 역할을한다.

이벤트 루프

Runner와 개발자가 작성한 실행로지이 서로 대화를 주고받는 방식이다.

  • 실행로직 : 우리가 만드는 agent, tool, callback등의 코드 덩어리
  • Runner: 전체 대화의 지휘자 이다. 실행 로직을 깨우고, 실행로직이 보낸 메시지를 처리하고 상태를 저장하는 등 모든 과정을 조율한다.

실행로직과 러너의 상호작용

  1. 시작 : 러너가 사용자 요청을 받고 에이전트에게 일을 시작시킨다.
  2. 실행 및 보고 : agnet는 로직을 실행하다가 사용자에게 보여줄 메시지가 생기거나 툴을사용하거나 중요한 상태 변경이 필요할때 Evnet객체를 만들어 러너에게 보낸다. 이때 yield 라는 키워드를 사용하는데 “나 할말 있으니 잠깐 멈출게!” 이런 신호와 같다
  3. 처리 : 러너는 에이전트가 보낸 이벤트를받아서 필요한 작업을 처리한다.(db상태 저장, ui에 메시지 전송)
  4. 재개; Runner의 처리가 끝나면 멈춰있던 에이전트가 바로 그 다음 코드부터 다시 실행을 이어간다
  5. 반복: 에이전트가 현재 요청에 대해 더 이상 보고할 이벤트가 없을때 까지 설정한 만큼 과정이 반복된다.

yield (보고 후 일시정지) -> process (처리) -> resume (재개) 사이클이 바로 ADK 런타임의 핵심 동작 방식이다

실제 동작 과정 예시

사용자가 "오늘 날씨 어때? 그리고 내일 미팅 일정 알려줘" 라고 묻는 시나리오를 통해 런타임이 어떻게 동작하는지 살펴본다. 우리는 get_weather 라는 툴과 get_calendar 라는 툴을 가진 LlmAgent를 만들었다고 가정한다

1️⃣ 사용자 입력 및 러너 시작

  • User: "오늘 날씨 어때? 그리고 내일 미팅 일정 알려줘"
  • Runner: 이 메시지를 받아 Session에 기록하고, LlmAgent를 깨웁니다. agent.run_async(context) 호출!

2️⃣ 첫 번째 yield: 날씨 툴 호출

  • Agent: (LLM을 통해 문장을 분석) "아, 먼저 '오늘 날씨'를 알아봐야겠군. get_weather 툴을 써야겠다."
  • Agent: get_weather 툴을 호출하겠다는 내용의 FunctionCall 이벤트를 생성합니다.
  • Agent: 이 이벤트를 Runner에게 yield 하고 일시정지합니다. ⏸️
# 에이전트 내부 (개념 코드)
def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
    # ... LLM이 get_weather 툴을 호출하기로 결정 ...
    tool_call_event = Event(
        content=Content(parts=[Part(function_call={'name': 'get_weather', 'args': {'location': 'Busan'}})])
    )
    
    # 1. 러너에게 '툴 호출할게!'라고 보고하고 일시정지
    yield tool_call_event
    # <<<<<<<<<<<< 여기서 실행이 멈춤 >>>>>>>>>>>>
    
    # ... 런타임 처리 후 여기서부터 다시 시작됨 ...

3️⃣ 러너 처리 및 에이전트 재개

  • Runner: tool_call_event를 받습니다. "에이전트가 get_weather 툴을 쓰려고 하네" 라고 기록합니다.
  • Runner: 처리가 끝났으니 Agent를 다시 깨웁니다.
  • Agent: 실행 재개! ✅ 이제 멈췄던 yield 문 바로 다음 줄부터 코드를 실행합니다.

4️⃣ 두 번째 yield: 날씨 툴 실행 결과 보고

  • Agent: 이제 진짜로 get_weather 툴을 실행합니다. 툴이 "부산: 맑음, 28도" 라는 결과를 반환합니다.
  • Agent: 이 툴 실행 결과를 FunctionResponse 이벤트에 담아 Runner에게 yield 하고 다시 일시정지합니다. ⏸️
# 에이전트 내부 (개념 코드, 이전 코드에 이어서)

# 2. 러너가 깨워줘서 다시 시작. 이제 진짜 툴 실행
tool_result = await get_weather(location='Busan') # 결과: {'weather': '맑음, 28도'}

tool_response_event = Event(
    content=Content(parts=[Part(function_response={'name': 'get_weather', 'response': tool_result})])
)

# 3. 러너에게 '툴 실행 결과는 이거야!'라고 보고하고 일시정지
yield tool_response_event
# <<<<<<<<<<<< 여기서 다시 실행이 멈춤 >>>>>>>>>>>>

5️⃣ 캘린더 툴 호출 및 결과 보고 (위 과정 반복)

  • 위 2~4번 과정이 get_calendar 툴에 대해서도 똑같이 반복됩니다. 에이전트는 get_calendar 툴 호출 이벤트를 yield하고, Runner가 처리 후 깨워주면 툴을 실행하고, 그 결과를 다시 yield 합니다.

6️⃣ 마지막 yield: 최종 답변 생성 및 보고

  • Agent: 이제 '날씨 정보'와 '미팅 정보'를 모두 얻었습니다.
  • Agent: 이 정보들을 종합하여 LLM을 통해 최종 답변을 만듭니다. "오늘 부산 날씨는 맑고 28도이며, 내일은 오후 3시에 '주간 보고' 미팅이 있습니다."
  • Agent: 이 최종 답변을 Text가 담긴 Event로 만들어 Runner에게 yield 하고 일시정지합니다. ⏸️

7️⃣ 러너 처리 및 종료

  • Runner: 최종 답변 이벤트를 받아 사용자 UI에 보여줍니다.
  • Runner: Agent가 더 이상 yield할 이벤트가 없다는 것을 확인하고 이번 요청에 대한 처리를 완전히 종료합니다.🏁

런타임 특징

상태 업데이트는 yield 이후에 확정된다.

에이전트 코드 안에서context.session.state['my_key'] = 'my_value' 처럼 상태를 변경해도, 이 변경 사항은 로컬에만 임시로 기록됨

이 번경 사항이 실제 저장소에 영구 저장되는 시점은 Event를 yield하고 Runner가 그 이벤트를 성공적으로 처리한 이후

# 에이전트 로직 내부

# 1. 상태를 로컬에서 변경
ctx.session.state['status'] = 'processing' 
event_with_state = Event(
    # 이 변경사항을 actions에 담아줘야 함
    actions=EventActions(state_delta={'status': 'processing'}),
    content=Content(parts=[Part(text="처리 중입니다...")])
)

# 2. 이벤트를 yield 함 -> 실행 일시정지
yield event_with_state
# --- PAUSE --- 
# 이 시간 동안 Runner가 SessionService를 통해 'status' = 'processing'을 DB에 저장합니다.
# --- RESUME ---

# 3. 실행 재개. 이제 이 상태는 영구 저장된 것이 보장됨
current_status = ctx.session.state['status'] 
# current_status는 이제 확실하게 'processing' 입니다.
print(f"재개 후 상태: {current_status}")

이 규칙 덕분에 항상 일관성 있는 상태 값을 기반으로 로직을 짤 수 있다.

스트리밍

LLM이 답변을 실시간으로 생성할때 ADK는 여러개의 Event를 yield한다.

  • 중간이벤트 : 러너는 이 이벤트를 받으면 UI에 텍스트를 바로바로 보여주기 위해 전달은 하지만 이벤트에 포함된 상태변경 같은 엑션은 처리하지않고 건너뛴다.
  • 최종 이벤트 : 스트리밍이 끝나고 완전한 응답이 담긴 마지막 이벤트가 오면 러너는 이제서야 이 이벤트에 담긴 액션을 처리해서 상태를 딱 한번만 정확하게 저장한다.

비동기가 디폴트

비동기 환경에 맞춰서 설계하는게 중요


API콜을 위한 세션을 어떻게 만들지?

다음 예제 코드를 보면 adk api server를 호출할때 필요한 세션 id를 어떻게 획득하는지를 알 수 있다.

# 세션 획득
async def start_agent_session(user_id, is_audio=False):
    """Starts an agent session"""

    # Create a Runner
    runner = InMemoryRunner(
        app_name=APP_NAME,
        agent=root_agent,
    )

    # Create a Session
    session = await runner.session_service.create_session(
        app_name=APP_NAME,
        user_id=user_id,  # Replace with actual user ID
    )

    # Set response modality
    modality = "AUDIO" if is_audio else "TEXT"
    run_config = RunConfig(response_modalities=[modality])

    # Create a LiveRequestQueue for this session
    live_request_queue = LiveRequestQueue()

    # Start agent session
    live_events = runner.run_live(
        session=session,
        live_request_queue=live_request_queue,
        run_config=run_config,
    )
    return live_events, live_request_queue
    
    
# 세션 사용 예제
@app.get("/events/{user_id}")
async def sse_endpoint(user_id: int, is_audio: str = "false"):
    """SSE endpoint for agent to client communication"""

    # Start agent session
    user_id_str = str(user_id)
    live_events, live_request_queue = await start_agent_session(user_id_str, is_audio == "true")

    # Store the request queue for this user
    active_sessions[user_id_str] = live_request_queue

    print(f"Client #{user_id} connected via SSE, audio mode: {is_audio}")

    def cleanup():
        live_request_queue.close()
        if user_id_str in active_sessions:
            del active_sessions[user_id_str]
        print(f"Client #{user_id} disconnected from SSE")

    async def event_generator():
        try:
            async for data in agent_to_client_sse(live_events):
                yield data ## 러너 활용
        except Exception as e:
            print(f"Error in SSE stream: {e}")
        finally:
            cleanup()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "Cache-Control"
        }
    )

 

이제 내가 만든 에이전트를 어떻게 하면 API로 활용할 수 있을지 윤곽이 나오는 것 같다. 근데 작업하다보니 user개념이 있어야 최종적으로 API 를 활용할 수 있을것 같다...