CQRS

CQRS를 NestJs에서 사용예시

25G 2023. 7. 21. 11:38

doc

CQRS

단순한 CRUD (Create, Read, Update, Delete) 애플리케이션의 흐름은 다음과 같이 설명할 수 있습니다

컨트롤러 계층은 HTTP 요청을 처리하고 작업을 서비스 계층에 위임합니다.
서비스 계층은 대부분의 비즈니스 로직이 담겨있는 곳입니다.
서비스는 레포지토리/DAO를 사용하여 엔티티를 변경하거나 영속화합니다.
엔티티는 값을 담는 컨테이너로서, 세터와 게터를 갖고 있습니다.
이러한 패턴은 일반적으로 소규모 및 중규모 애플리케이션에는 충분하지만, 더 크고 복잡한 애플리케이션에는 최적의 선택이 아닐 수 있습니다. 이런 경우 CQRS(Command and Query Responsibility Segregation) 모델이 더 적합하고 확장 가능할 수 있습니다(애플리케이션의 요구사항에 따라 다릅니다). 이 모델의 장점은 다음과 같습니다:

  1. 역할 분리 모델은 읽기와 쓰기 작업을 별도의 모델로 분리합니다.
  2. 확장성 읽기와 쓰기 작업을 독립적으로 확장할 수 있습니다.
  3. 유연성 모델은 읽기와 쓰기 작업에 각각 다른 데이터 저장소를 사용할 수 있습니다.
  4. 성능 모델은 읽기와 쓰기 작업에 최적화된 다른 데이터 저장소를 사용할 수 있습니다.
    이 모델을 용이하게 사용하기 위해 Nest는 가벼운 CQRS 모듈을 제공합니다
npm install --save @nestjs/cqrs

커맨드(Command)

커맨드는 애플리케이션 상태를 변경하는 데 사용됩니다. 데이터 중심보다는 작업 중심으로 구성되어야 합니다. 커맨드가 디스패치되면 해당하는 커맨드 핸들러(Command Handler)가 처리합니다. 핸들러는 애플리케이션 상태를 업데이트하는 역할을 담당합니다.

@Injectable()
export class HeroesGameService {
  constructor(private commandBus: CommandBus) {}

  async killDragon(heroId: string, killDragonDto: KillDragonDto) {
    return this.commandBus.execute(
      new KillDragonCommand(heroId, killDragonDto.dragonId)
    );
  }
}

KillDragonCommand 클래스를 인스턴스화하고 해당 인스턴스를 CommandBus의 execute() 메서드에 전달합니다.

export class KillDragonCommand {
  constructor(
    public readonly heroId: string,
    public readonly dragonId: string,
  ) {}
}

CommandBus는 커맨드 스트림을 나타냅니다. 이는 커맨드를 적절한 핸들러에게 디스패치하는 역할을 담당합니다. execute() 메서드는 핸들러가 반환하는 값으로 해결되는 프로미스를 반환합니다.

@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
  constructor(private repository: HeroRepository) {}

  async execute(command: KillDragonCommand) {
    const { heroId, dragonId } = command;
    const hero = this.repository.findOneById(+heroId);

    hero.killEnemy(dragonId);
    await this.repository.persist(hero);
  }
}

이 핸들러는 레포지토리에서 Hero 엔티티를 검색하고, killEnemy() 메서드를 호출한 후 변경 사항을 영속화합니다. KillDragonHandler 클래스는 ICommandHandler 인터페이스를 구현합니다. 이 인터페이스는 execute() 메서드의 구현을 필요로 합니다. execute() 메서드는 커맨드 객체를 인수로 받습니다.

다음은 KillDragonHandler 클래스의 예시 구현입니다:

import { ICommandHandler, CommandHandler } from 'nest-commands';
import { KillDragonCommand } from './kill-dragon.command';
import { HeroesRepository } from './heroes.repository';

@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
  constructor(private readonly heroesRepository: HeroesRepository) {}

  async execute(command: KillDragonCommand) {
    const { heroId, dragonId } = command;
    const hero = await this.heroesRepository.findOneById(heroId);

    if (!hero) {
      throw new Error('Hero not found');
    }

    // Call the killEnemy() method on the hero entity
    hero.killEnemy(dragonId);

    // Persist the changes to the repository
    await this.heroesRepository.save(hero);
  }
}

