kafka

프로듀서 내부동작원리

25G 2023. 10. 19. 17:54

파티셔너

메시지들은 프로듀서의 send() 메소드를 통해 시리 얼라이저,파티셔너를 거쳐 카프카로 전송됩니다.

카프카의 토픽은 성능 향상을 위한 병렬 처리가 가능하도록 하기 위해 파티션으로 나뉘고, 최소 하나 또는 둘 이상의 파티션으로 구성됩니다. 그리고 프로듀서가 카프카로 전송한 메 시지는 해당 토픽 내 각 파티션의 로그 세그먼트에 저장됩니다 이때 프로듀서는 토픽으로 메시지를 보낼때 해당 토픽의 어느 파티션으로 메시지를 보내야 할지 결정해야하는데 이때 사용하는 것이 파티셔너입니다.

예를 들어서 a토픽 으로 메시지를 전송하는데 미시지 키는 user22입니다. 메시지 키값 user22는 프로듀서의 기본파티셔너를 거치면서 해시 알고리즘을 통해 a토픽의 파티션중 하나와 매핑되고 이후부터는 메시지 키값이 user22인 경우에는 항상 a토픽의 처음 매핑됐던 파티션으로 전송됩니다.
이때 만약에 파티션이 늘어난다고 해도 메시지의 키값이 할당된 파티션으로 매핑됩니다.
메시지의 키를 이용해 카프카로 메시지를 전송하는 경우 관리자의 의도와 다른 전송이 생길수 있으니 파티션수는 변경하지 않는것이 좋습니다.

라운드로빈 전략

만약에 키값이 null이 라면 기본값인 라운드 로빈 알고리즘을 사용해 프로듀서가 목적지 토픽의 파티션들로 레코드들을 랜덤 전송합니다.
하지만 예를들어 배치 전송을 위한 최소레코드수가3으로설정되어있는데, 파티션별최소레코드수의기준인3을충족하지 못한다면 프로듀서내에서 대기하고 있습니다.
옵션을 통해 일정시간마다 보낼수는 있지만 이또한 비요율적인 전송이기때문에 나온 방법이 스티키 파티셔닝 전략입니다.

스티키 파티셔닝 전략

카프카 2.4버전부터 스티키 파티셔닝 전략을 사용하게 됩니다.
스티키 파티셔닝이란 하나의 파티션에 레코드 수를 먼저 채워서 카프카로 빠르게 배치 전송하는 전략을 말합니다.
프로듀서는 키값이 null인 레코드를 보내고 파티셔너는 키값이 null인 레코드를 확인하고 배치를 위해 임의의 토픽파티션에 담아놨다가 배치를 위한 레코드수에 도달할 때까지 다른 파티션으로 보내지 않고 동일한 파티션으로 레코드를 담아놓습니다.

프로듀서의 배치

카프카에서는 토픽의 처리량을 높이기 위한 방법으로 토픽을 파티션으로 나눠 처리하며, 카프카 클라이언트인 프로듀서에서는 처리량을 높이기위해 배치 전송을 권장합니다.
프로듀서는 배치 전송을 위해 당므과 같은 옵션들을 제공합니다.

  • buffer.memory: 카프카로 메시지들을전송하기 위해 담아두는 프로듀서의 버퍼메모리 옵션.
  • batch.size: 배치 전송을 위해 메시지들을 묶는 단위를 설정하는 배치 크기옵션.
  • linger.ms: 배치전송을 위해 버퍼 메모리에서 대기하는 메시지들의 최대 시간을 설정하는 옵션입니다.

프로듀서의 배치 전송 방식은 단건의 메시지를 전송하는 것이 아니라 한번의 메시지를 묶어서 전송합니다 그로인해 불필요한 io를 줄입니다.
처리량을 높이려면 batch.size와 Unger.ms의 값을 크게 설정해야 하고, 지연 없는 전송이 목표라면 batch.size와 Unger.ms 의 값을 작게 설정해야 합니다.
여기서 주의해야 할 사항은 버퍼메모리의 크기가 충분히 커야합니다. 즉 buffer.memory의 크기는 반드시 batch.size보다 커야합니다.
그리고 배치전송과 더해서 압축 기능을 활용하면 더 효율적인 전송이 가능합니다.

