/ CELERY

Celery, RabbitMQ 이용해 비동기 처리하기


Celery

Celery는 비동기 작업을 처리할 수 있도록 해주는 비동기 작업 큐이다. 비동기적으로 실행되야 할 때 사용된다

celery는 웹서버-브로커-워커 이세가지와 상호작용을 한다

celery에서는 작업 메시지를 브로커에게 전달한다.


rabbitmq

대기중인 작업을 대기열에 보관했다가 적절한 worker에게 작업을 전달해주는 역할을 한다

rabbitmq는 celery 와 함께 사용하는 메시지 브로커이다

broker가 필요한 이유 : 셀러리는 실제로 메시지 큐 자체를 구성하는 것은 아니기 때문에 이 작업을 수행하기위해서 추가 메시지 전송이 필요하다. 셀러리는 메시지 브로커를 감싸는 래퍼로 생각할 수 있다


worker

worker에서는 해당 작업을 가져와 수행하게 된다. 이과정에서 celery는 메시지를 전달하는 역할과 메시지를 가져와 작업을 수행하는 역할을 담당하게 된다.


celery 설치 & rabbitMQ설치

  • Celery 패키지 설치
$ pip install celery


  • celery django에서 실행
    • django에서 쓰기 위해 celery instance를 정의한다
    • app.autodiscover_tasks()를 정의하여 Celery에서 자동으로 Task들을 찾아낸다
    • Celery는 shared_task라는 테코레이터를 제공하고 Task에 @shared_task 데코레이터를 붙여 비동기적으로 처리 될 수 있도록 한다