위 코드에서는 KillDragonHandler 클래스가 ICommandHandler 인터페이스를 구현하고, execute() 메서드를 구현하였습니다. execute() 메서드는 KillDragonCommand 객체를 인수로 받고, 해당 커맨드에 따른 로직을 수행하여 Hero 엔티티를 검색하고 적절한 작업을 수행한 후 변경 사항을 영속화합니다. 이를 통해 KillDragonCommand에 대한 처리가 이루어집니다.

쿼리(Query)

쿼리는 애플리케이션 상태로부터 데이터를 검색하는 데 사용됩니다. 데이터 중심적인 작업보다는 태스크(작업) 중심적이어야 합니다. 쿼리가 디스패치되면 해당하는 쿼리 핸들러(Query Handler)가 처리합니다. 핸들러는 데이터를 검색하는 역할을 담당합니다.

QueryBus는 CommandBus와 동일한 패턴을 따릅니다. 쿼리 핸들러는 IQueryHandler 인터페이스를 구현하고 @QueryHandler() 데코레이터로 주석을 달아야 합니다.

다음은 쿼리 핸들러의 예시 구현입니다:

import { IQueryHandler, QueryHandler } from 'nest-queries';
import { GetHeroQuery } from './get-hero.query';
import { HeroesRepository } from './heroes.repository';

@QueryHandler(GetHeroQuery)
export class GetHeroHandler implements IQueryHandler<GetHeroQuery> {
  constructor(private readonly heroesRepository: HeroesRepository) {}

  async execute(query: GetHeroQuery) {
    const { heroId } = query;
    return this.heroesRepository.findOneById(heroId);
  }
}

위 코드에서는 GetHeroHandler 클래스가 IQueryHandler 인터페이스를 구현하고, execute() 메서드를 구현하였습니다. execute() 메서드는 GetHeroQuery 객체를 인수로 받고, 해당 쿼리에 따른 로직을 수행하여 Hero 엔티티를 검색하고 반환합니다. 이를 통해 GetHeroQuery에 대한 처리가 이루어집니다.

이벤트(Event)

이벤트는 애플리케이션 상태의 변경 사항을 다른 부분에 알리는 데 사용됩니다. 이벤트는 모델에 의해 디스패치되거나 EventBus를 통해 직접 디스패치될 수 있습니다. 이벤트가 디스패치되면 해당하는 이벤트 핸들러(Event Handler)가 처리합니다. 핸들러는 예를 들어, 읽기 모델을 업데이트하는 역할을 수행할 수 있습니다.

데모를 위해 이벤트 클래스를 생성해봅시다:

export class DragonKilledEvent {
  constructor(public readonly heroId: string, public readonly dragonId: string) {}
}

위의 코드에서는 DragonKilledEvent라는 이벤트 클래스를 생성했습니다. 이 클래스는 heroId와 dragonId라는 두 개의 읽기 전용 속성을 가지고 있습니다. 이벤트는 애플리케이션 상태에서 드래곤이 사냥되었을 때 발생할 수 있는 상황을 나타내는데 사용될 수 있습니다.

이제 이벤트가 발생하는 시점에서 이 클래스를 활용하여 이벤트를 생성하고, 이벤트 버스를 통해 이벤트를 디스패치하는 코드를 구현할 수 있습니다. 이렇게 생성된 이벤트는 다른 부분에 알리거나 처리를 위해 이벤트 핸들러에서 활용될 수 있습니다.

이제 이벤트는 EventBus.publish() 메서드를 사용하여 직접 디스패치할 수도 있지만, 모델에서도 이벤트를 디스패치할 수 있습니다. Hero 모델을 업데이트하여 killEnemy() 메서드가 호출될 때 HeroKilledDragonEvent 이벤트를 디스패치하도록 해보겠습니다.

hero.model.tsJS

export class Hero extends AggregateRoot {
  constructor(private id: string) {
    super();
  }

  killEnemy(enemyId: string) {
    // 비즈니스 로직
    this.apply(new HeroKilledDragonEvent(this.id, enemyId));
  }
}