중복없이 전송(멱등성 전송)

명등성이란 동일한 작업을 여러 번 수행하더라도 결과가 달라지지 않는 것을 의미합니다.

  • 적어도 한번 전송
    • 프로듀서가 메시지를 브로커에게 보내고 브로커가ACK를 보내지 않았다면 프로듀서 입장에서는 브로커가 메시지를 받았는지 아닌지 정확하게 알 방법이 없습니다. 하지만 메시지에대한 ACK를 받지 못한 프로듀서는 '적어도 한번 전송 방식'에 때라서 메시지를 다시 한번 전송합니다. 이렇게 최소한 하나의 메시지는 반드시 보장하는것이 '적어도 한번 전송'방식이며 기본적으로 카프카는 이와같이 동작합니다.
  • 최대 한 번 전송
    • 위와같이 프로듀서가 브로커에게 메시지를 보냈는데 브로커의 장애로 ACK를 보내지 못했다면 ACK를 받지 못한 프로듀서는 다음 이전 메시지를 제전송하지 않고 다음 메세지를 보냅니다. 메시지의 중복 가능성 회피를 위합입니다.
      대량의 로그 수집이나 IoT같은 환경에서 사용하곤 합니다.
  • 중복없는 전송
    • 프로듀서가 ACK를 받지 못했을때 메시지를 재전송하는것은 적어도 한 번 전송 방식과 유사하지만 중복없는 전송은 PID를 해더에 달고 데이터를 주고받습니다.그렇기때문에 브로커가 PID를 보고 해당 해더값을 가진 메시지가 한번 더오면 저장하지않고 ACK만 전송합니다.
      PID는 프로듀서에 의해 중복되지않게 자동생성됩니다.
      그리고 내부적으로만 이용되기 때문에 사용자에게 따로 노출되지 않습니다.
      하지만 중복없는 전송의 단점은 오버해드가 존재한다는 것인데 이를 최소화 하기 위해서 단순한 숫자 필드만 추가하는 방법으로 구현됐습니다.
      • 중복없는 전송을 위한 프로듀서 설정
        • enable.Idempotence:true :
          프로듀서가 중복 없는 전송을 허용할지 결정하는 옵션입니다. 기본 값은 false이므로. 이 옵션을 설정하기 원한다면 true로 변경해야 합니다. true로 변경 시 다음에 나오는 옵션들도 반드시 변경해야 합니다. 그렇지 않으면 ConflgExceptlonOI 발생합니다.
        • max.in.flight.requests. per.connection: 1~5 : ACK를 받지 않은 상태에서 하나의 커넥션에서 보낼 수 있는 최대 요청 수입니다. 기본값은 5이며. 5 이하로 설정해야 합니다.
        • acks :all : 프로듀서 acks와 관련된 옵션으로서, 기본값은 1이며 a U 로 설정해 야 합니다.
        • retries : 5: ACK를 받지 못한 경우 재시도를 해야 하므로 0보다 큰 값으로 설정 해야 합니다.

정확히 한 번 전송

