默认情况下,redis-py 每次在执行请求时都会创建和断开一次连接操作(连接池申请连接,归还连接池),如果想要在一次请求中执行多个命令,则可以使用 pipline 实现一次请求执行多个命令,并且默认情况下 pipline 是原子性操作。
见以下实例:
import redis pool = redis.ConnectionPool(host=, port=6379) r = redis.Redis(connection_pool=pool) # pipe = r.pipeline(transaction=False) pipe = r.pipeline(transaction=True) r.set(, ) r.set(, ) pipe.execute()
5> 发布和订阅
发布者:服务器
订阅者:Dashboad 和数据处理
发布订阅的 Demo 如下:
#!/usr/bin/env python # -*- coding:utf-8 -*- import redis class RedisHelper: def __init__(self): self.) self.chan_sub = self.chan_pub = public(self, msg): self.__conn.publish(self.chan_pub, msg) return True def subscribe(self): pub = self.__conn.pubsub() pub.subscribe(self.chan_sub) pub.parse_response() return pub
RedisHelper订阅者:
#!/usr/bin/env python # -*- coding:utf-8 -*- from monitor.RedisHelper import RedisHelper obj = RedisHelper() redis_sub = obj.subscribe() while True: msg= redis_sub.parse_response() print msg
发布者:
#!/usr/bin/env python # -*- coding:utf-8 -*- from monitor.RedisHelper import RedisHelper obj = RedisHelper() obj.public()
更多参见:https://github.com/andymccurdy/redis-py/
三、RabbitMQ 1、简介、安装、使用
RabbitMQ 是一个在 AMQP 基础上完成的,可复用的企业消息系统。他遵循 Mozilla Public License 开源协议。
MQ 全称为 Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方式。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
流程上生产者把消息放到队列中去, 然后消费者从队列中取出消息。
RabbitMQ安装
# 安装配置epel源 $ rpm -ivh .noarch.rpm # 安装erlang $ yum -y install erlang # 安装RabbitMQ $ yum -y install rabbitmq-server
# 启动 service rabbitmq-server start/stop # 默认监听端口5672 (带上 SSL 默认 5671)
python 安装 API
pip install pika or easy_install pika or 源码 https://pypi.python.org/pypi/pika
2、使用API操作RabbitMQ
基于队列 Queue 实现生产者消费者模型:
#!/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading message = Queue.Queue(10) def producer(i): while True: message.put(i) def consumer(i): while True: msg = message.get() for i in range(12): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()
View Code
RabbitMQ 实现:
pika # ######################### 生产者 ######################### connection = pika.BlockingConnection(pika.ConnectionParameters( host=)) channel = connection.channel() channel.queue_declare(queue=) channel.basic_publish(exchange='', routing_key=, body=) ) connection.close() pika # ########################## 消费者 ########################## connection = pika.BlockingConnection(pika.ConnectionParameters( host=)) channel = connection.channel() channel.queue_declare(queue=) def callback(ch, method, properties, body): % body) channel.basic_consume(callback, queue=, no_ack=True) ) channel.start_consuming()
1、acknowledgment 消息不丢失
no-ack = False,如果消费者由于某些情况宕了(its channel is closed, connection is closed, or TCP connection is lost),那 RabbitMQ 会重新将该任务放入队列中。