위의 코드에서는 Hero 모델에 killEnemy() 메서드를 추가하였습니다. 이 메서드 내부에서는 apply() 메서드를 사용하여 이벤트를 디스패치합니다. apply() 메서드는 이벤트 객체를 인수로 받습니다. 그러나 모델은 EventBus를 인식하지 못하므로, EventPublisher 클래스와 연결해야 합니다.

kill-dragon.handler.tsJS

@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
  constructor(
    private repository: HeroRepository,
    private publisher: EventPublisher,
  ) {}

  async execute(command: KillDragonCommand) {
    const { heroId, dragonId } = command;
    const hero = this.publisher.mergeObjectContext(
      await this.repository.findOneById(+heroId),
    );
    hero.killEnemy(dragonId);
    hero.commit();
  }
}

위의 코드에서는 KillDragonHandler 클래스에서 EventPublisher를 사용하여 이벤트를 디스패치할 수 있도록 합니다. EventPublisher#mergeObjectContext 메서드는 이벤트 발행자를 제공된 객체와 병합합니다. 이렇게 함으로써 객체는 이제 이벤트를 이벤트 스트림에 디스패치할 수 있게 됩니다.

이 예시에서 commit() 메서드도 호출하고 있습니다. 이 메서드는 미처 디스패치하지 않은 이벤트를 디스패치하는데 사용됩니다. 이벤트를 자동으로 디스패치하려면 autoCommit 속성을 true로 설정할 수 있습니다.

export class Hero extends AggregateRoot {
  constructor(private id: string) {
    super();
    this.autoCommit = true;
  }
}

이제 Hero 인스턴스는 자동으로 이벤트를 디스패치하게 됩니다. HeroModel처럼 클래스로 사용되는 객체에도 EventPublisher#mergeClassContext 메서드를 사용하여 이벤트 발행자를 병합할 수 있습니다.

추가로, EventBus를 사용하여 이벤트를 수동으로 발행할 수도 있습니다.

this.eventBus.publish(new HeroKilledDragonEvent());

이를 통해 이벤트는 적절한 방식으로 디스패치되고 처리될 수 있습니다.

@EventsHandler(HeroKilledDragonEvent)
export class HeroKilledDragonHandler implements IEventHandler<HeroKilledDragonEvent> {
  constructor(private repository: HeroRepository) {}

  handle(event: HeroKilledDragonEvent) {
    // 비즈니스 로직
  }
}

위의 코드는 HeroKilledDragonEvent 이벤트를 처리하는 이벤트 핸들러를 정의한 것입니다. EventsHandler 데코레이터는 HeroKilledDragonEvent 이벤트에 대해 해당 핸들러를 등록합니다. 이벤트 핸들러는 IEventHandler 인터페이스를 구현해야 하며, handle() 메서드를 구현해야 합니다. handle() 메서드는 HeroKilledDragonEvent 이벤트를 인수로 받아 해당 이벤트에 대한 비즈니스 로직을 수행합니다.

이렇게 등록된 이벤트 핸들러는 이벤트가 발생할 때마다 해당 이벤트에 대해 실행됩니다. 따라서 HeroKilledDragonHandler에서 정의한 비즈니스 로직은 HeroKilledDragonEvent 이벤트가 발생할 때마다 실행되며, 이벤트 핸들러는 이벤트를 처리하는 역할을 담당합니다.

주의할 점으로, 이벤트 핸들러를 사용하면 전통적인 HTTP 웹 컨텍스트에서 벗어납니다. CommandHandler에서 발생하는 오류는 여전히 내장된 예외 필터(Exception filters)에서 잡힐 수 있지만, EventHandler에서 발생하는 오류는 예외 필터에서 잡히지 않습니다. 따라서 이벤트 핸들러 내에서 오류 처리를 직접 해주어야 합니다. try/catch를 사용하거나, 보상 이벤트를 트리거하여 Saga를 사용하거나, 선택한 다른 해결책을 활용하여 이벤트 핸들러의 오류를 처리해야 합니다.

또한, CommandHandler에서는 HTTP 응답을 클라이언트로 보낼 수 있지만, EventHandler에서는 그렇지 않습니다. 만약 클라이언트로 정보를 전송하고 싶다면 WebSocket, SSE 등 다른 방법을 사용해야 합니다.

