DevOps

[MLOps] MinIO 파일 감지 자동화: Airflow S3 센서로 Ray Serve를 실행하는 완벽 가이드

Sophie소피 2025. 10. 31. 20:16

 

 

이전 포스팅에 이어서
Airflow + MinIO + Ray Serve
를 하나의 Pipeline으로 연결해,
카메라 이미지 업로드 → S3KeySensor 감지 → Ray 추론 호출”이

자동으로 이어지는 end-to end 흐름을 완성해보려고 합니다. 

 

ChatGPT 통해 생성

단계별로 진행하면,
MinIO 초기화 (버킷 생성) →
Airflow DAG 추가 (S3KeySensor → Ray Serve) →
Airflow와 MinIO 연결 설정 →

실제 업로드 트리거와 DAG 실행 확인 까지 하나의 자동화된 Data Flow를 직접 볼 수 있습니다.

 

이 시리즈의 목적은 “Local 환경에서 Hybrid MLOps Pipeline이

실제로 어떻게 동작하는지”를 보여주는 거예요.


실제 카메라 이미지 대신 https://picsum.photos 이미지를 업로드하여
Airflow DAG이 S3 이벤트를 감지하고, Ray Serve에 추론 요청을 보내는 과정을 단계별로 시각화합니다.

 

 


0) Preflight (MinIO & Ray 기동)

  • MinIO와 Ray만 먼저 띄워둠 (이미 떠있으면 OK)
  • 왜? 컨테이너/엔드포인트가 먼저 살아있어야 이후 단계가 전부 먹힘.
docker ps --format "table {{.Names}}\\t{{.Status}}\\t{{.Ports}}"
curl -s <http://127.0.0.1:8000/inference/healthz> && echo

  • : ray-inference가 up, /healthz가 200 OK.

 

 


 

 

 

 

 

1) MinIO 초기화 (버킷 생성)

  • camera 버킷 생성 (이미 있으면 통과), 공개 정책은 선택
  • 왜? S3KeySensor가 볼 버킷/키가 없으면 DAG이 영원히 대기해야함,,,
mkdir -p ~/.mc
docker run --rm --network=hybrid-mlops-demo_default -v ~/.mc:/root/.mc minio/mc alias set local <http://minio:9000> minioadmin minioadmin123
docker run --rm --network=hybrid-mlops-demo_default -v ~/.mc:/root/.mc minio/mc mb local/camera || true
docker run --rm --network=hybrid-mlops-demo_default -v ~/.mc:/root/.mc minio/mc ls local

 

 

 

 

 

 


2) Airflow용 S3 DAG 추가 (S3KeySensor → Ray Serve)

  • 왜? DAG 코드 외부 의존(네트워크/DNS/Connection)이 맞아야 한 번에 성공.
  • 체크리스트
    • dags/s3_camera_to_infer.py 안에서
      • S3KeySensor(bucket_key="latest.jpg", bucket_name="camera", aws_conn_id="minio_s3")
      • BashOperator가 curl -X POST <http://ray-inference:8000/inference/> ...
    • 스케줄은 @once(최초 1회 자동) + 필요 시 수동 트리거
# WSL 그대로 복붙 
mkdir -p airflow/dags && cat > airflow/dags/s3_camera_to_infer.py <<'PY'
from datetime import datetime
from airflow import DAG
from airflow.sensors.base import PokeReturnValue
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.operators.bash import BashOperator

# MinIO S3: 버킷/키
BUCKET = "camera"
KEY    = "latest.jpg"
AWS_CONN_ID = "minio_s3"   # 아래 3단계에서 생성

# Ray endpoint (동일 compose 네트워크에서 서비스명으로 접근)
RAY_ENDPOINT = "<http://ray-inference:8000/inference/>"