# config/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
import os

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
app = Celery('config', broker='amqp://', backend='rpc://', include=['user.tasks'])
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))


  • Celery 실행확인
    • flower라는 패키지 설치
    • flower는 celery를 위한 실시간 웹 기반 모니터입니다. 작업 진행 상황과 기록을 쉽게 모니터링 할 수 있습니다 (http://localhost:5555 에서 액세스)
$ pip install flower

$ celery -A config flower


  • rabbimq 설치
$ sudo apt-get install rabbitmq-serve


  • celery가 접근할 수 있도록 user를 생성한다 (기존 guest 삭제)
$ sudo rabbitmqctl add_user <user> <pwd>
$ sudo rabbitmqctl set_user_tags <user> administrator
$ sudo rabbitmqctl set_permissions -p / <user> ".*"".*" ".*"
$ sudo rabbitmqctl delete_user guest
$ sudo /etc/init.d/rabbitmq-server restart
$ sudo rabbitmq-plugins enable rabbitmq_management
$ sudo /etc/init.d/rabbitmq-server restart

  • celery와 broker를 연결하기 위해 settings.py에서 BROKER_URL을 입력하여 연결한다
# config/settings

CELERY_BROKER_URL = env('CELERY_BROKER_URL')


  • .env
CELERY_BROKER_URL="amqp://admin:1234@localhost:5672//"

Celery와 rabbitMQ실행 (개발환경)

  • django, rabbitmq, celery 서버 3개 모두 실행시킨다

  • celery 실행

celery -A config worker -l info
-------------- celery@heejung-15Z90N-VR5BK v5.1.2 (sun-harmonics)
--- ***** ----- 
-- ******* ---- Linux-5.11.0-41-generic-x86_64-with-glibc2.29 2021-12-10 13:36:32
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         config:0x7fd43406fbb0
- ** ---------- .> transport:   amqp://admin:**@localhost:5672//
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . config.celery.debug_task
  . user.tasks.send_email
  . user.tasks.task_scrappy_daum
  . user.tasks.task_scrappy_naver

[2021-12-10 13:36:32,764: INFO/MainProcess] Connected to amqp://admin:**@127.0.0.1:5672//


  • rabbitmq 실행
$ rabbitmq-server
##  ##      RabbitMQ 3.8.2
  ##  ##
  ##########  Copyright (c) 2007-2019 Pivotal Software, Inc.
  ######  ##
  ##########  Licensed under the MPL 1.1. Website: https://rabbitmq.com

  Doc guides: https://rabbitmq.com/documentation.html
  Support:    https://rabbitmq.com/contact.html
  Tutorials:  https://rabbitmq.com/getstarted.html
  Monitoring: https://rabbitmq.com/monitoring.html

  Logs: /var/log/rabbitmq/rabbit@heejung-15Z90N-VR5BK.log
        /var/log/rabbitmq/rabbit@heejung-15Z90N-VR5BK_upgrade.log

  Config file(s): (none)

  Starting broker... completed with 3 plugins.


  • rabbitmq 종료
$ sudo rabbitmqctl stop

Celery를 이용한 이메일 인증 비동기 처리

  1. 회원가입 하는 유저의 이메일 유효성 검증하는 메소드에서 검증이 끝나면 회원가입된 유저의 객체와 유저의 이메일을 반환한다
  2. 이메일 인증을 위해 필요한 유저의 email 정보를 인자로 받아 이메일를 보내는 메서드를 호출해서 인증 링크와 인증 메시지 타이틀을 생성해 메시지를 보낼 유저의 이메일에 shared_task 데코레이터를 붙여 비동기적으로 전송
@shared_task
def send_email(mail_title, message_data, mail_to):
  email = EmailMessage(mail_title, message_data, to=[mail_to])
  email.send()
  return None

이메일 인증 링크 보내는 과정

회원가입 → 인증 링크 생성 → 이메일에서 클릭 → 리다이렉트

urlsafe_base64 사용 이유 : base64로 암호화 하는 경우 웹으로 전송하는 문자 중 정상적으로 전송되지 않는 문제가 발생하기 때문에 이러한 오류를 막기 위해 safe를 사용

def user_email(request, pk, email):  
        domain = get_current_site(request).domain
        uidb64 = urlsafe_base64_encode(force_bytes(pk))
        token = jwt.encode({'user_pk' : pk}, SECRET_KEY, ALGORITHM).decode('utf-8')

        message_data = message(domain, uidb64, token)
        mail_title = '이메일 인증을 완료해주세요'
        mail_to = email
        
        return (mail_title, message_data, mail_to)
def message(domain, uidb64, token):
  return f"인증에 성공 하셨습니다. :-) 아래 링크 클릭해서 회원가입을 해주세요 ! \n http://{domain}/user/account/activate/{uidb64}/{token}"

이메일 재인증 링크 보내는 과정

회원가입 → 다른 이메일로 재인증 전송 → 이메일에서 클릭 → 리다이렉트

def verify_resend_email(self, dto:ResendDto):
        try:
            user = User.objects.get(email=dto.email)
            user = UserService.update(dto.email, dto.resend_email)
            mail = dto.resend_email

            return {"user": user, "mail": mail, "error":False}

        except user.DoesNotExist:
            context = context_info(msg="회원가입부터 해주세요", error=True)

            return context

비밀번호 찾기 인증번호 보내는 과정

class FindPasswordEmailView(View):
    '''
    description: 비밀번호 찾으려는 유저의 이메일 유효성 검증, 검증 후 인증번호 이메일로 발송
    '''
    def post(self, request, *args, **kwargs):
        if request.is_ajax():
            data = self._build_vaild_email(request)
            result = UserService.vaildate_user_email(data)

            if result['error']:
                return JsonResponse(result)

            auth_num = email_auth_num()

            result['user'].auth = auth_num
            result['user'].save()

            context = UserEmailVerifyService.send_email_with_auth_num(data.email, auth_num)

            return JsonResponse(context)

    def _build_vaild_email(self, request):
        data = json.loads(request.body)

        return VaildEmailDto(
            email = data.get('email')
        )

class AuthNumConfirmView(View):
    '''
    description: 인증번호 유효성 검증, 검증 후 비밀번호 변경 view로 이동
    '''
    def get(self, request, *args, **kwargs):
        context = context_info(name=request.user.nickname)

        return render(request,'change-password.html',context)

    def post(self, request, *args, **kwargs):
        if request.is_ajax():
            data = self._build_user_confirm_info(request)
            result = UserEmailVerifyService.verify_num(data)

            if not result['success']:
                return JsonResponse(result)

            user = UserService.get_filter_auth_user(data)

            user.auth = ''
            user.save()

            request.session['auth'] = user.email
            context = context_info(result=user.email, success=True) 

            return JsonResponse(context)

    def _build_user_confirm_info(self, request):
        data = json.loads(request.body)

        return UserConfirmDto(
            email=data.get('email'),
            valid_num=data.get('valid_num')
        )

@method_decorator(csrf_exempt, name='dispatch') 
class ValidChangePwdView(View):
    '''
    description: 변경된 비밀번호 유효성 검증, 검증후 유저의 비밀번호 변경
    '''
    def get(self, request, *args, **kwargs):
        reset_pwd_form = ChangeSetPwdForm(None)
        context = context_info(forms=reset_pwd_form)

        return render(request, 'valid-change-pwd.html', context)

    def post(self, request, *args, **kwargs):
        if request.is_ajax():
            data = self._build_vaild_pwd_info(request)
            current_user = UserService.get_email_user(data.auth)
            
            auth_login(request, current_user)

            reset_pwd_form = ChangeSetPwdForm(current_user, json.loads(request.body))
            result = UserEmailVerifyService.verify_change_pwd(reset_pwd_form, current_user)
            
            return JsonResponse(result)

    def _build_vaild_pwd_info(self, request):
        return AuthDto(
            auth = request.session.get('auth'),
            user = request.user
        )

class LoginCallBackView(TemplateView):
    template_name = 'login_callback.html'

    def get(self, request, *args, **kwargs):
        context = {'message': 'SUCCESS'}
        return self.render_to_response(context)

class UserEmailVerifyService():
    @staticmethod
    def send_email_with_auth_num(email, auth_num):
        message_data = pwd_change_message(auth_num)
        mail_title = '이메일 인증을 완료해주세요'
        mail_to = email
        send_email.delay(mail_title, message_data, mail_to)
        
        context = context_info(
        msg='이메일에 인증번호를 발송했습니다!', 
        error=False, 
        auth_num=auth_num
        )

        return context

Celery Beat를 이용해 스크래핑 자동화 처리

from celery.schedules import crontab

app.conf.beat_schedule = {
    'add-every-minutes-naver':{
    'task':'user.tasks.task_scrappy_naver',
    'schedule': crontab(minute='*/3'),
    },
    'add-every-minutes-daum':{
    'task':'user.tasks.task_scrappy_daum',
    'schedule': crontab(minute='*/5'),
    }
}


  • 개발환경 실행
$ celery -A config beat -l info

Celery & RabbitMQ EC2 서버에서 실행 (배포)

docs.celeryproject.org 데몬으로 돌리기 위해 즉 터미널에서 직접 실행시키지 않아도 알아서 돌아가게 하기 위해서 몇가지의 파일을 작성해야 한다

  • celery가 사용할 설정 파일
  • 데몬으로 실행하기 위한 service파일

celery 설정 파일 주의할점: concurrency=8이 default값이지만 ec2 프리티어에서는 작동 안하는 무시한 상황이 벌어짐..

/etc/conf.d/celery

CELERYD_NODES="w1"
CELERY_BIN="/home/ubuntu/myvenv/bin/celery"
CELERY_APP="config"
CELERYD_MULTI="multi"
CELERYD_OPTS="--time-limit=1400 --concurrency=2"

# - %n will be replaced with the first part of the nodename.
# - %I will be replaced with the current child process index
#   and is important when using the prefork pool to avoid race conditions.
CELERYD_PID_FILE="/var/run/celery/%n.pid"
CELERYD_LOG_FILE="/var/log/celery/%n%I.log"
CELERYD_LOG_LEVEL="INFO"

# you may wish to add these options for Celery Beat
CELERYBEAT_PID_FILE="/var/run/celery/beat.pid"
CELERYBEAT_LOG_FILE="/var/log/celery/beat.log"


services 파일

/etc/systemd/system/celery.service

[Unit]
Description=Celery Service
After=network.target

[Service]
Type=forking
User=ubuntu
Group=ubuntu
EnvironmentFile=/etc/conf.d/celery
WorkingDirectory=/srv/Neo-News
ExecStart=/bin/sh -c '${CELERY_BIN} multi start ${CELERYD_NODES} \\
-A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} \\
--logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} ${CELERYD_OPTS}'
ExecStop=/bin/sh -c '${CELERY_BIN} multi stopwait ${CELERYD_NODES} \\
--pidfile=${CELERYD_PID_FILE}'
ExecReload=/bin/sh -c '${CELERY_BIN} multi restart ${CELERYD_NODES} \\
-A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} \\
--logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} ${CELERYD_OPTS}'
[Install]
WantedBy=multi-user.target