사가(Saga)

사가는 이벤트를 수신하고 새로운 커맨드를 트리거할 수 있는 장기 실행 프로세스입니다. 일반적으로 애플리케이션에서 복잡한 워크플로우를 관리하는 데 사용됩니다. 예를 들어, 사용자가 가입하면 사가는 UserRegisteredEvent를 수신하고 사용자에게 환영 이메일을 보낼 수 있습니다.

사가는 굉장히 강력한 기능입니다. 단일 사가는 1개 이상의 이벤트를 수신할 수 있습니다. RxJS 라이브러리를 사용하여 이벤트 스트림을 필터링, 매핑, 포크, 병합하여 복잡한 워크플로우를 생성할 수 있습니다. 각 사가는 커맨드 인스턴스를 생성하는 Observable을 반환합니다. 이 커맨드는 CommandBus에 의해 비동기적으로 디스패치됩니다.

이제 HeroKilledDragonEvent를 수신하고 DropAncientItemCommand 커맨드를 디스패치하는 사가를 생성해봅시다.

@Injectable()
export class HeroesGameSagas {
  @Saga()
  dragonKilled = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(HeroKilledDragonEvent),
      map((event) => new DropAncientItemCommand(event.heroId, fakeItemID)),
    );
  }
}

위의 코드에서는 HeroesGameSagas 클래스에 dragonKilled 메서드를 정의하여 사가를 생성하였습니다. @Saga() 데코레이터는 해당 메서드를 사가로 표시합니다. events$ 인수는 모든 이벤트의 Observable 스트림입니다. ofType 연산자는 지정된 이벤트 타입으로 스트림을 필터링합니다. map 연산자는 이벤트를 새로운 커맨드 인스턴스로 매핑합니다.

이 예시에서는 HeroKilledDragonEvent를 DropAncientItemCommand 커맨드로 매핑하고 있습니다. 그런 다음 DropAncientItemCommand 커맨드는 CommandBus에 의해 자동으로 디스패치됩니다.

주의할 점으로, ofType 연산자와 @Saga() 데코레이터는 @nestjs/cqrs 패키지에서 내보내지는 것을 확인해야 합니다. 또한, @Saga() 데코레이터는 메서드를 사가로 표시하는 역할을 하며, events$ 인수는 모든 이벤트의 Observable 스트림입니다. ofType 연산자는 스트림을 지정된 이벤트 타입으로 필터링하고, map 연산자는 이벤트를 새로운 커맨드 인스턴스로 매핑합니다. 이를 통해 사가가 원하는 이벤트를 수신하고 새로운 커맨드를 트리거할 수 있습니다.

마무리로, 모든 커맨드 핸들러, 이벤트 핸들러 및 사가를 HeroesGameModule에 등록해야 합니다.

export const CommandHandlers = [KillDragonHandler, DropAncientItemHandler];
export const EventHandlers =  [HeroKilledDragonHandler, HeroFoundItemHandler];

@Module({
  imports: [CqrsModule],
  controllers: [HeroesGameController],
  providers: [
    HeroesGameService,
    HeroesGameSagas,
    ...CommandHandlers,
    ...EventHandlers,
    HeroRepository,
  ]
})
export class HeroesGameModule {}

위의 코드에서는 HeroesGameModule에 CommandHandlers와 EventHandlers를 등록하고 있습니다. CommandHandlers는 KillDragonHandler와 DropAncientItemHandler를, EventHandlers는 HeroKilledDragonHandler와 HeroFoundItemHandler를 포함하도록 설정하였습니다.

CqrsModule를 import하여 CQRS 기능을 활성화하고, HeroesGameController를 컨트롤러로 등록하였습니다. 또한, HeroesGameService, HeroesGameSagas, HeroRepository 및 앞서 등록한 CommandHandlers와 EventHandlers를 providers로 제공하도록 설정하였습니다.

이렇게 모든 핸들러와 사가를 모듈에 등록함으로써 CQRS 기능을 활용할 준비가 완료되었습니다. 애플리케이션은 이제 커맨드, 이벤트, 사가를 관리하고 알맞게 처리할 수 있을 것입니다.