with DAG(
    dag_id="s3_camera_to_infer",
    start_date=datetime(2025, 10, 31),
    schedule_interval="@once",
    catchup=False,
    default_args={"owner": "sophie"},
    tags=["s3","minio","ray"]
) as dag:

    wait_s3 = S3KeySensor(
        task_id="wait_s3_latest_jpg",
        bucket_key=KEY,
        bucket_name=BUCKET,
        aws_conn_id=AWS_CONN_ID,
        poke_interval=5,
        timeout=60*10,
        soft_fail=False,
        mode="poke",
        deferrable=False,
    )

    call_infer = BashOperator(
        task_id="call_ray_infer",
        bash_command=(
            "curl -s -X POST {{ params.endpoint }} "
            "-H 'Content-Type: application/json' "
            "--data '{\\"input\\":[10,20,30,40]}'"
        ),
        params={"endpoint": RAY_ENDPOINT},
    )

    wait_s3 >> call_infer
PY

 

 

 

 

 

2-1)  Airflow용 S3 DAG 추가 (S3KeySensor → Ray Serve 하기 전에 체크 사항

  1. AWS Connection 등록 (Airflow UI or CLI)
    • aws_conn_id="minio_s3" 이 부분은 Airflow가 MinIO를 S3처럼 인식하게 해주는 설정입니다. 이게 없으면 DAG가 “connection not found: minio_s3” 에러가 나기 때문에 하기 전에 컨테이너 기동 후 모든 컨테이너들이 상태가 “Healthy”한지 확인해야합니다.
      • 터미널에서 아래 명령 한 줄로 등록해주면 됩니다.
docker exec -it airflow-webserver airflow connections add minio_s3 \\ 
  --conn-type aws \\ 
  --conn-extra '{"aws_access_key_id": "m32432n", "aws_secret_access_key": "43242", "host": "<http://minio:9000>"}'

 

 

2. DAG 파일 위치 확인

  • Airflow 컨테이너의 /opt/airflow/dags 안에 파일이 들어가야 인식하여 airflow/dags/s3_camera_to_infer.py 만들면docker-compose.yml에서 이미 Volume 매핑되어 있습니다.
  • 만약 안 돼 있으면 yq나 vim으로 한번 확인!
 docker exec -it airflow-webserver ls /opt/airflow/dags

 

 

 

 

 

3. DAG 로드 확인

  • http://localhost:8080
  • 기본 로그인 정보
    • Username: airflow
    • Password: airflow
  • Airflow UI 접속 → DAGs 탭 → s3_camera_to_infer 나타나는지 확인 보이면 “Trigger DAG” 눌러서 실행.
  • 만약 8080 Port로 안들어가지면
  • http://172.22.234.117:8080 로 들어가면 됩니다

 

 

 

 

 


 

 

 

3) Airflow에 MinIO 연결 등록 + 네트워크 붙이기

3-1. 네트워크

  • 왜? Airflow 컨테이너에서 minio/ray-inference가 이름(DNS)으로 보여야 함.
  • 생성한 docker compose는 airflow-webserver, airflow-scheduler, ray-inference, minio가→ 추가 docker network connect … 생략해도 됨
  • 모두 같은 hybrid-mlops-demo_default 네트워크에 이미 붙어 있습니다.

 

3-2. MinIO 연결 존재 여부 확인 → 없으면 생성

  • 왜? S3KeySensor는 Airflow Connection(minio_s3) 를 통해 접속.
  • (컨테이너 이름 주의: airflow-webserver 사용)
  • Airflow 컨테이너가 ray-inference를 DNS로 찾을 수 있게 네트워크 연결(1회만)
  • Airflow에 MinIO 연결 생성 (웹서버 컨테이너에서)
  • DAG 인식 위해 Airflow 두 개 살짝 재시작
docker exec -it airflow-webserver airflow connections get minio_s3 || \\
docker exec -it airflow-webserver airflow connections add minio_s3 \\
  --conn-type aws \\
  --conn-login minioadmin \\
  --conn-password minioadmin123 \\
  --conn-extra '{"endpoint_url": "<http://minio:9000>", "region_name": "us-east-1"}'

 

 

 

 

 

 

 


 

4) Sample 업로드(Trigger) → DAG 실행 확인

4-1) 샘플 이미지 준비 + S3 업로드

  • 왜?업로드 자체가 Sensor의 성공 조건(= Trigger 역할).
curl -L -o images/latest.jpg <https://picsum.photos/640/480>
docker run --rm --network=hybrid-mlops-demo_default -v "$PWD/images:/data" \\
  -e MC_HOST_local=http://minioadmin:minioadmssfdinfds123@minio:9000 \\
  minio/mc cp /data/latest.jpg local/camera/latest.jpg

 

 