/etc/systemd/system/celerybeat.service

[Unit]
Description=Celery Beat Service
After=network.target

[Service]
Type=simple
User=ubuntu
Group=ubuntu
EnvironmentFile=/etc/conf.d/celery
WorkingDirectory=/srv/Neo-News
ExecStart=/bin/sh -c '${CELERY_BIN} -A ${CELERY_APP} beat --pidfile=${CELERYBEAT_PID_FILE} --logfile=${CELERYBEAT_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL}'
Restart=always
[Install]
WantedBy=multi-user.target


/lib/systemd/system/rabbitmq-server.service

[Unit]
Description=RabbitMQ Messaging Server
After=network.target epmd@0.0.0.0.socket
Wants=network.target epmd@0.0.0.0.socket

[Service]
Type=notify
User=rabbitmq
SyslogIdentifier=rabbitmq
NotifyAccess=all
TimeoutStartSec=3600
LimitNOFILE=65536
TimeoutStartSec=600
Restart=on-failure
RestartSec=10
ExecStart=/usr/sbin/rabbitmq-server
ExecStop=/usr/sbin/rabbitmqctl stop

[Install]
WantedBy=multi-user.target


service파일 구동

  • sudo systemctl restart celery.service # service파일 재시작
  • sudo systemctl start celery.service # service파일 시작
  • sudo systemctl status celery.service # service 상태
  • sudo systemctl stop celery.service # service 중지


실제로 Celery가 돌아가면 동작되는 log를 볼 수 있다

/var/log/celery/


beat가 돌아간 함수, 시간, 날짜를 볼 수 있다

cat beat.log


worker가 돌아간 함수, 시간, 날짜를 볼 수 있다. beat가 돌아가는 함수와 worker를 공유해서 쓰기 때문에 스크래핑 되는 로그와 이메일 인증시 동작되는 로그가 함께 보인다.

cat w1-1.log