(celery) celery, redis, beat 알아보기 - 1
Python Celery with Redis: 비동기 작업 큐(queue)
웹 서버가 처리하기에 무거운 연산을 ex) email.send() 그냥 서버에 넣으면 해당 로직의 처리가 끝날때까지 유저는 아무런 작업을 할 수 없다. 웹 서버의 기본적인 동작 방식은 동기적(synchronous)이기 때문이다.
이를 해결하기 위해 비동기적(asynchronous)인 작업으로 처리해준다. 비동기적 처리는 길어지는 작업이 끝나기를 기다리지 않고 유저가 다른일을 할 수 있게 해준다. celery는 메시지 브로커(message broker)와 python 작업 프로세스를 연결해서 비동기 작업을 수행할 수 있는 시스템을 제공해준다.
이때 작업을 메시지 큐에 순차적으로 등록을 시켜야 작업 충돌을 방지할 수 있다. 이때 큐에 작업을 전달해주는 시스템을 브로커라고 부른다. 대표적으로 redis, rabbitMQ등이 존재한다
celery
비동기처리가 필요한 작업을 Celery가 브로커(redis)에게 전달하면 하나 이상의 worker가 이를 처리해준다
worker
비동기 작업을 할 수 있도록 도와주는 파이썬 프레임워크로 worker라고 불린다
브로커(Redis)
celery 작업을 하기 위해서는 브로커를 설정해야 한다. 브로커는 worker인 Celery를 사용하기 위해서 작업 요청을 받는 큐라고 할 수 있다. 이때 이곳에서 요청을 받아 Worker에게 적절히 분배하는 작업을 수행한다
장고에 celery와 redis 설치하기
1. celery와 redis 설치
설치시 주의할 점은 celery는 파이썬 언어로 작성되어 있기 때문에 가상환경 위에서 pip를 이용해 설치한다 redis는 인 메모리를 이용하기 때문에 wget으로 설치한다
# celery 설치
# pip를 이용해 celery 모듈과 redis와의 연동을 위한 dependency를 한번에 설치해준다
$ pip install 'celery[redis]'
# redis 설치
# 설치후 해당 .gz 파일등 redis-stable등이 프로젝트에 폴더로 생성됨, .gZ는 지워주면 될 것 같음.
# Redis관련 파일은 .env에 넣으면 될 것 같음
$ wget https://download.redis.io/redis-stable.tar.gz
$ tar xvzf redis-stable.tar.gz
$ cd redis-stable
$ make # 이 부분 시간 꽤 걸림
$ sudo make install
$ redis-server # redis 실행
$ sudo sysctl vm.overcommit_memory=1
$ redis-cli ping # 정상 설치되었는지 확인
# PONG 메시지를 띄우면 설치 성공, redis-server실행해두고 redis-cli 명령어 치기
2. redis redis-server creating server TCP listening socket *.6379:bind:Address already in use 에러 발생
redis 사용되고 있는 포트 한번 죽여줘야 한다. 2가지의 방법으로 시도를 했다. (kill 명령어가 가장 흔한데 초반에는 해당 명령어가 잘 듣지 않았다) 둘 중 하나로 종료시킨 다음 다시 redis-server 로 박스 상자 터미널에 뜨는지 확인한다
# 실행중인 포트 찾기
$ ps aux | grep redis
"""
# ex) 1013 kill 해준다
redis 1013 0.2 0.0 49788 4596 ? Ssl 09:06 0:44 /usr/bin/redis-server 127.0.0.1:6379
"""
# 1. kill
$ kill -9 1013
# 2. stop
$ sudo service redis-server stop
3. 장고와 연동하기
이메일도 같이 사용할 예정이므로 email도 env로 들어가 있음.
#.env에 추가되어야 할 내용
EMAIL_HOST='smtp.gmail.com'
EMAIL_PORT= 587
EMAIL_HOST_USER='djangosmtp10@gmail.com'
EMAIL_HOST_PASSWORD='@gmlwjddl0616'
EMAIL_USE_TLS=True
# config/settings
# email 인증
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST=env('EMAIL_HOST')
EMAIL_PORT=env('EMAIL_PORT')
EMAIL_HOST_USER=env('EMAIL_HOST_USER')
EMAIL_HOST_PASSWORD=env('EMAIL_HOST_PASSWORD')
EMAIL_USE_TLS=env('EMAIL_USE_TLS')
# celery 환경설정
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
4. worker 터미널에서 실행하기
가상환경에 들어간 후 실행해야 한다. redis는 실행시켜놓고 진행해야 한다.
$ celery -A config worker -l info
"
# 아래와 같이 나오면 성공 한 것
-------------- celery@halcyon.local v4.0 (latentcall)
---- **** -----
--- * *** * -- [Configuration]
-- * - **** --- . broker: amqp://guest@localhost:5672//
- ** ---------- . app: __main__:0x1012d8590
- ** ---------- . concurrency: 8 (processes)
- ** ---------- . events: OFF (enable -E to monitor this worker)
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery: exchange:celery(direct) binding:celery
--- ***** -----
[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.
"
5. worker 실행시킨 후 발생할 수 있는 이슈
task의 메서드 내용등 업데이트가 바로되지 않아 아무런 동작을 하지 않는다면 redis서버까지 종료후 다시 실행시켜본다. conf파일에 들어가 포트 번호를 다르게 설정해서 다른 프로젝트와 혼동되지 않게 해주자..
"
[2021-07-14 00:55:02,966: INFO/MainProcess] mingle: searching for neighbors
[2021-07-14 00:55:03,979: INFO/MainProcess] mingle: all alone
[2021-07-14 00:55:04,004: WARNING/MainProcess] /home/heejung/Desktop/github-upload/Neo-News/venv/lib/python3.8/site-packages/celery/fixups/django.py:203: UserWarning: Using settings.DEBUG leads to a memory
leak, never use this setting in production environments!
warnings.warn('''Using settings.DEBUG leads to a memory
[2021-07-14 00:55:04,005: INFO/MainProcess] celery@heejung-15Z90N-VR5BK ready.
[2021-07-14 00:56:43,451: INFO/MainProcess] Task user.tasks.send_email[6aecf384-7ec4-4c6b-bb5c-84c697163ac1] received
[2021-07-14 00:56:47,521: INFO/ForkPoolWorker-8] Task user.tasks.send_email[6aecf384-7ec4-4c6b-bb5c-84c697163ac1] succeeded in 4.069017551000343s: None
[2021-07-14 01:00:23,196: INFO/MainProcess] Task user.tasks.send_email[b3b4c752-337f-443a-b6a6-6f097b7d4e11] received
[2021-07-14 01:00:26,757: INFO/ForkPoolWorker-8] Task user.tasks.send_email[b3b4c752-337f-443a-b6a6-6f097b7d4e11] succeeded in 3.560091403000115s: None
[2021-07-14 05:33:35,836: INFO/MainProcess] Task user.tasks.send_email[9a0c4c5d-7d59-4fac-9a94-8aae98b8586e] received
[2021-07-14 05:33:39,293: INFO/ForkPoolWorker-8] Task user.tasks.send_email[9a0c4c5d-7d59-4fac-9a94-8aae98b8586e] succeeded in 3.456564526000875s: None
"
여기까지가 celery와 redis의 기본 세팅 과정이다
redis-server 외부접속 허용하기
redis-server도 로컬 접속만 허용되어 있기 때문에 외부 서버에서 접속할 수 있게 해주어야 한다. 해당 파일을 열어 bind를 찾아 127.0.0.1을 주석처리하거나 지운 후 0.0.0.0을 추가한다. 이는 로컬에서만 가능한 접속을 외부에도 열어주는 설정이다.
# redis.config
$ sudo vim /etc/redis/redis.conf
celery와 beat
celery-beat(스케줄러)
주기적으로 어떠한 일을 반복적으로 해야하는 경우가 생길때 celery-beat를 사용하면 된다. celery의 작업큐에 주기적으로 작업을 넣어주는 역할을 하는것이 beat이다.
celery-beat사용하기
pip install django-celery-beat
settings.py에 스케쥴 추가
settings.py에 추가해도 되지만 celery.py에서 app.conf.update()에 추가해주어도 된다.(이곳에서는 settings.py에 등록한다) CELERYBEAT_SCHEDULE은 딕셔너리 형태로 여러개의 스케쥴을 등록하여 사용할 수 있다
timedelta를 이용
# settings.py
INSTALLED_APPS = [
'django_celery_beat'
]
CELERY_TIMEZONE = 'Asia/Seoul'
from datetime import timedelta
# 1초에 한번씩 task써져있는 작업이 실행된다. 이곳에서는 timedelta를 사용해서
# 실행될 간격을 정의하였다.
CELERYBEAT_SCHEDULE = {
'every-seconds': {
'task': '앱이름.tasks.task이름',
'schedule': timedelta(seconds=1),
'args': ()
},
}
crontab 을 이용
크론탭은 특정 시간뿐만 아니라 특정한 요일같이 다양하게 시간을 정하여 작업을 배치시킬 수 있다. timedelta와 비슷하다. schedule에 crontab을 작성하여 사용하면 된다
# settings.py
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'every-minutes': {
'task': '앱이름.tasks.task이름',
'schedule': crontab(),
'args': ()
},
}
process 구동
celery와 beat를 구동시켜 등록한 시간마다 작업이 수행되는지 확인한다
# celery 구동
$ celery -A config worker -l info
# beat 구동
$ celery beat -A config
# 동시에 구동
$ celery -A config worker -l info -B
crontab schedules
크론탭으로 설정할 수 있는 시간이 다양하다. 아래와 같은 명령어를 schedule에 작성해주면 된다. 더 다양하게 있으니 이는 따로 찾아보자
crontab() -> 1분마다 실행
crontab(minute='*/10') -> 매일 10분마다 실행
crontab(minute=0, hour='*/3') -> 3시간에 한번씩 실행
그럼 이제 celery의 로직들에 대해 좀 더 자세하게 알아보자.
celery app
celery를 설치한 다음 가장 먼저 celery 인스턴스를 얻어야 한다. 이것을 app이라고 부르는데 이 인스턴스가 모든 작업, 예를 들면 task를 만들거나 worker를 관리하는 등 모든 작업의 시작점으로 쓰이게 된다. 따라서 다른 모듈에서 사용할 수 있도록 import 되어야 한다
from __future__ import absolute_import, unicode_literals
import os, django
from datetime import timedelta
from celery import Celery
from celery.schedules import crontab
# 장고의 세팅 모듈을 Celery의 기본으로 사용하도록 등록한다
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
# celery의 첫번재 인자로 전달된 문자열은 현재 모듈의 이름으로 tasks라는 celery인스턴스를 생성한것이다
# 두번째 인자는 사용하려는 브로커의 url값이다
# 세번째 인자는 사용할 백엔드의 url을 지정한다. 백엔드는 Task의 상태와 결과를 추적하는데 이용된다.
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
# namespace='CELERY'는 모든 celery 관련한 configuration key가 'CELERY_'로 시작해야 함을 의미한다
app.config_from_object('django.conf:settings', namespace='CELERY')
# task 모듈을 모든 등록된 장고 app config에서 load한다
app.autodiscover_tasks()
#
app.conf.update(
CELERY_BROKER_URL = 'redis://localhost:6379/0',
CELERY_ACCEPT_CONTENT = ['json'],
CELERY_TASK_SERIALIZER = 'json',
CELERY_TIMEZONE='Asia/Seoul',
CELERY_ENABLE_UTC=False,
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler',
)
django.setup()
app.autodiscover_tasks()
if __name__=='__main__':
app.start()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
__ init __ 설정
장고가 시작할때 shared_task가 이 앱을 사용할 수 있도록 app이 항상 import되게 해준다
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ('celery_app',)
tasks.py 생성
task에 쓰여지는 메서드들이 브로커에게 작업요청으로 가게되고 worker에서 실행을 시키는 로직이 된다.(무조건은 아니지만 보통 이렇게 쓰인다)
from celery import shared_task
from news.models import Press, Article, Potal, Category
from news.daum_scrapping import parse_daum
import time
@shared_task
def task_scrapping():
news_dict = parse_daum()
for v in news_dict.values():
if not v['preview_img']:
v['preview_img'] = 'default.img'
if Press.objects.filter(name=v['press']).first() is None:
Press.objects.create(name = v['press'])
if not Article.objects.filter(title = v['title']):
Article.objects.create(
press=Press.objects.filter(name=v['press']).first(),
potal = Potal.objects.filter(name='다음').first(),
category=Category.objects.filter(name=v['news_category']).first(),
code=v['news_code'],
date=v['date'],
preview_img=v['preview_img'],
title=v['title'],
content=v['content'],
ref=v['ref'],
counted_at = 0,
created_at = time.time()
)
return None
더 알아봐야 하는 내용들
- 데몬으로 celery 실행시키기
- celery에서 worker별로 task를 부여하는 방법https://iam.namjun.kim/celery/2018/09/09/celery-routing/