Airflow

airflow개념 및 설치와 mysql 연동

25G 2022. 10. 31. 13:12

Airflow?

복잡한 워크플로우를 프로그래밍 방식으로 작성해서 스케줄링하고 모니터링 할 수 있는 플랫폼
어떠한 스크립트들을 스케줄링 할때 crontab, cloudwatch 등을 사용하는데 스크립트끼리 서로의 의존성이 생기게 되면 컨트롤하기 어렵고 문제가 생겼을 시에 디버깅을 확인하고 어디서 잘못됐는지 체킹하는 작업들이 필요해진다.
Airflow에서는 서로에 대한 의존성을 표현할수 있고 스크립트가 실패했을때 알람을 보내 확인하고 쉽게 수정 후 재시도 할 수 있고, 이전 날짜 작업이 실패했을때 그 날짜만 다시 실행하는 등 많은 문제를 해결할 수 있다.

  • 실패한 테스크만 재수행 요청을 할 수 있고 이를 ui로 간단하게 실행할 수 있다.
  • dag arg 에 retry를 설정해서 계속해서 제시도할 수도 있다.'
  • 웹 ui을통해 직관적으로 워크플로우를 관리할수 있다.
  • dag끼리 연결해서 거대한 워크플로우를 만들수도 있다.

Airflow use case

크게 세가지 카테고리를 나눠서 생각할 수 있습니다.

  • ETL
    • 데이터 처리흐름이 복잡한경우에 어디서 어디까지 진행하고 있고 어디서 문제가발생했는지 체크하는데 사용
  • 머신러닝시에 트레이닝을위해서 데이터전처리작업을 위해서 사용
  • devOps가 에어플로우를 사용하는 메타 디비를 관리하고 일정의 스케줄링작업을 관리하는데 사용

Airflow 구조

  1. Scheduler
  • 작업 스케줄링
  1. worker
  • task 를 수행하는 주체
  1. webServer
  • airflow의 ui 인터페이스 제공 dag의 성공여부등을 확인하고 테스크 워크플로우를 시각적으로 제공
  1. meta database
  • 시스템의 상태를 저장을하고 과거에실행했던 히스토리를 저장할 수 있는 기능제공

Airflow challenges

airflow에대한 체크할 사항들
1. 환경 셋업이 시간과 노력이 많이 들어간다. 그렇기때문에 스크립트로 만들어서 사용하기도함
2. 워커의 스케일링을 하기위해 계속 관리를 해줘야한다.
3. 시스템 보안관리가 필요하다.
4. 수만은 디펜던시에대한 보안이나 이슈를 관리하고 학습하는데 걸리는시간
5. maintenance를 지속적으로 해줘야한다.

amaazon MWAA 사용시 이점

  1. 환경 셋업시에 기존 airflow와 같은 의존성을 사용하기때문에 쉽게 환경셋업 가능
  2. 워커의 스케일링시 부하가 심해지면 자동 스케일링 제공, MWAA는 ECS 에 aws fargate위에서 실행된다
  3. 인증 처리관리해서 AWS IAM와 완전 통합되어있음
  4. 클라우드와치를 통해서 연동하여 여러가지 관리 편의성 제공

개념

DAG

DAG는 유향 비순환 그래프라고 하며 에어플로우의 워크플로우는 python을 사용하여 작성할 수 있다. 하나의 DAG안에는 한개 이상의 TASK가 있으며, Task는 실제 실행시키는 작업

  • DAG 파일 = 워크플로우
  • python 스크립트로 작성
  • Operator로 task 정의
  • DAG 등록
    • dag을 dags_folder에 저장
    • DAG task 구성 예시

      task의 순서는 간한단 연산자로 표시해 줘도 된다.
      c1테스크를 수행한후에 t1,t2를 동시에 실행하게 되고 t2를 수행한 다음 t3를 수행한다 t1,t3를 모두 수행된 후에는 stop_op task를 수행하게 된다.

      이렇게 테스크에대한 워크플로우를 airflow web ui에서 볼수 있다.

Operator

에어플로우는 Operator를 사용하여 python, bash, aws, slack 등 다양한 동작을 실행시킬 수 있다. 예를 들어 BashOperator를 사용하면 bash 스크립트를 실행시킬 수 있게 되는것입니다.

  • airflow는 Operator를 이용해 Task를정의
  • 다양한 Operator가 있다.
  • 각 Operator마다 수행하는 주체, 목적이 다르다.