은행에 송금,입금 등과같이 매우 민감한 데이터의 전송은 위의 전송방식보단 좀 더 견고한 전송방식이 필요합니다. 이럴때 카프카에서 정확히 한번 전송 방식을 처리합니다. 어플리케션에선 굉장히 구현하기가 힘들 수 밖에 없습니다.
카프카에서 정확히 한 번 전송은 트랜잭션과 같은 전체적인 프로세스 처리를 의미하며 중복없는 전송은 정확히 한번 전송의 일부 기능이라 할 수 있습니다.
카프카에서 정확히 한번 처리를 담당하는 별도의 프로세스를 트랜잭션api라고 부릅니다.

  • 디자인
    정확히 한번 방식으로 메시지를 전송할 때 프로듀서가 보내는 메시지들은 원자적으로 처리되어 전송에 성공하거나 실패하게 됩니다. 이때 트랜잭션 코디네이터 라는 것이 서버측에 존재합니다. 이 트랜잭션 코디네이터의 역할은 프로듀서에 의해 전송된 메시지를 관리하며 커밋,또는 중단 등을 표시합니다. 카프카에서는 오프셋정보를 카프카의 내부 토픽에 저장하는데 트랜잭션도 동일하게 트랜잭션 로그를 카프카 내부 토픽인 transaction_state에 저장합니다. 이 역시 토픽이므로 파티션 수와 리플리케이션 팩터수가 존재하며 브로커의 설정을 통해 관리자가 설정할 수 있습니다.
  • transaction.state.log.num.partltlons=50
  • transaction.state.log.replication.factor=3

모든정보의 로그는 트랜잭션 코디네이터가 직접 기록합니다.

  • 단개별 동작
  1. 브로커에게 FindCoordinatorRequest를 보내서 트랜잭션 코디네이터를 찾는다. 이때 트랜잭션 코디네이터의 주 역할은 PID와 transactional.id를 매핑하고 해당 트랜잭션 전체를 관리하는 것입니다.
    해당 파티션의 리더가 있는 브로커가 트랜잭션 코디네이터의 브로커로 최종 선정됩니다.
  2. 프로듀서 초기화. 프로듀서는 initTransactions()메소드를 이용해서 트랜잭션 전송을 위한 InitPidRequest를 트랜잭션 코디네이터로 보냅니다.
  3. 트랜잭션 시작 동작 프로듀서는 beginTransaction()메소드를 이용해 새로운 트랜잭션의 시작합니다. 이때 프로듀서 내부적으론 트랜잭션의 시작이지만 첫번째 레코드가 전송될 때 까지 트랜잭션이 시작된것은 아닙니다.
  4. 트랜잭션 상태 추가 동작입니다. 각 트랜잭션 상태의 내용을 기록하고 프로듀서는 토픽파티션 정보를 트랜잭션 코디네이터에게 전달하고, 코디네이터는 해당 정보를 트랜잭션 로그에 기록합니다. 기본값으로 1분동안 트랜잭션 상태 업데이트가 없다면 트랜잭션은 실패처리가 됩니다.
  5. 메시지 전송을 합니다. 이때 프로듀서는 대상 토픽의 파티션으로 메시지를 전송합니다.
  6. 트랜잭션 종료 요청 동작. 메시지전송을 완료한프로듀서는 commitTransaction()메소드 또는 abortTransaction()메소드중 하나를 반드시 호출해야 합니다.해당 메소드의 호출을 통해 트랜잭션이 완료됨을 트랜잭션 코디네이터에게 알립니다.이때 트랜잭션 코디네이터는 두단계 커밋과정을 시작하며 첫번째로 트랜잭션 로그에 해당 트랜잭션에 대한 commit을 기록합니다.
  7. 사용자 토픽에 표시하는 단계입니다. 위 첫번째 단계가 끝나면 두번째 단계로 트랜잭션 로그에 기록된 토픽의 파티션에 트랜잭션 커밋 표시를 기록합니다.
  8. 트랜잭션 완료 트랜잭션 코디네이터는 완료됨 이라고 트랜잭션 로그에 기록합니다.
    이렇게 처리된 트랜잭션을 이용하는 컨슈머는 read_committed서정을 하면 트랜잭션에 성공한 메시지들만 읽을 수 있게 됩니다.

'kafka' 카테고리의 다른 글

카프카 운영,모니터링  (1) 2023.10.19
카프카 / 컨슈머의 내부동작 원리  (0) 2023.10.19
카프카의 내부 동작 원리  (1) 2023.10.19
카프카 도입 컷트라인  (0) 2023.10.19
카프카 환경 구성  (0) 2023.10.19