"""Flask demo: generate random shop orders, queue them in RabbitMQ, store them in MySQL.

``/`` publishes randomly generated orders to the queue named by
``settings.MQName``; ``/read`` pulls orders back off that queue and inserts
them into the ``shop`` table of the ``shop`` database.
"""
import json
import uuid
from contextlib import contextmanager

import pika
import pymysql
from faker import Faker
from flask import Flask, render_template, request, Response

import settings

app = Flask(__name__)

# Number of orders produced/consumed when the form field is empty or invalid.
DEFAULT_BATCH_SIZE = 5


def _parse_count(raw, default=DEFAULT_BATCH_SIZE):
    """Turn a submitted form value into a non-negative int.

    A missing, blank or non-numeric value yields *default*; negative numbers
    are clamped to 0 so callers can use the result directly as a loop bound.
    """
    if raw is None or not raw.strip():
        return default
    try:
        return max(int(raw), 0)
    except ValueError:
        return default


@contextmanager
def _queue_channel():
    """Open a RabbitMQ channel with the work queue declared.

    Yields ``(channel, message_count)`` where ``message_count`` is the count
    reported by the broker's declare-ok reply. The connection is always closed
    on exit, even when the body raises.
    """
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(settings.MQHost, settings.MQPort))
    try:
        channel = connection.channel()
        # Declaring is idempotent and guarantees the queue exists.
        declared = channel.queue_declare(queue=settings.MQName)
        yield channel, declared.method.message_count
    finally:
        # Guard against masking the original error if the broker already
        # dropped the connection.
        if connection.is_open:
            connection.close()


def shop_random():
    """Return one fake order as ``{uuid_hex: [name, goods, price, address]}``."""
    fake = Faker(locale='zh_CN')
    # Randomly generated name, product word, integer price and address.
    shop = {uuid.uuid4().hex: [fake.name(), fake.word(), fake.pyint(), fake.address()]}
    return shop


def send_queue(data=None):
    """Publish one message to the work queue.

    Args:
        data: Message body. ``str``/``bytes`` are sent unchanged; any other
            value (including the default, an empty dict) is JSON-encoded
            first so it is always a valid message body.
    """
    if data is None:
        data = {}
    if not isinstance(data, (str, bytes)):
        data = json.dumps(data, ensure_ascii=False)
    with _queue_channel() as (channel, _pending):
        channel.basic_publish(exchange='', routing_key=settings.MQName, body=data)


def read_queue():
    """Return the number of messages the broker reports for the work queue."""
    with _queue_channel() as (_channel, pending):
        return pending


def _drain_queue(limit):
    """Move up to *limit* messages from the queue into MySQL.

    Stops early when the queue is empty instead of blocking. Each message is
    committed to the database before it is acknowledged, so a failure leaves
    the message on the queue.

    Returns:
        A list with one ``dict.items()`` view per stored message.
    """
    consumed = []
    with _queue_channel() as (channel, _pending):
        conn = pymysql.connect(host=settings.DBHost, user=settings.DBUser,
                               password=settings.DBPassword)
        try:
            with conn.cursor() as cur:
                # Schema setup is idempotent; run it once per batch rather
                # than once per message.
                cur.execute('create database if not exists shop default charset "utf8";')
                cur.execute('use shop;')
                cur.execute('create table if not exists shop(id varchar(100), name varchar(20), goods varchar(20), price int, address varchar(50));')
                for _ in range(limit):
                    method, _properties, body = channel.basic_get(
                        queue=settings.MQName, auto_ack=False)
                    if method is None:
                        # Queue is empty: nothing more to pull.
                        break
                    # NOTE(security): this previously called eval() on the raw
                    # message body, which lets anyone able to publish to the
                    # queue run arbitrary code here. Messages are written as
                    # JSON by index()/send_queue(), so json.loads is the safe
                    # equivalent.
                    record = json.loads(body.decode())
                    for key, value in record.items():
                        cur.execute('insert into shop values(%s,%s,%s,%s,%s);',
                                    (key, value[0], value[1], value[2], value[3]))
                    conn.commit()
                    # Acknowledge only after the row is durably stored.
                    channel.basic_ack(delivery_tag=method.delivery_tag)
                    consumed.append(record.items())
        finally:
            conn.close()
    return consumed


@app.route('/', methods=["GET", "POST"])
def index():
    """Render the producer page; on POST, generate and enqueue orders.

    The ``shop`` form field gives the number of orders to create
    (default 5). The generated orders are passed to the template as ``items``.
    """
    if request.method != 'POST':
        return render_template("index.html", items=[])

    count = _parse_count(request.form.get('shop'))
    shops = []
    if count:
        # Publish the whole batch over one connection instead of reconnecting
        # for every message.
        with _queue_channel() as (channel, _pending):
            for _ in range(count):
                data = shop_random()
                channel.basic_publish(exchange='', routing_key=settings.MQName,
                                      body=json.dumps(data, ensure_ascii=False))
                shops.append(data)
    return render_template("index.html", items=shops)


@app.route('/read', methods=["GET", "POST"])
def read():
    """Render the consumer page; on POST, store queued orders in MySQL.

    The ``dshop`` form field gives the maximum number of messages to consume
    (default 5). The template receives the remaining queue length as
    ``shop_nums`` and the consumed orders as ``items``.
    """
    data = []
    if request.method == 'POST':
        limit = _parse_count(request.form.get('dshop'))
        app.logger.debug("consuming up to %d messages", limit)
        if limit:
            data = _drain_queue(limit)
    # Counted on a fresh connection after the consuming one is closed, so
    # unacknowledged deliveries cannot skew the number.
    message_count = read_queue()
    return render_template("readQueue.html", shop_nums=message_count, items=data)


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=80)