이벤트 핸들러는 비동기적으로 실행되기 때문에 모든 예외를 항상 처리해야 하며, 응용 프로그램이 일관성 없는 상태에 들어가지 않도록 해야 합니다. 그러나 예외가 처리되지 않으면 EventBus는 UnhandledExceptionInfo 객체를 생성하고 UnhandledExceptionBus 스트림에 푸시합니다. 이 스트림은 처리되지 않은 예외를 처리하는 데 사용할 수 있는 Observable입니다.

private destroy$ = new Subject<void>();

constructor(private unhandledExceptionsBus: UnhandledExceptionBus) {
  this.unhandledExceptionsBus
    .pipe(takeUntil(this.destroy$))
    .subscribe((exceptionInfo) => {
      // 예외 처리 작업 수행
      // 예를 들어 외부 서비스로 보내기, 프로세스 종료, 또는 새로운 이벤트 발행 등
    });
}

onModuleDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

위의 코드에서는 UnhandledExceptionBus를 사용하여 처리되지 않은 예외를 구독하고 있습니다. takeUntil 연산자를 사용하여 컴포넌트 파괴 시점까지 구독을 유지하고, 해당 스트림을 구독하는 것을 중지하도록 설정하였습니다.

예외를 필터링하려면 ofType 연산자를 사용할 수 있습니다.

this.unhandledExceptionsBus.pipe(takeUntil(this.destroy$), UnhandledExceptionBus.ofType(TransactionNotAllowedException)).subscribe((exceptionInfo) => {
  // 예외 처리 작업 수행
});

위의 코드에서는 TransactionNotAllowedException이라는 예외를 필터링하도록 설정하였습니다.

UnhandledExceptionInfo 객체는 다음과 같은 속성을 포함합니다.

export interface UnhandledExceptionInfo<Cause = IEvent | ICommand, Exception = any> {
  /**
   * The exception that was thrown.
   */
  exception: Exception;
  /**
   * The cause of the exception (event or command reference).
   */
  cause: Cause;
}

UnhandledExceptionInfo 객체는 발생한 예외(exception)와 해당 예외의 원인인 이벤트 또는 커맨드 (cause)를 담고 있습니다. 이를 통해 처리되지 않은 예외에 대한 정보를 효과적으로 처리할 수 있습니다.

예를 들어, CommandBus, QueryBus 및 EventBus는 모두 Observable이기 때문에 전체 스트림에 구독(subscribe)할 수 있습니다. 이를 통해 모든 이벤트를 처리할 수 있습니다. 예를 들어, 모든 이벤트를 콘솔에 로그로 출력하거나, 이벤트 스토어에 저장하는 등의 작업을 수행할 수 있습니다.

private destroy$ = new Subject<void>();

constructor(private eventBus: EventBus) {
  this.eventBus
    .pipe(takeUntil(this.destroy$))
    .subscribe((event) => {
      // 이벤트를 데이터베이스에 저장하는 작업 수행
    });
}

onModuleDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

위의 코드에서는 EventBus를 사용하여 모든 이벤트를 구독하고 있습니다. takeUntil 연산자를 사용하여 컴포넌트 파괴 시점까지 구독을 유지하고, 해당 스트림을 구독하는 것을 중지하도록 설정하였습니다.

subscribe() 메서드의 콜백 함수에서는 이벤트를 처리하는 작업을 수행합니다. 이 예시에서는 모든 이벤트를 데이터베이스에 저장하고 있습니다. 따라서 이 코드를 통해 발생한 모든 이벤트를 효과적으로 로깅하거나 이벤트 스토어에 저장할 수 있습니다.

주의할 점으로, 컴포넌트가 파괴될 때 구독을 취소하고 완료(complete)해주어야 메모리 누수를 방지합니다. 이를 위해 onModuleDestroy() 메서드에서 destroy$ 스트림을 완료하고 해제해주었습니다. 이로써 올바르게 구독과 구독 해제를 관리할 수 있습니다.

'CQRS' 카테고리의 다른 글

CQRS패턴 사용예시  (0) 2023.07.21
CQRS패턴  (0) 2023.07.21