rabbitmq 学习记录 -- ACK和数据持久化
为了数据不丢失, 需要在两个层面上做一些配置. 一个是ACK, 一个是数据持久化.
ACK
如果没有启用的话, 消费者拿走消息的时候, queue就把它删除了.
消费者拿走一条消息之后, 还没有处理完就crash了. 那么这条消息就丢失了. 为了保证消息一定被处理完了才从queue中被删掉, 就要启用Message acknowledgment .
启用之后, queue会在收到ack之后把消息删掉.
在这里没有timeout的概念, 哪怕这个任务执行很久, 不管多久, 会一直等ack. 或者是tcp链接断了, 才会把消息再给另外一个消费者.
ack默认是开启的, 也可以显示显示地关闭
channel.basic_consume(callback, queue=queue_name, no_ack=True)
callbak里面要记得发送ack,否则消息要被一次又一次的处理,然后再次回到队列 … …
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( 10 )
raise SystemExit(1) # message will put back to the original queue
ch.basic_ack(delivery_tag = method.delivery_tag)
print " [x] Done"
来跑几个例子测试一下
生产者:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
import sys
def main():
body = ' '.join(sys.argv[1:]) or 'Hello World'
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body=body,
)
connection.close()
if __name__ == '__main__':
main()
消费者:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
import time
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep(10)
raise SystemExit(1)
ch.basic_ack(delivery_tag = method.delivery_tag)
print " [x] Done"
def main():
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_consume(callback,
queue='hello',
)
channel.start_consuming()
if __name__ == '__main__':
main()
发送一条消息到队列 , 然后消费. 观察一下状态
# rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 1
等10秒, 再看, 消息没有被消费成功, 再次回到队列中.
# rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 1 0
REJECT
可以用ACK告诉rabbitmq任务处理完了, 但是如果没有成功的话, 也可以再把消息塞回队列. 就是Negative Acknowledgement. pika对应的方法是basic_reject
但可得注意, 不要搞成死循环了
数据持久化
启用ack之后, 消费者死掉不会丢失数据, 但rabbitmq进程死掉的话, 消息就丢掉了. 为保证数据不丢失, 还需要启动数据持久化. 需要在两个层面上做持久化:
- 队列的持久化
- 消息的持久化
channel.queue_declare(queue='hello', durable=True)
这样就申明了一个持久化的队列, durable的属性是不会变的, 如果之前hello队列已经申明过且不是持久化的, 这个再次申明会失败. 这个队列不会因为rabbitmq重启而丢失, 接下来还要继续做消息的持久化.
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
Q: 如果在一个非持久化的队列上发送数据时, 指明要持久化, 为发生什么情况?
A: 可以正常发送, 但重启rabbitmq之后, 队列丢失, 当然消息找不到了.