首页
仓库
文档
nginx手册
Docker手册
workerman
Flask
PHP
python
RabbitMQ
其他
Linux
占位1
占位2
目录
先进先出。 推送消息的(生产者) 接收消息(消费者) 解耦,异步,消峰。 ###queue 简单版 import queue q=queue.Queue(maxsize=100); q.put(1111) q.put(2222) q.put(3333) print(q.get()) print(q.get()) print(q.get()) print(q.get())# 等待队列,不会继续向下 ###RabbitMQ RabbitMQ是一款独立的软件,用Eralng语言开发。 ###客户端安装 pip install pika #简单模式 简单模式适用1个订阅,1个发布,多条推送,会被订阅者轮流取走。 发布 import time import pika # 1. 链接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672)) channel = connection.channel() # 2. 声明一个名为hello的队列,如果队列不存在,则创建 durable 持久化队列存入硬盘,服务端蹦了,还在。 channel.queue_declare(queue='word', durable=True) # 3. 发送数据 """ exchange: 交换机名称 routing_key: 消息队列名称 body: 消息内容 properties:持久化设置 """ for i in range(10): channel.basic_publish( exchange='', routing_key='word', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # 持久化消息 固定写法 )) time.sleep(3) # 4. 关闭链接 connection.close() 订阅 import pika #### 接收推送(消费者) # 1. 链接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672)) channel = connection.channel() # 2. 声明一个名为hello的队列,如果队列不存在,则创建 channel.queue_declare(queue='word', durable=True) #默认是轮询任务,可以设置公平分发,谁有空谁来取 channel.basic_qos(prefetch_count=1) #表示谁来谁取,不再按照奇偶数排列 # 3. 定义一个回调函数来处理接收到的消息 def callback(ch, method, properties, body): print("消费者接收到了任务 %r" % body) #当内部程序执行错误时,消息会被拒绝,并且不会再次被消费。 ch.basic_ack(delivery_tag=method.delivery_tag)#消息回复 (需要 auto_ack=False 模式下生效,否则保存) # 4. 告诉RabbitMQ,我们接下来要从hello队列接收消息 channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=False) #auto_ack=True 默认应答,改成False 手动应答,后才会删除列队的消息(牺牲效率) # 5. 开始接收消息 channel.start_consuming() #交换机模式 ###发布订阅模式 同一条消息会发送给,多个订阅者,订阅者创建自己的列队。(同时收同一条信息) 发布者 ```python import pika # 1. 链接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672)) channel = connection.channel() # 2. 声明中间件 """ exchange: 交换机的名称 exchange_type: 交换机的工作方式 """ ### 生产者 创建一个交换机,并发送数据, 所有的订阅者都会收到一份消息。 channel.exchange_declare(exchange='m1', exchange_type='fanout') # 3. 发送消息 """ exchange: 交换名称 routing_key: 消息的路由为空 body: 消息的内容 """ for i in range(10): channel.basic_publish( exchange='m1', routing_key='', body='Hello World!'+str(i), properties=pika.BasicProperties( delivery_mode=2, # 持久化消息 固定写法 )) # 4. 关闭连接 connection.close() ``` 订阅者 ```python import pika # 1. 链接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672)) channel = connection.channel() # 2. 声明中间件 """ exchange: 交换机的名称 exchange_type: 交换机的工作方式 """ channel.exchange_declare(exchange='m1', exchange_type='fanout') ### fanout 发布订阅模式 # 3. 随机生成一个队列&队列名称 ### 这个模式下,N个不同的订阅者都会收到生产者的消息,自己独立创建一个队列绑定交换机,处理消息。 result = channel.queue_declare(queue='', exclusive=True) #exclusive 只有声明此队列的连接可以看到和使用这个队列 queue_name = result.method.queue # 4. 将队列绑定到中间件 channel.queue_bind(exchange='m1', queue=queue_name) # 5. 定义回调函数 def callback(ch, method, properties, body): print(" [33x] Received %r" % body) # 6. 告诉RabbitMQ,我们接下来要从[queue_name]队列接收消息 channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) # 7. 开始接收消息 channel.start_consuming() ``` ###关键字模式 同一条消息会发送给,多个订阅者,订阅者创建自己的列队。 (通过关键字各自接收自己的消息) ```python import pika # 1. 链接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host='60.247.157.3', port=5672)) channel = connection.channel() # 2. 声明中间件 """ exchange: 交换机的名称 exchange_type: 交换机的工作方式 direct关键字模式 """ channel.exchange_declare(exchange='m2', exchange_type='direct') # 3. 发送消息 """ routing_key: 消息的路由键,用于确定消息应该发送到哪个队列 body: 消息的内容 """ channel.basic_publish(exchange='m2', routing_key='key1', body=b'key1 接收信息') channel.basic_publish(exchange='m2', routing_key='key2', body=b'key2 接收信息') # 4. 关闭连接 connection.close() ``` 订阅 ```python import pika # 1. 链接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host='60.247.157.3', port=5672)) channel = connection.channel() # 2. 声明中间件 """ exchange: (秘书)的名称 exchange_type: 秘书的工作方式将消息发送到指定的队列 """ channel.exchange_declare(exchange='m2', exchange_type='direct') # 3. 随机生成一个队列&队列名称 result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue # 4. 将队列绑定到中间件 channel.queue_bind(exchange='m2', queue=queue_name, routing_key='key1') channel.queue_bind(exchange='m2', queue=queue_name, routing_key='key2') # 5. 定义回调函数 def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 6. 告诉RabbitMQ,我们接下来要从[queue_name]队列接收消息 channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) # 7. 开始接收消息 channel.start_consuming() ``` ###模糊匹配模式 其实就是模糊查询的意思。 比上一个更强大。谁匹配成功,都会收到消息。 # 表示可以匹配 0 个 或 多个 单词 * 表示只能匹配 一个 单词 发布者 ```python import pika # 1. 链接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host='60.247.157.3', port=5672)) channel = connection.channel() # 2. 声明中间件 """ exchange: (秘书)的名称 exchange_type: 秘书的工作方式将消息发送到指定的队列 """ channel.exchange_declare(exchange='m3', exchange_type='topic') # 3. 发送消息 """ routing_key: 消息的路由键,用于确定消息应该发送到哪个队列 body: 消息的内容 """ channel.basic_publish(exchange='m3', routing_key='key.demo', body=b'old.demo') channel.basic_publish(exchange='m3', routing_key='key.demo.demo', body=b'old.demo.demo') # 4. 关闭连接 connection.close() ``` 消费者 ```python import pika # 1. 链接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host='60.247.157.3', port=5672)) channel = connection.channel() # 2. 声明中间件 """ exchange: (秘书)的名称 exchange_type: 秘书的工作方式将消息发送到指定的队列 """ channel.exchange_declare(exchange='m3', exchange_type='topic') # 3. 随机生成一个队列&队列名称 result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue # 4. 将队列绑定到中间件 channel.queue_bind(exchange='m3', queue=queue_name, routing_key='key.*') # 5. 定义回调函数 def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 6. 告诉RabbitMQ,我们接下来要从[queue_name]队列接收消息 channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) # 7. 开始接收消息 channel.start_consuming() ```