import pika import pymysql # 建立到 RabbitMQ 的连接 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 声明一个名为 'message_queue' 的队列 channel.queue_declare(queue='message_queue') def callback(ch, method, properties, body): # 这里处理从队列中获取到的消息 conn = pymysql.connect(host='localhost', user='root', password='123456') cur = conn.cursor() cur.execute('create database if not exists shop default charset "utf8";') cur.execute('use shop;') cur.execute('create table if not exists shop(name varchar(20), goods varchar(20), price int, address varchar(50));') for k,v in eval(body.decode()).get('data').items(): cur.execute('insert into shop values(%s,%s,%s,%s);', (k, v[0], v[1], v[2])) print(k, v[0], v[1], v[2]) conn.commit() cur.close() # 从 'message_queue' 队列中获取消息 channel.basic_consume(queue='message_queue', on_message_callback=callback, auto_ack=True) print('Waiting for messages. To exit press CTRL+C') channel.start_consuming()