이전 포스팅에 이어서
Airflow + MinIO + Ray Serve를 하나의 Pipeline으로 연결해,
“카메라 이미지 업로드 → S3KeySensor 감지 → Ray 추론 호출”이
자동으로 이어지는 end-to end 흐름을 완성해보려고 합니다.

단계별로 진행하면,
MinIO 초기화 (버킷 생성) →
Airflow DAG 추가 (S3KeySensor → Ray Serve) →
Airflow와 MinIO 연결 설정 →
실제 업로드 트리거와 DAG 실행 확인 까지 하나의 자동화된 Data Flow를 직접 볼 수 있습니다.
이 시리즈의 목적은 “Local 환경에서 Hybrid MLOps Pipeline이
실제로 어떻게 동작하는지”를 보여주는 거예요.
실제 카메라 이미지 대신 https://picsum.photos 이미지를 업로드하여
Airflow DAG이 S3 이벤트를 감지하고, Ray Serve에 추론 요청을 보내는 과정을 단계별로 시각화합니다.
0) Preflight (MinIO & Ray 기동)
- MinIO와 Ray만 먼저 띄워둠 (이미 떠있으면 OK)
- 왜? 컨테이너/엔드포인트가 먼저 살아있어야 이후 단계가 전부 먹힘.
docker ps --format "table {{.Names}}\\t{{.Status}}\\t{{.Ports}}"
curl -s <http://127.0.0.1:8000/inference/healthz> && echo

- : ray-inference가 up, /healthz가 200 OK.
1) MinIO 초기화 (버킷 생성)
- camera 버킷 생성 (이미 있으면 통과), 공개 정책은 선택
- 왜? S3KeySensor가 볼 버킷/키가 없으면 DAG이 영원히 대기해야함,,,
mkdir -p ~/.mc
docker run --rm --network=hybrid-mlops-demo_default -v ~/.mc:/root/.mc minio/mc alias set local <http://minio:9000> minioadmin minioadmin123
docker run --rm --network=hybrid-mlops-demo_default -v ~/.mc:/root/.mc minio/mc mb local/camera || true
docker run --rm --network=hybrid-mlops-demo_default -v ~/.mc:/root/.mc minio/mc ls local

