博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ消息队列
阅读量:4625 次
发布时间:2019-06-09

本文共 26083 字,大约阅读时间需要 86 分钟。

目录:

  1、介绍

 

  

 

介绍:

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序应用程序的通信方法。

应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。

消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。

排队指的是应用程序通过 队列来通信。

队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ等等。

 

 

项目访问数据库的压力问题

  说明 :

  虽然后台的数据库已经通过mycat实现了数据库的高可用,并且可以抗击部分高并发。但是如果并发压力特别大。这时数据库的运行必定满负荷。很容易造成数据库宕机。

  分析问题:

    并发要求数据库第一时间执行更新操作,但是数据库现在满负荷,没有能力处理多余的请求,导致容器产品宕机风险。

  解决办法:

    准备一个消息队列,如果有高并发的请求,数据库处理不完,则将消息存入队列中.这时数据库先处理自己的请求,当请求处理完成后,从消息队列中获取请求.之后处理数.

 

这样的做法,实现了用户的请求和数据库执行的异步操作!!

 

架构升级:

  说明:引入消息队列后,主要解决一个数据库“更新并发压力”较大的问题,使用消息队列的机制,可以让后台的整个架构的性能提升至少30%,

  现在几乎全部的软件公司都在使用队列。

  服务器的请求多。

  mysql的处理有一定的峰值

  使用队列平衡这样的关系。两座上消除队列的内在是无限的。 

 

其他mq:

  rocketMQ

  kafkaMQ

 

linux安装rabbitmq

  1、导入 rabbitmq-server-3.6.1-1.noarch.rpm

  2、安装命令:rpm -ivh rabbitmq-server-3.6.1-1.noarch.rpm

  3、安装完成 

  配置文件修改:(rabbitmq.config)

    {loopback_users, []}

  复制配置文件到cd etc/rabbitmq

 

启动mq

  rabbitmq-plugins enable rabbitmq_management

   启动:service rabbitmq-server start

  停止:service rabbitmq-server stop

  重启:service rabbitmq-server restart

rabbitmq的端口号

  1、15672  rabbitMQ控制台端口

  2、5672客户端连接rabbitMQ的端口  

浏览器远程访问:

  192.168.220.134:15672

  登陆账号:guest  密码:guest

  

channels:链接rabbitMQ的唯一通道

Exchanges:交换机,可以让消息发往指定的队列中

queues:队列:在消息队列中可以有无数个队列

    如果控制台

用户权限设定

  1、Admin > add user

       

   2、定义虚拟主机:

    自己维护的队列的内容,包含路由/交换机/队列

    命名:/jt

    

  3、为用户分配虚拟主机(双击jtadmin进入)

    

rabbitMQ的工作模式

1、简单模式

  

 

  p:provider:消息的提供者,消息的发出者

  c:Consumer:消费者,将消息 进行处理

  红色区域:rabbitMQ

   jar包导入

com.rabbitmq
amqp-client
3.5.1
org.springframework.amqp
spring-rabbit
1.4.0.RELEASE

 测试代码:

package com.jt.rabbitmq;import java.io.IOException;import org.junit.Before;import org.junit.Test;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.ConsumerCancelledException;import com.rabbitmq.client.QueueingConsumer;import com.rabbitmq.client.ShutdownSignalException;//测试rabbitMQ中的简单模式public class TestRabbitMQ_1_simple {    private Connection connection = null;    /**     * rabbitMQ的连接步骤     * 1、通过用户jtadmin连接rabbitMQ(ip:端口/用户名/密码/虚拟主机)     * 2、定义消息的提供者provider     *         2.1、创建channel对象(控制队列/路由/交换机等)     *         2.2、定义消息队列     *         2.3、发送消息到队列中     * @throws IOException      */    //表示从rabbitMQ工厂中获取链接    @Before    public void initConnection() throws IOException{        //创建工厂对象        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("192.168.220.134");        factory.setPort(5672);        factory.setUsername("jtadmin");        factory.setPassword("jtadmin");        factory.setVirtualHost("/jt");            //获取连接        connection = factory.newConnection();    }    /**     * 创建生产者对象     * @throws IOException      */    @Test    public void provider() throws IOException{        //1、获取channel对象,控制队列和路由和交换机        Channel channel = connection.createChannel();        //2、定义队列        /**         * queue:队列名称         * durable:是否持久化,true:当消息队列重新启动后,队列还存在         *                         false:当队列重启后,队列不存在。         * exclusive:独有的         *             当前的消息队列是否由生产者独占,如果配置为true表示消费者不能         * autoDelete:是否自动删除。如果为true,则消息队列中没有消息时,该队列自动删除。、         * arguments:提交的参数。一般为null         */        channel.queueDeclare("queue_simple", false, false, false, null);        //3、定义需要发送的消息         String msg = "我是简单模式";        //将消息与队列绑定,并且发送        /**         * exchange:交换机名称:如果没有交换机,则为""串         * routingkey:消息发往队列的ID(参数),如果没有路由key,则写队列名称         * props:消息发送的额外的参数,如果没有参数为null         * body:发送的消息 的二进制字节码文件         */        channel.basicPublish("", "queue_simple", null, msg.getBytes());        //关闭流        channel.close();        connection.close();        System.out.println("队列发送成功!");    }        //定义消费者    /**     * 1、先获取channel对象     * 2、定义消息队列     * 3、定义消费者对象     * 4、将消费者与队列信息绑定     * 5、通过循环的方式获取队列中的内容     * 6、将获取的数据转化为字符串     * @throws IOException      * @throws InterruptedException      * @throws ConsumerCancelledException      * @throws ShutdownSignalException      */    @Test    public void consumer() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{        Channel channel = connection.createChannel();        //定义队列        channel.queueDeclare("queue_simple", false, false, false, null);        //定义消费者        QueueingConsumer consumer = new QueueingConsumer(channel);        //将队列与消费者绑定        /**         * queue:队列的名称         * autoAck:是否自动回复,队列确认后方能执行下次消息          * callback:回调参数,写的是消费者对象         *          */        channel.basicConsume("queue_simple", true,consumer);        //通过循环方式获取消息         while(true){            //获取消息队列的内容---delivery            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String msg = new String(delivery.getBody());            System.out.println("消息者队列消息 :"+msg);        }    }}
View Code

 

 

2、工作模式

工作原理:当生产者生产消息 后,保存到队列中。

轮询模式

测试代码:

package com.jt.rabbitmq;import java.io.IOException;import org.junit.Before;import org.junit.Test;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.ConsumerCancelledException;import com.rabbitmq.client.QueueingConsumer;import com.rabbitmq.client.ShutdownSignalException;public class TestRabbitMQ_2_work {        //工作模式:多个人一起消费一个队列消息.内部轮询机制            /**     * 1.定义rabbmq地址 ip:端口     * 2.定义虚拟主机     * 3.定义用户名和密码     * @throws IOException      */    private Connection connection = null;    @Before    public void init() throws IOException{        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setHost("192.168.220.134");        connectionFactory.setPort(5672);        connectionFactory.setVirtualHost("/jt");        connectionFactory.setUsername("jtadmin");        connectionFactory.setPassword("jtadmin");                //获取链接        connection = connectionFactory.newConnection();    }        @Test    public void provider() throws IOException{        //定义通道对象        Channel channel = connection.createChannel();                //定义队列        channel.queueDeclare("queue_work", false, false, false, null);                //定义广播的消息        String msg = "我是工作模式";                //发送消息        channel.basicPublish("", "queue_work", null, msg.getBytes());                //关闭流文件        channel.close();        connection.close();    }            //定义消费者    @Test    public void consumer1() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{        //定义通道        Channel channel = connection.createChannel();                //定义队列        channel.queueDeclare("queue_work", false, false, false, null);                //定义消费数  每次只能消费一条记录.当消息执行后需要返回ack确认消息 才能执行下一条        channel.basicQos(1);                //定义消费者        QueueingConsumer consumer = new QueueingConsumer(channel);                //将队列和消费者绑定  false表示手动返回ack        channel.basicConsume("queue_work", false, consumer);                while(true){            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String msg = new String(delivery.getBody());            System.out.println("队列A获取消息:"+msg);            //deliveryTag 队列下标位置            //multiple是否批量返回            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);        }    }            //定义消费者        @Test        public void consumer2() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{            //定义通道            Channel channel = connection.createChannel();                        //定义队列            channel.queueDeclare("queue_work", false, false, false, null);                        //定义消费数  每次只能消费一条记录.当消息执行后需要返回ack确认消息 才能执行下一条            channel.basicQos(1);                        //定义消费者            QueueingConsumer consumer = new QueueingConsumer(channel);                        //将队列和消费者绑定  false表示手动返回ack            channel.basicConsume("queue_work", false, consumer);                        while(true){                QueueingConsumer.Delivery delivery = consumer.nextDelivery();                String msg = new String(delivery.getBody());                System.out.println("队列B获取消息:"+msg);                //deliveryTag 队列下标位置                //multiple是否批量返回                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);            }        }                //定义消费者                @Test                public void consumer3() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{                    //定义通道                    Channel channel = connection.createChannel();                                        //定义队列                    channel.queueDeclare("queue_work", false, false, false, null);                                        //定义消费数  每次只能消费一条记录.当消息执行后需要返回ack确认消息 才能执行下一条                    channel.basicQos(1);                                        //定义消费者                    QueueingConsumer consumer = new QueueingConsumer(channel);                                        //将队列和消费者绑定  false表示手动返回ack                    channel.basicConsume("queue_work", false, consumer);                                        while(true){                        QueueingConsumer.Delivery delivery = consumer.nextDelivery();                        String msg = new String(delivery.getBody());                        System.out.println("队列C获取消息:"+msg);                        //deliveryTag 队列下标位置                        //multiple是否批量返回                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);                    }                }                }
View Code

 

3、订阅模式

 

 

只要队列绑定了交换机,当p生产者生产消息 时,这时连接交换机的全部队列都会收到这个消息 。并且所有的消费者都会执行消息 ,类似于广播-邮箱(群发)

 java测试代码

  说明:发布订阅模式测试时,需要先启动consumer,再启动provider

package com.jt.rabbitmq;import java.io.IOException;import org.junit.Before;import org.junit.Test;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.ConsumerCancelledException;import com.rabbitmq.client.QueueingConsumer;import com.rabbitmq.client.ShutdownSignalException;public class TestRabbitMQ_3_publish {//发布订阅模式private Connection connection = null;        //定义rabbit连接池    @Before    public void initConnection() throws IOException{        //定义工厂对象        ConnectionFactory connectionFactory = new ConnectionFactory();        //设定参数        connectionFactory.setHost("192.168.220.134");        connectionFactory.setPort(5672);        connectionFactory.setVirtualHost("/jt");        connectionFactory.setUsername("jtadmin");        connectionFactory.setPassword("jtadmin");                //创建连接        connection = connectionFactory.newConnection();        }                //定义生产者    @Test    public void  proverder() throws IOException{        //定义通道        Channel channel = connection.createChannel();                //定义交换机名称        String exchange_name = "E1";        //定义发布订阅模式    fanout    redirect 路由模式    topic 主题模式        channel.exchangeDeclare(exchange_name, "fanout");                for(int i=0;i<10; i++){            String msg = "发布订阅模式"+i;            channel.basicPublish(exchange_name, "", null, msg.getBytes());        }                channel.close();        connection.close();    }            /**     * 消费者需要定义队列名称  并且与交换机绑定     * @throws IOException     * @throws InterruptedException      * @throws ConsumerCancelledException      * @throws ShutdownSignalException      */    @Test    public void consumer1() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{        Channel channel = connection.createChannel();                String exchange_name = "E1";        String queue_name = "c_1";                //定义交换机模式        channel.exchangeDeclare(exchange_name, "fanout");                //定义队列        channel.queueDeclare(queue_name, false, false, false, null);                //将队列和交换机绑定        channel.queueBind(queue_name, exchange_name, "");                //定义消费数量         channel.basicQos(1);                //定义消费者        QueueingConsumer consumer = new QueueingConsumer(channel);                //将消费者和队列绑定,并且需要手动返回        channel.basicConsume(queue_name, false, consumer);                while(true){            QueueingConsumer.Delivery delivery = consumer.nextDelivery();                        String msg = "发布订阅模式-消费者1"+new String(delivery.getBody());            System.out.println(msg);                        //false表示一个一个返回            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        }    }            /**     * 消费者需要定义队列名称  并且与交换机绑定     * @throws IOException     * @throws InterruptedException      * @throws ConsumerCancelledException      * @throws ShutdownSignalException      */    @Test    public void consumer2() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{        Channel channel = connection.createChannel();                String exchange_name = "E1";        String queue_name = "c_2";                //定义交换机模式        channel.exchangeDeclare(exchange_name, "fanout");                //定义队列        channel.queueDeclare(queue_name, false, false, false, null);                //将队列和交换机绑定        channel.queueBind(queue_name, exchange_name, "");                channel.basicQos(1);                //定义消费者        QueueingConsumer consumer = new QueueingConsumer(channel);                //定义回复方式        channel.basicConsume(queue_name, false, consumer);                while(true){            QueueingConsumer.Delivery delivery = consumer.nextDelivery();                        String msg = "发布订阅模式-消费者2"+new String(delivery.getBody());            System.out.println(msg);                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        }    }            }
View Code

 

4、路由模式

说明:每一个队列都有自己的路由key.当生产者发送消息时,会携带一个路由key,会将消息发往路由Key一致的队列中.

路由模式是发布订阅模式的升级版

 java测试代码

package com.jt.rabbitmq;import java.io.IOException;import org.junit.Before;import org.junit.Test;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.ConsumerCancelledException;import com.rabbitmq.client.QueueingConsumer;import com.rabbitmq.client.ShutdownSignalException;public class TestRabbitMQ_4_redirect {//发布订阅模式private Connection connection = null;        //定义rabbit连接池    @Before    public void initConnection() throws IOException{        //定义工厂对象        ConnectionFactory connectionFactory = new ConnectionFactory();        //设定参数        connectionFactory.setHost("192.168.126.146");        connectionFactory.setPort(5672);        connectionFactory.setVirtualHost("/jt");        connectionFactory.setUsername("jtadmin");        connectionFactory.setPassword("jtadmin");        //创建连接        connection = connectionFactory.newConnection();        }                //定义生产者    @Test    public void  proverder() throws IOException{        Channel channel = connection.createChannel();                //定义交换机名称        String exchange_name = "redirect";                        //定义发布订阅模式    fanout    direct 路由模式    topic 主题模式        channel.exchangeDeclare(exchange_name, "direct");                for(int i=0;i<10; i++){            String msg = "生产者发送消息"+i;            String rontKey = "1707B";                        /**             * exchange:交换机名称             * routingKey:路由key             * props:参数             * body:发送消息             */            channel.basicPublish(exchange_name, rontKey, null, msg.getBytes());        }                channel.close();        connection.close();    }        /**     * 消费者需要定义队列名称  并且与交换机绑定     * @throws IOException     * @throws InterruptedException      * @throws ConsumerCancelledException      * @throws ShutdownSignalException      */    @Test    public void consumer1() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{                //定义通道        Channel channel = connection.createChannel();                //定义交换机名称        String exchange_name = "redirect";                //定义队列名称        String queue_name = "c_r_1";                //定义交换机模式        channel.exchangeDeclare(exchange_name, "direct");                //定义队列        channel.queueDeclare(queue_name, false, false, false, null);                //将队列和交换机绑定        /**         * 参数介绍:         *     queue:队列名称         *  exchange:交换机名称         *  routingKey:路由key         */        //channel.queueBind(queue, exchange, routingKey)        channel.queueBind(queue_name, exchange_name, "1707A");                //定义消费个数        channel.basicQos(1);                //定义消费者        QueueingConsumer consumer = new QueueingConsumer(channel);                //绑定消息与消费者        channel.basicConsume(queue_name, false, consumer);                while(true){            QueueingConsumer.Delivery delivery = consumer.nextDelivery();                        String msg = "路由模式-消费者1"+new String(delivery.getBody());            System.out.println(msg);                        //手动回复 一个一个回复            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        }    }            /**     * 消费者需要定义队列名称  并且与交换机绑定     * @throws IOException     * @throws InterruptedException      * @throws ConsumerCancelledException      * @throws ShutdownSignalException      */    @Test    public void consumer2() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{        Channel channel = connection.createChannel();                String exchange_name = "redirect";        String queue_name = "c_r_2";                //定义交换机模式        channel.exchangeDeclare(exchange_name, "direct");                //定义队列        channel.queueDeclare(queue_name, false, false, false, null);                //将队列和交换机绑定        channel.queueBind(queue_name, exchange_name, "1707B");                channel.basicQos(1);                //定义消费者        QueueingConsumer consumer = new QueueingConsumer(channel);                //定义回复方式        channel.basicConsume(queue_name, false, consumer);                while(true){            QueueingConsumer.Delivery delivery = consumer.nextDelivery();                        String msg = "路由模式-消费者2"+new String(delivery.getBody());            System.out.println(msg);                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        }    }}
View Code

5、主题模式

 

  其实就是在路由模式上添加了通配符的概念,表示有一类满足路由key的队列可以接受消息

    #号:可以匹配一个或多个字符

    Key:item.update.abc          消费者路由key item.#

 

    *:只能匹配单个字符或单词

    Key:item.update              消费者的路由key item.*  可以匹配

    Key:item.update.abc          消费者的路由key item.* 不能匹配

 

 

java测试代码

 

package com.jt.rabbitmq;import java.io.IOException;import org.junit.Before;import org.junit.Test;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;//主题模式public class TestRabbitMQ_5_topic {        private Connection connection = null;    @Before    public void initConnection() throws IOException{        //1.定义ConnectionFactory对象        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setHost("192.168.220.134");        connectionFactory.setPort(5672);        connectionFactory.setVirtualHost("/jt");        connectionFactory.setUsername("jtadmin");        connectionFactory.setPassword("jtadmin");            //获取连接        connection = connectionFactory.newConnection();        }        //定义生产者    @Test    public void proverder() throws Exception{        //获取通道        Channel channel = connection.createChannel();                //定义交换机的名称        String exchange_name = "TOP";                //创建交换机队列           //exchange  交换机名称        //type 定义类型 fanout 发布订阅模式   direct-路由模式    topic-主题模式        channel.exchangeDeclare(exchange_name, "topic");  //主题模式                for (int i = 0; i < 100; i++) {                        String msg = "主题模式"+i;                        /**             * 参数说明:             *     exchange:交换机名称             *  routingKey:路由key             *  props:参数             *  body:数据字节码             */            //channel.basicPublish(exchange, routingKey, props, body);            channel.basicPublish(exchange_name,"item.update", null, msg.getBytes());        }        channel.close();        connection.close();    }        @Test    public  void consumer1() throws Exception{                //定义通道        Channel channel = connection.createChannel();                //定义交换机名称        String exchange_name = "TOP";                //定义队列名称        String queue_name = "TOP1";                //声明交换机名称以及主题模式        channel.exchangeDeclare(exchange_name, "topic");                //定义队列        channel.queueDeclare(queue_name, false, false, false, null);                //将交换机和队列进行绑定           //参数1.队列名称     参数2交换机名称   参数3 路由key  #号匹配多个字符        channel.queueBind(queue_name, exchange_name, "item.#");                channel.basicQos(1);  //定义消费数量  1                //定义消费者        QueueingConsumer consumer = new QueueingConsumer(channel);                //将队列和消费者绑定        channel.basicConsume(queue_name, false, consumer);  //定义手动回复                while(true){            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            //获取消息队列中的数据            String msg = new String(delivery.getBody());            System.out.println("item.#消费者1:"+msg);                        //手动回复            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);            }    }            @Test    public  void consumer2() throws Exception{        Channel channel = connection.createChannel();        String exchange_name = "TOP";        String queue_name = "TOP2";        //生命交换机模式        channel.exchangeDeclare(exchange_name, "topic");        //定义队列        channel.queueDeclare(queue_name, false, false, false, null);        //将交换机和队列进行绑定           //参数1.队列名称     参数2交换机名称     参数3定义路由key        channel.queueBind(queue_name, exchange_name, "item.*");                channel.basicQos(1);  //定义消费数量  1                //定义消费者        QueueingConsumer consumer = new QueueingConsumer(channel);        channel.basicConsume(queue_name, false, consumer);  //定义手动回复                while(true){            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            //获取消息队列中的数据            String msg = new String(delivery.getBody());            System.out.println("item.*消费者2:"+msg);            //定义回执            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);            }    }}
View Code

 


 

整合rabbitmq

  1、模块划分 

    1、order订单服务器,作为消息队列的生产者

      a:消息队列中存的是order对象(1、order信息, 2、订单物流信息,3、订单商品信息)

      b:消息的生产者需要指定路由key(使用路由模式)

      c:标识交换机名称

    2、rabbitmq服务器,作为消息队列的消费者

      a:从消息队列中获取消息 ,数据可以直接进行网络传输,要求数据必须序列化

      b:定义接收消息的队列

      c:标识交换机

      d:定义路由key

      e:将数据进行入库操作

 

文件导入:

  模块位置:order

  文件1:applicationContext-rabbitmq-send.xml

applicationContext-rabbitmq-send.xml

 

 文件2:rabbitmq连接配置文件 rabbitmq.properties

rabbit.ip=192.168.220.134rabbit.port=5672rabbit.username=jtadminrabbit.password=jtadminrabbit.vhost=/jt

  文件3:

applicationContext-rabbitmq-receive.xml

 

java代码:

  通过模板发送消息 :

    说明:通过消息队列,数据处理和发送是异步的,消息已经发现,但是消息还没有处理。这时数据库中还没有数据,所以一般公司要求用户半小时后查询。

  代码 :  

//引入消息队列的模板工具类    @Autowired    private RabbitTemplate rabbitTemplate;public String saveOrder(Order order){        String orderId = order.getUserId() + System.currentTimeMillis() + "";        //赋值orderId        order.setOrderId(orderId);        //通过消息队列处理消息         /**         * routingKey:生产者的路由key         * object:需要发送的对象         */        rabbitTemplate.convertAndSend("order.save", order);        System.out.println("消息队列调用完成");        System.out.println("订单入库成功~~~");        return orderId;    }

 接收处理队列消息 ,此类直接交给rabbitmq处理

package com.jt.order.rabbit.service;import java.util.Date;import java.util.List;import org.springframework.beans.factory.annotation.Autowired;import com.jt.dubbo.pojo.Order;import com.jt.dubbo.pojo.OrderItem;import com.jt.dubbo.pojo.OrderShipping;import com.jt.order.mapper.OrderItemMapper;import com.jt.order.mapper.OrderMapper;import com.jt.order.mapper.OrderShippingMapper;public class orderServiceImpl {    //将消息队列中的内容写入数据库中    @Autowired    private OrderMapper orderMapper;    @Autowired    private OrderItemMapper orderItemMapper;    @Autowired    private OrderShippingMapper orderShippingMapper;        public void saveOrder(Order order){        Date date = new Date();        order.setCreated(date);        order.setStatus(1);        order.setUpdated(date);        System.out.println("order=" + order);        orderMapper.insert(order);        //完善物流信息及入库        OrderShipping shipping = order.getOrderShipping();        shipping.setCreated(date);        shipping.setUpdated(date);        shipping.setOrderId(order.getOrderId());        orderShippingMapper.insert(shipping);        //完善商品信息及入库        //1、实现批量入库操作,自己手写sql        //2、通过循环遍历方式,实现多次入库操作        List
orderItems = order.getOrderItems(); for(OrderItem orderItem : orderItems){ orderItem.setOrderId(order.getOrderId()); orderItemMapper.insert(orderItem); } System.out.println("消息队列入库成功~~~"); }}
orderServiceImpl

 

转载于:https://www.cnblogs.com/xiangyuqi/p/8603993.html

你可能感兴趣的文章
Java 变参函数的实现
查看>>
nrf51 SDK自带例程的解读
查看>>
SESSION技术
查看>>
数据结构(五)之直接插入排序
查看>>
SQL函数——LENGTH()和LENGTHB()
查看>>
vim - manual -个人笔记
查看>>
详解Javascript中prototype属性(推荐)
查看>>
angularjs实现首页轮播图
查看>>
Git 对象 和checkout 和stash的笔记
查看>>
团队项目总结2-服务器通信模型和顺序图
查看>>
hdu 1085 Holding Bin-Laden Captive!
查看>>
[周记]8.7~8.16
查看>>
递归定义
查看>>
kindeditor 代码高亮设置
查看>>
图的邻接表存储
查看>>
2018 leetcode
查看>>
PHP中获取当前页面的完整URL
查看>>
Chapter 4 Syntax Analysis
查看>>
vi/vim使用
查看>>
讨论Spring整合Mybatis时一级缓存失效得问题
查看>>