4-2) DAG 수동 트리거

  • 왜? @once로 자동 1회 돌아도, 수동 트리거로 즉시 검증 가능.
docker exec -it airflow-webserver airflow dags trigger s3_camera_to_infer

 

 

4-3) Ray 응답 확인(호스트에서)

  • 왜? 서비스 직접콜로 결과 확인(device/output/latancy).
curl -s <http://127.0.0.1:8000/inference/healthz> && echo
curl -s -X POST <http://127.0.0.1:8000/inference/> \\
  -H "Content-Type: application/json" \\
  -d '{"input":[10,20,30,40]}' 

  • 예시: {"device":"cuda","output":[20.0,40.0,60.0,80.0],"latency_sec":4e-06}

 

 

4-4) Airflow 로그에서 POST 응답 JSON만 뽑기

이 단계는 뭐고 왜 해야할까?

  • 무엇을?
    • BashOperator가 Ray에 보낸 POST 호출의 실제 응답 본문(JSON) 을 Airflow 로그에서 자동 추출.
  • 왜?
    • UI 캡처 외에 증적(artifact) 으로 남기고, 블로그에 결과값(JSON/latency/GPU 여부)을 깨끗하게 삽입하려고.

Airflow 2.6+는 파일 경로가 logs/dag_id=.../run_id=.../task_id=.../attempt=1.log 형태.

SCH=$(docker ps --format '{{.Names}}' | grep -E 'scheduler' | head -n1)

docker exec -it "$SCH" bash -lc '
LOG_BASE=$(airflow config get-value logging base_log_folder)

LOG=$(find "$LOG_BASE" -type f -name "attempt=*.log" \\
  -path "*dag_id=s3_camera_to_infer*" -path "*task_id=call_ray_infer*" \\
  -printf "%T@ %p\\n" 2>/dev/null | sort -n | tail -1 | cut -d" " -f2-)

echo "[LOG] $LOG"
if [ -n "$LOG" ] && [ -f "$LOG" ]; then
  echo "── POST 응답(JSON) ──"
  tac "$LOG" | grep -oE '\\''\\{[^{}"]*"(device|output)"[^{}]*\\}'\\'' | head -n1
else
  echo "call_ray_infer Log nono~~"
fi
'

  • 예시: {"device":"cuda","output":[20.0,40.0,60.0,80.0],"latency_sec":4e-06}
  • → GPU로 처리 device=cuda , 연산 결과 확인, 지연 시간까지 기록.

 

 

 

 


마무리 & 다음 포스팅 예고

여기까지가 1단계~4단계 (MinIO → Airflow → Ray) 의 전 과정이었습니다.

  • MinIO 버킷(camera/) 안에 latest.jpg가 업로드되고,
  • Airflow DAG(s3_camera_to_infer)가 S3KeySensor → BashOperator로 정상 실행되며,
  • Ray Serve가 CUDA 디바이스에서 추론 응답 JSON을 반환하는 것까지

 

 

다음 포스팅에서는 
Airflow UI를 통해 이 Pipeline을 “시각적으로” 점검하고,
Grid / Graph / Log / Connections
어떤 화면을 봐야 문제를 빠르게 파악할 수 있는지를 단계별로 다룰 예정입니다. 

  • DAGs 탭에서 실행 상태 확인하기
  • Grid 뷰에서 wait_s3_latest_jpg → call_ray_infer 흐름 읽기
  • Log 뷰에서 Ray POST 응답 JSON 직접 찾기
  • Admin → Connections 에서 MinIO 연결 확인하기
  • Graph 뷰로 전체 데이터 플로우 시각화

다음 편: “Airflow UI에서 MLOps 파이프라인 읽는 법”
Grid, Graph, Logs, Browse 메뉴를 통해
센서 → 추론 → 결과까지 한눈에 모니터링하는 방법을 다룰 예정입니다.

 

 

감사합니다. 

 

 

GithubLink

https://github.com/daeun-ops/hybrid-mlops-demo

 

GitHub - daeun-ops/hybrid-mlops-demo

Contribute to daeun-ops/hybrid-mlops-demo development by creating an account on GitHub.

github.com