2) Airflow용 S3 DAG 추가 (S3KeySensor → Ray Serve)
- 왜? DAG 코드 외부 의존(네트워크/DNS/Connection)이 맞아야 한 번에 성공.
- 체크리스트
- dags/s3_camera_to_infer.py 안에서
- S3KeySensor(bucket_key="latest.jpg", bucket_name="camera", aws_conn_id="minio_s3")
- BashOperator가 curl -X POST <http://ray-inference:8000/inference/> ...
- 스케줄은 @once(최초 1회 자동) + 필요 시 수동 트리거
- dags/s3_camera_to_infer.py 안에서
# WSL 그대로 복붙
mkdir -p airflow/dags && cat > airflow/dags/s3_camera_to_infer.py <<'PY'
from datetime import datetime
from airflow import DAG
from airflow.sensors.base import PokeReturnValue
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.operators.bash import BashOperator
# MinIO S3: 버킷/키
BUCKET = "camera"
KEY = "latest.jpg"
AWS_CONN_ID = "minio_s3" # 아래 3단계에서 생성
# Ray endpoint (동일 compose 네트워크에서 서비스명으로 접근)
RAY_ENDPOINT = "<http://ray-inference:8000/inference/>"
with DAG(
dag_id="s3_camera_to_infer",
start_date=datetime(2025, 10, 31),
schedule_interval="@once",
catchup=False,
default_args={"owner": "sophie"},
tags=["s3","minio","ray"]
) as dag:
wait_s3 = S3KeySensor(
task_id="wait_s3_latest_jpg",
bucket_key=KEY,
bucket_name=BUCKET,
aws_conn_id=AWS_CONN_ID,
poke_interval=5,
timeout=60*10,
soft_fail=False,
mode="poke",
deferrable=False,
)
call_infer = BashOperator(
task_id="call_ray_infer",
bash_command=(
"curl -s -X POST {{ params.endpoint }} "
"-H 'Content-Type: application/json' "
"--data '{\\"input\\":[10,20,30,40]}'"
),
params={"endpoint": RAY_ENDPOINT},
)
wait_s3 >> call_infer
PY
2-1) Airflow용 S3 DAG 추가 (S3KeySensor → Ray Serve 하기 전에 체크 사항
- AWS Connection 등록 (Airflow UI or CLI)
- aws_conn_id="minio_s3" 이 부분은 Airflow가 MinIO를 S3처럼 인식하게 해주는 설정입니다. 이게 없으면 DAG가 “connection not found: minio_s3” 에러가 나기 때문에 하기 전에 컨테이너 기동 후 모든 컨테이너들이 상태가 “Healthy”한지 확인해야합니다.
- 터미널에서 아래 명령 한 줄로 등록해주면 됩니다.
- aws_conn_id="minio_s3" 이 부분은 Airflow가 MinIO를 S3처럼 인식하게 해주는 설정입니다. 이게 없으면 DAG가 “connection not found: minio_s3” 에러가 나기 때문에 하기 전에 컨테이너 기동 후 모든 컨테이너들이 상태가 “Healthy”한지 확인해야합니다.
docker exec -it airflow-webserver airflow connections add minio_s3 \\
--conn-type aws \\
--conn-extra '{"aws_access_key_id": "m32432n", "aws_secret_access_key": "43242", "host": "<http://minio:9000>"}'


2. DAG 파일 위치 확인
- Airflow 컨테이너의 /opt/airflow/dags 안에 파일이 들어가야 인식하여 airflow/dags/s3_camera_to_infer.py 만들면docker-compose.yml에서 이미 Volume 매핑되어 있습니다.
- 만약 안 돼 있으면 yq나 vim으로 한번 확인!
docker exec -it airflow-webserver ls /opt/airflow/dags

3. DAG 로드 확인
- http://localhost:8080
- 기본 로그인 정보
- Username: airflow
- Password: airflow
- Airflow UI 접속 → DAGs 탭 → s3_camera_to_infer 나타나는지 확인 보이면 “Trigger DAG” 눌러서 실행.
- 만약 8080 Port로 안들어가지면
- http://172.22.234.117:8080 로 들어가면 됩니다


3) Airflow에 MinIO 연결 등록 + 네트워크 붙이기
3-1. 네트워크
- 왜? Airflow 컨테이너에서 minio/ray-inference가 이름(DNS)으로 보여야 함.
- 생성한 docker compose는 airflow-webserver, airflow-scheduler, ray-inference, minio가→ 추가 docker network connect … 생략해도 됨 ✅
- 모두 같은 hybrid-mlops-demo_default 네트워크에 이미 붙어 있습니다.
3-2. MinIO 연결 존재 여부 확인 → 없으면 생성
- 왜? S3KeySensor는 Airflow Connection(minio_s3) 를 통해 접속.
- (컨테이너 이름 주의: airflow-webserver 사용)
- Airflow 컨테이너가 ray-inference를 DNS로 찾을 수 있게 네트워크 연결(1회만)
- Airflow에 MinIO 연결 생성 (웹서버 컨테이너에서)
- DAG 인식 위해 Airflow 두 개 살짝 재시작
docker exec -it airflow-webserver airflow connections get minio_s3 || \\
docker exec -it airflow-webserver airflow connections add minio_s3 \\
--conn-type aws \\
--conn-login minioadmin \\
--conn-password minioadmin123 \\
--conn-extra '{"endpoint_url": "<http://minio:9000>", "region_name": "us-east-1"}'

4) Sample 업로드(Trigger) → DAG 실행 확인
4-1) 샘플 이미지 준비 + S3 업로드
- 왜? 이 업로드 자체가 Sensor의 성공 조건(= Trigger 역할).
curl -L -o images/latest.jpg <https://picsum.photos/640/480>
docker run --rm --network=hybrid-mlops-demo_default -v "$PWD/images:/data" \\
-e MC_HOST_local=http://minioadmin:minioadmssfdinfds123@minio:9000 \\
minio/mc cp /data/latest.jpg local/camera/latest.jpg

4-2) DAG 수동 트리거
- 왜? @once로 자동 1회 돌아도, 수동 트리거로 즉시 검증 가능.
docker exec -it airflow-webserver airflow dags trigger s3_camera_to_infer

4-3) Ray 응답 확인(호스트에서)
- 왜? 서비스 직접콜로 결과 확인(device/output/latancy).
curl -s <http://127.0.0.1:8000/inference/healthz> && echo
curl -s -X POST <http://127.0.0.1:8000/inference/> \\
-H "Content-Type: application/json" \\
-d '{"input":[10,20,30,40]}'