Task 연결

  • 각 task는 ">>","<<","[]"를 이용해여 dag그래프를 그릴수 있다ㅏ.

arg 설정및 DAG 객체 생성

  • default_args
    • DAG를 구성할 때 필요한 default argument를 세팅
  • DAG 객체 생성
    • DAG 이름과 schedule interval 등을 세팅
    • 실제 dag를 수행하는 작업을 명시
    • operator, task 연결 등을 이안에서 수행

Executor

작업 실행을 시켜주는 실행기, Executor의 종류를 실행방법에 따라 선택해서 사용하면 된다.

Airflow 설치

Airflow는 기본값으로 sqlite를 사용하며 이 경우 Executor는 SequentialExcutor를 사용하게 된다.

그리고 저는 파이썬 가상환경에 설치할 것이기 때문에 다음명령어로 가상환경 설치및 실행 해서 작업을 진행했습니다. (참고)

pip3 install virtualenv

python3 -m virtualenv airflow // venv파일 생성
source ./airflow/bin/activate // 가상환경 실행

참고 사이트

# Airflow needs a home. `~/airflow` is the default, but you can put it
# somewhere else if you prefer (optional)
export AIRFLOW_HOME=~/airflow

# Install Airflow using the constraints file
AIRFLOW_VERSION=2.4.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.7
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.4.1/constraints-3.7.txt
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# The Standalone command will initialise the database, make a user,
# and start all components for you.
airflow standalone

# Visit localhost:8080 in the browser and use the admin account details
# shown on the terminal to login.
# Enable the example_bash_operator dag in the home page

순서대로 명령어 실행하면됩니다.

참고로 저는 export가 아닌 직접 쉘에서 사용하는 환경변수 파일을 수정해서 등록했습니다.

  • 참고
  1. wheel 명령어가 없으면 설치되지 않습니다.
pip install wheel

2. 환경변수를 venv환경이 아닌 시스템에 등록을 해야합니다.

airflow mysql 연동 설정

  • $AIRFLOW_HOME 에(defualt는 ~/airflow 입니다) airflow.cfg 파일 수정
  • 이때 저는 pymysql 라이브러리를 사용했기때문에 sql_alchemy_conn를 다음과같이 작성했습니다.(사용하는 라이브러리에 맞게 작성해야함)
# airflow 예제 제거하기
load_example = True

# DAG 업데이트 시간 조정하기 (UI), 이 시간을 0으로 둘 경우 아주 많은 CPU를 사용하게 된다.
min_file_process_interval = 60
dag_dir_list_interval = 30

# 데이터베이스 연결하기
sql_alchemy_conn = mysql+pymysql://db이름:비밀번호@localhost/airflow

# Executor 설정하기
# SequentialExecutor(Default)를 사용하면 한 번에 하나의 작업만 처리할 수 있다.
executor = LocalExecutor
parallelism = 64
dag_concurrency = 32

mysql commend에서 다음명령어 실행

  • SET explicit_defaults_for_timestamp=1;
    • 위설정을 해주지않으면 db init 명령어로 metadata를 생성하는 ddl을 날리는도중에 오류가 발생함
    • 실행할때 import error가 뜬다면 아래명령어 실행
    • export DYLD_LIBRARY_PATH="/usr/local/mysql/lib:$PATH" 
  • airflow를 실행하기전 DB초기화
 
airflow db init

초기화가 성공한 후 미리 만들어 뒀던 airflow db를 들어가보면 meta data를 저장하는 테이블들이 생성돼 있을것입니다.

  • airflow 계정을 생성합니다.
airflow users create --username sdf --firstname admin --lastname sdf --role Admin --email fj2008@naver.com

~/airflow 의 디렉터리 내부 구조

  • airflow.cfg
    • airflow의 환경설정 파일
  • airflow.db
    • DB 관련된 정보를 담고있다.
  • logs
    • airflow의 각종 로그를 관리
  • dags
    • airflow에서 dag를 관리하는 디렉토리

airflow webserver 실행

airflow webserver --port 8080

예~~~~