Celery, RabbitMQ 이용해 비동기 처리하기
Celery
Celery는 비동기 작업을 처리할 수 있도록 해주는 비동기 작업 큐이다. 비동기적으로 실행되야 할 때 사용된다
celery는 웹서버-브로커-워커 이세가지와 상호작용을 한다
celery에서는 작업 메시지를 브로커에게 전달한다.
rabbitmq
대기중인 작업을 대기열에 보관했다가 적절한 worker에게 작업을 전달해주는 역할을 한다
rabbitmq는 celery 와 함께 사용하는 메시지 브로커이다
broker가 필요한 이유 : 셀러리는 실제로 메시지 큐 자체를 구성하는 것은 아니기 때문에 이 작업을 수행하기위해서 추가 메시지 전송이 필요하다. 셀러리는 메시지 브로커를 감싸는 래퍼로 생각할 수 있다
worker
worker에서는 해당 작업을 가져와 수행하게 된다. 이과정에서 celery는 메시지를 전달하는 역할과 메시지를 가져와 작업을 수행하는 역할을 담당하게 된다.
celery 설치 & rabbitMQ설치
- Celery 패키지 설치
$ pip install celery
- celery django에서 실행
- django에서 쓰기 위해 celery instance를 정의한다
- app.autodiscover_tasks()를 정의하여 Celery에서 자동으로 Task들을 찾아낸다
- Celery는 shared_task라는 테코레이터를 제공하고 Task에 @shared_task 데코레이터를 붙여 비동기적으로 처리 될 수 있도록 한다
# config/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
app = Celery('config', broker='amqp://', backend='rpc://', include=['user.tasks'])
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
- Celery 실행확인
- flower라는 패키지 설치
- flower는 celery를 위한 실시간 웹 기반 모니터입니다. 작업 진행 상황과 기록을 쉽게 모니터링 할 수 있습니다 (http://localhost:5555 에서 액세스)
$ pip install flower
$ celery -A config flower
- rabbimq 설치
$ sudo apt-get install rabbitmq-serve
- celery가 접근할 수 있도록 user를 생성한다 (기존 guest 삭제)
$ sudo rabbitmqctl add_user <user> <pwd>
$ sudo rabbitmqctl set_user_tags <user> administrator
$ sudo rabbitmqctl set_permissions -p / <user> ".*"".*" ".*"
$ sudo rabbitmqctl delete_user guest
$ sudo /etc/init.d/rabbitmq-server restart
$ sudo rabbitmq-plugins enable rabbitmq_management
$ sudo /etc/init.d/rabbitmq-server restart
- 모니터링 확인 방법
- celery와 broker를 연결하기 위해 settings.py에서 BROKER_URL을 입력하여 연결한다
# config/settings
CELERY_BROKER_URL = env('CELERY_BROKER_URL')
- .env
CELERY_BROKER_URL="amqp://admin:1234@localhost:5672//"
Celery와 rabbitMQ실행 (개발환경)
-
django, rabbitmq, celery 서버 3개 모두 실행시킨다
-
celery 실행
celery -A config worker -l info
-------------- celery@heejung-15Z90N-VR5BK v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- Linux-5.11.0-41-generic-x86_64-with-glibc2.29 2021-12-10 13:36:32
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: config:0x7fd43406fbb0
- ** ---------- .> transport: amqp://admin:**@localhost:5672//
- ** ---------- .> results: rpc://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. config.celery.debug_task
. user.tasks.send_email
. user.tasks.task_scrappy_daum
. user.tasks.task_scrappy_naver
[2021-12-10 13:36:32,764: INFO/MainProcess] Connected to amqp://admin:**@127.0.0.1:5672//
- rabbitmq 실행
$ rabbitmq-server
## ## RabbitMQ 3.8.2
## ##
########## Copyright (c) 2007-2019 Pivotal Software, Inc.
###### ##
########## Licensed under the MPL 1.1. Website: https://rabbitmq.com
Doc guides: https://rabbitmq.com/documentation.html
Support: https://rabbitmq.com/contact.html
Tutorials: https://rabbitmq.com/getstarted.html
Monitoring: https://rabbitmq.com/monitoring.html
Logs: /var/log/rabbitmq/rabbit@heejung-15Z90N-VR5BK.log
/var/log/rabbitmq/rabbit@heejung-15Z90N-VR5BK_upgrade.log
Config file(s): (none)
Starting broker... completed with 3 plugins.
- rabbitmq 종료
$ sudo rabbitmqctl stop
Celery를 이용한 이메일 인증 비동기 처리
- 회원가입 하는 유저의 이메일 유효성 검증하는 메소드에서 검증이 끝나면 회원가입된 유저의 객체와 유저의 이메일을 반환한다
- 이메일 인증을 위해 필요한 유저의 email 정보를 인자로 받아 이메일를 보내는 메서드를 호출해서 인증 링크와 인증 메시지 타이틀을 생성해 메시지를 보낼 유저의 이메일에 shared_task 데코레이터를 붙여 비동기적으로 전송
@shared_task
def send_email(mail_title, message_data, mail_to):
email = EmailMessage(mail_title, message_data, to=[mail_to])
email.send()
return None
이메일 인증 링크 보내는 과정
회원가입 → 인증 링크 생성 → 이메일에서 클릭 → 리다이렉트
urlsafe_base64 사용 이유 : base64로 암호화 하는 경우 웹으로 전송하는 문자 중 정상적으로 전송되지 않는 문제가 발생하기 때문에 이러한 오류를 막기 위해 safe를 사용
def user_email(request, pk, email):
domain = get_current_site(request).domain
uidb64 = urlsafe_base64_encode(force_bytes(pk))
token = jwt.encode({'user_pk' : pk}, SECRET_KEY, ALGORITHM).decode('utf-8')
message_data = message(domain, uidb64, token)
mail_title = '이메일 인증을 완료해주세요'
mail_to = email
return (mail_title, message_data, mail_to)
def message(domain, uidb64, token):
return f"인증에 성공 하셨습니다. :-) 아래 링크 클릭해서 회원가입을 해주세요 ! \n http://{domain}/user/account/activate/{uidb64}/{token}"
이메일 재인증 링크 보내는 과정
회원가입 → 다른 이메일로 재인증 전송 → 이메일에서 클릭 → 리다이렉트
def verify_resend_email(self, dto:ResendDto):
try:
user = User.objects.get(email=dto.email)
user = UserService.update(dto.email, dto.resend_email)
mail = dto.resend_email
return {"user": user, "mail": mail, "error":False}
except user.DoesNotExist:
context = context_info(msg="회원가입부터 해주세요", error=True)
return context
비밀번호 찾기 인증번호 보내는 과정
class FindPasswordEmailView(View):
'''
description: 비밀번호 찾으려는 유저의 이메일 유효성 검증, 검증 후 인증번호 이메일로 발송
'''
def post(self, request, *args, **kwargs):
if request.is_ajax():
data = self._build_vaild_email(request)
result = UserService.vaildate_user_email(data)
if result['error']:
return JsonResponse(result)
auth_num = email_auth_num()
result['user'].auth = auth_num
result['user'].save()
context = UserEmailVerifyService.send_email_with_auth_num(data.email, auth_num)
return JsonResponse(context)
def _build_vaild_email(self, request):
data = json.loads(request.body)
return VaildEmailDto(
email = data.get('email')
)
class AuthNumConfirmView(View):
'''
description: 인증번호 유효성 검증, 검증 후 비밀번호 변경 view로 이동
'''
def get(self, request, *args, **kwargs):
context = context_info(name=request.user.nickname)
return render(request,'change-password.html',context)
def post(self, request, *args, **kwargs):
if request.is_ajax():
data = self._build_user_confirm_info(request)
result = UserEmailVerifyService.verify_num(data)
if not result['success']:
return JsonResponse(result)
user = UserService.get_filter_auth_user(data)
user.auth = ''
user.save()
request.session['auth'] = user.email
context = context_info(result=user.email, success=True)
return JsonResponse(context)
def _build_user_confirm_info(self, request):
data = json.loads(request.body)
return UserConfirmDto(
email=data.get('email'),
valid_num=data.get('valid_num')
)
@method_decorator(csrf_exempt, name='dispatch')
class ValidChangePwdView(View):
'''
description: 변경된 비밀번호 유효성 검증, 검증후 유저의 비밀번호 변경
'''
def get(self, request, *args, **kwargs):
reset_pwd_form = ChangeSetPwdForm(None)
context = context_info(forms=reset_pwd_form)
return render(request, 'valid-change-pwd.html', context)
def post(self, request, *args, **kwargs):
if request.is_ajax():
data = self._build_vaild_pwd_info(request)
current_user = UserService.get_email_user(data.auth)
auth_login(request, current_user)
reset_pwd_form = ChangeSetPwdForm(current_user, json.loads(request.body))
result = UserEmailVerifyService.verify_change_pwd(reset_pwd_form, current_user)
return JsonResponse(result)
def _build_vaild_pwd_info(self, request):
return AuthDto(
auth = request.session.get('auth'),
user = request.user
)
class LoginCallBackView(TemplateView):
template_name = 'login_callback.html'
def get(self, request, *args, **kwargs):
context = {'message': 'SUCCESS'}
return self.render_to_response(context)
class UserEmailVerifyService():
@staticmethod
def send_email_with_auth_num(email, auth_num):
message_data = pwd_change_message(auth_num)
mail_title = '이메일 인증을 완료해주세요'
mail_to = email
send_email.delay(mail_title, message_data, mail_to)
context = context_info(
msg='이메일에 인증번호를 발송했습니다!',
error=False,
auth_num=auth_num
)
return context
Celery Beat를 이용해 스크래핑 자동화 처리
from celery.schedules import crontab
app.conf.beat_schedule = {
'add-every-minutes-naver':{
'task':'user.tasks.task_scrappy_naver',
'schedule': crontab(minute='*/3'),
},
'add-every-minutes-daum':{
'task':'user.tasks.task_scrappy_daum',
'schedule': crontab(minute='*/5'),
}
}
- 개발환경 실행
$ celery -A config beat -l info
Celery & RabbitMQ EC2 서버에서 실행 (배포)
docs.celeryproject.org 데몬으로 돌리기 위해 즉 터미널에서 직접 실행시키지 않아도 알아서 돌아가게 하기 위해서 몇가지의 파일을 작성해야 한다
- celery가 사용할 설정 파일
- 데몬으로 실행하기 위한 service파일
celery 설정 파일 주의할점: concurrency=8이 default값이지만 ec2 프리티어에서는 작동 안하는 무시한 상황이 벌어짐..
/etc/conf.d/celery
CELERYD_NODES="w1"
CELERY_BIN="/home/ubuntu/myvenv/bin/celery"
CELERY_APP="config"
CELERYD_MULTI="multi"
CELERYD_OPTS="--time-limit=1400 --concurrency=2"
# - %n will be replaced with the first part of the nodename.
# - %I will be replaced with the current child process index
# and is important when using the prefork pool to avoid race conditions.
CELERYD_PID_FILE="/var/run/celery/%n.pid"
CELERYD_LOG_FILE="/var/log/celery/%n%I.log"
CELERYD_LOG_LEVEL="INFO"
# you may wish to add these options for Celery Beat
CELERYBEAT_PID_FILE="/var/run/celery/beat.pid"
CELERYBEAT_LOG_FILE="/var/log/celery/beat.log"
services 파일
/etc/systemd/system/celery.service
[Unit]
Description=Celery Service
After=network.target
[Service]
Type=forking
User=ubuntu
Group=ubuntu
EnvironmentFile=/etc/conf.d/celery
WorkingDirectory=/srv/Neo-News
ExecStart=/bin/sh -c '${CELERY_BIN} multi start ${CELERYD_NODES} \\
-A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} \\
--logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} ${CELERYD_OPTS}'
ExecStop=/bin/sh -c '${CELERY_BIN} multi stopwait ${CELERYD_NODES} \\
--pidfile=${CELERYD_PID_FILE}'
ExecReload=/bin/sh -c '${CELERY_BIN} multi restart ${CELERYD_NODES} \\
-A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} \\
--logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} ${CELERYD_OPTS}'
[Install]
WantedBy=multi-user.target
/etc/systemd/system/celerybeat.service
[Unit]
Description=Celery Beat Service
After=network.target
[Service]
Type=simple
User=ubuntu
Group=ubuntu
EnvironmentFile=/etc/conf.d/celery
WorkingDirectory=/srv/Neo-News
ExecStart=/bin/sh -c '${CELERY_BIN} -A ${CELERY_APP} beat --pidfile=${CELERYBEAT_PID_FILE} --logfile=${CELERYBEAT_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL}'
Restart=always
[Install]
WantedBy=multi-user.target
/lib/systemd/system/rabbitmq-server.service
[Unit]
Description=RabbitMQ Messaging Server
After=network.target epmd@0.0.0.0.socket
Wants=network.target epmd@0.0.0.0.socket
[Service]
Type=notify
User=rabbitmq
SyslogIdentifier=rabbitmq
NotifyAccess=all
TimeoutStartSec=3600
LimitNOFILE=65536
TimeoutStartSec=600
Restart=on-failure
RestartSec=10
ExecStart=/usr/sbin/rabbitmq-server
ExecStop=/usr/sbin/rabbitmqctl stop
[Install]
WantedBy=multi-user.target
service파일 구동
- sudo systemctl restart celery.service # service파일 재시작
- sudo systemctl start celery.service # service파일 시작
- sudo systemctl status celery.service # service 상태
- sudo systemctl stop celery.service # service 중지
실제로 Celery가 돌아가면 동작되는 log를 볼 수 있다
/var/log/celery/
beat가 돌아간 함수, 시간, 날짜를 볼 수 있다
cat beat.log
worker가 돌아간 함수, 시간, 날짜를 볼 수 있다. beat가 돌아가는 함수와 worker를 공유해서 쓰기 때문에 스크래핑 되는 로그와 이메일 인증시 동작되는 로그가 함께 보인다.
cat w1-1.log