- 예시: {"device":"cuda","output":[20.0,40.0,60.0,80.0],"latency_sec":4e-06}
4-4) Airflow 로그에서 POST 응답 JSON만 뽑기
이 단계는 뭐고 왜 해야할까?
- 무엇을?
- BashOperator가 Ray에 보낸 POST 호출의 실제 응답 본문(JSON) 을 Airflow 로그에서 자동 추출.
- 왜?
- UI 캡처 외에 증적(artifact) 으로 남기고, 블로그에 결과값(JSON/latency/GPU 여부)을 깨끗하게 삽입하려고.
Airflow 2.6+는 파일 경로가 logs/dag_id=.../run_id=.../task_id=.../attempt=1.log 형태.
SCH=$(docker ps --format '{{.Names}}' | grep -E 'scheduler' | head -n1)
docker exec -it "$SCH" bash -lc '
LOG_BASE=$(airflow config get-value logging base_log_folder)
LOG=$(find "$LOG_BASE" -type f -name "attempt=*.log" \\
-path "*dag_id=s3_camera_to_infer*" -path "*task_id=call_ray_infer*" \\
-printf "%T@ %p\\n" 2>/dev/null | sort -n | tail -1 | cut -d" " -f2-)
echo "[LOG] $LOG"
if [ -n "$LOG" ] && [ -f "$LOG" ]; then
echo "── POST 응답(JSON) ──"
tac "$LOG" | grep -oE '\\''\\{[^{}"]*"(device|output)"[^{}]*\\}'\\'' | head -n1
else
echo "call_ray_infer Log nono~~"
fi
'

- 예시: {"device":"cuda","output":[20.0,40.0,60.0,80.0],"latency_sec":4e-06}
- → GPU로 처리 device=cuda , 연산 결과 확인, 지연 시간까지 기록.
마무리 & 다음 포스팅 예고
여기까지가 1단계~4단계 (MinIO → Airflow → Ray) 의 전 과정이었습니다.
- MinIO 버킷(camera/) 안에 latest.jpg가 업로드되고,
- Airflow DAG(s3_camera_to_infer)가 S3KeySensor → BashOperator로 정상 실행되며,
- Ray Serve가 CUDA 디바이스에서 추론 응답 JSON을 반환하는 것까지
다음 포스팅에서는
Airflow UI를 통해 이 Pipeline을 “시각적으로” 점검하고,
Grid / Graph / Log / Connections 등
어떤 화면을 봐야 문제를 빠르게 파악할 수 있는지를 단계별로 다룰 예정입니다.
- DAGs 탭에서 실행 상태 확인하기
- Grid 뷰에서 wait_s3_latest_jpg → call_ray_infer 흐름 읽기
- Log 뷰에서 Ray POST 응답 JSON 직접 찾기
- Admin → Connections 에서 MinIO 연결 확인하기
- Graph 뷰로 전체 데이터 플로우 시각화
다음 편: “Airflow UI에서 MLOps 파이프라인 읽는 법”
Grid, Graph, Logs, Browse 메뉴를 통해
센서 → 추론 → 결과까지 한눈에 모니터링하는 방법을 다룰 예정입니다.
감사합니다.
GithubLink
https://github.com/daeun-ops/hybrid-mlops-demo
GitHub - daeun-ops/hybrid-mlops-demo
Contribute to daeun-ops/hybrid-mlops-demo development by creating an account on GitHub.
github.com
'DevOps' 카테고리의 다른 글
| [MLOps] Hybrid Demo : GPU Inference → Metrics → Grafana Dashboard 실시간 연결하기 (0) | 2025.10.31 |
|---|---|
| [MLOps] hybird-mlops-demo 실행 순서 feat. 자꾸 까먹어서 내가보려고 작성한 글 (0) | 2025.10.28 |
| [MLOps]Ray Serve GPU 자동 감지 + Dockerize + Compose (0) | 2025.10.27 |
| [MlOps]Observability 맛보기: FastAPI·Ray Log를 Local에서 살펴보기 (0) | 2025.10.26 |
| [MlOps] Airflow 학습 DAG부터 Ray Serve 추론, Minikube까지 (0) | 2025.10.26 |