What is a message queue


A “message queue” is a container that holds messages during transmission.


Personal Understanding:I break it down into two terms . When a large number of clients generate a large number of network requests at the same time ( ), the server’s capacity must have a limit. At this time, if there is a container, first let these messages queuing is good, fortunately there is a data structure called , through the queuing (FIFO), the message then passed to our server, the pressure is reduced a lot, this great is the message queue!


This understanding also includes these two concepts: Client ->  Server ->  When there is  ,  two essential concepts, the above understanding is that  corresponds to  , of course, there is also the case of  in real development. The next article will also mention many times.

 Message Queue Advantage

  •  Application decoupling


    Message queues allow consumers and producers to directly disassociate from each other and not interfere with each other by simply sending messages to the queue, and can independently extend or modify the processing on both sides as long as they can be ensured to adhere to the same interface conventions, which can be implemented in Node.js for producers and phython for consumers.

  •  Flexibility and peak processing power


    When there is a sudden and dramatic increase in client access, the access to the server has exceeded the maximum peak value that the service can handle, and even lead to server timeout load collapse, the use of message queues can solve this problem, you can avoid the peak problem , etc.

  •  Sorting Guarantee


    Message queues can control the order of data processing, because the message queue itself uses the queue as a data structure, FIFO (first elected), in some scenarios the order of data processing is very important, such as the order of goods order.

  •  asynchronous communication


    Some messages in the message queue, do not need to be processed immediately, the message queue provides an asynchronous processing mechanism, you can put the message in the queue and not immediately processed, when needed, or asynchronous slowly processed, some of the unimportant to send text messages and mailbox functions can be used.

  •  scalability


    As mentioned earlier, the message queue can do  , if we want to enhance the frequency of message in-queue and out-of-queue processing, it is very simple, do not need to change anything in the code, you can directly modify some of the configuration of the message queue can be, for example, we would like to limit the number of messages each time to send to the consumer and so on.


There are advantages that must have its realistic application scenarios, the article will talk about their corresponding application scenarios for the advantages later.

 Introduction to the types of message queues


Introduce a few mainstream message queues currently on the market (extracurricular knowledge, can be ignored)


  • Kafka: is an open source stream processing platform developed by the Apache Software Foundation, written in Scala and Java, is a high-throughput distributed publish-subscribe messaging system that supports millions of concurrency per second on a single machine. In addition, the positioning of Kafka is mainly in the logging and other aspects , because Kafka design is the original intention of  , can be regarded as a  an important component , very targeted. 0.8 version began to support replication , does not support things , so there is no strict requirements for the duplication of the message , lost , error .

  • RocketMQ: Ali open source messaging middleware , is a low-latency , highly reliable , scalable , easy to use messaging middleware , the idea originated in Kafka. the biggest problem commercial version of the charge , some features are not open .

  • RabbitMQ: developed by Erlang ( with the same low latency as the native socket ) language based on the AMQP protocol open source message queuing system . Can guarantee the reliability, stability, security of the message.  The characteristics of RabbitMQ, no doubt, RabbitMQ the highest, because it is the realization of the language is born with high concurrency and high availability of the erlang language, inherent  advantage.


Description: This article focuses on RabbitMQ to explain the more common. Personally, I think that these kinds of message queuing middleware can achieve the function, through redis can also be realized, the idea.


Beginning message queue (message queue in node.js simple application)

 Rabbitmq Basic Installation

 Mac Version Installation

 To install directly through HomeBrew, execute the following command

brew install rabbitmq

 Starting rabbitmq


$ /usr/local/Cellar/rabbitmq/3.7.8  
$ sbin/rabbitmq-server


Browser input http://localhost:15672/#/ default username password guest

 Basic schematic after installation

 Visualization Interface Modular Functionality Description.


 For other system installations, please do your own online search

 Description of several port differences


5672: Default port number for communication 15672: Default port number for management console 25672: Port number for cluster communication Note: If RabbitMQ is installed successfully on an AliCloud ECS server, the external network cannot be accessed because the port is not open due to a security group issue.

 Basic commands after Rabbitmq installation

 The following is a list of commands that are commonly used in the terminal


  • whereis rabbitmq: see where rabbitmq is installed

  • rabbitmqctl start_app: start the application

  • whereis erlang: view erlang installation location

  • rabbitmqctl start_app: start the application

  • rabbitmqctl stop_app: shut down the application
  •  rabbitmqctl status: node status

  • rabbitmqctl add_user username password: add user

  • rabbitmqctl list_users: list all users

  • rabbitmqctl delete_user username: delete user

  • rabbitmqctl add_vhost vhostpath: create a virtual host

  • rabbitmqctl list_vhosts: list all virtual hosts

  • rabbitmqctl list_queues: view all queues

  • rabbitmqctl -p vhostpath purge_queue blue: Purge messages from the queue.


Note: All the commands in the above terminal need to be executed in the rabbitmqctl sbin directory to be useful, otherwise it will report an error: 


Node.js implementation of a simple HelloWorld message queue


Draw a basic picture of the HelloWorld message queue to include the following concepts.


 A few concepts before looking at this code

  •  Producer: The producer of the message
  •  Consumer: the person who receives the message

  • Channel channel: after establishing a connection, a channel channel will be acquired

  • exchange : the exchange, the message needs to be sent to the exchange first, can also be said to be the first step where the message is stored (there are many types of exchanges, will be described in detail later).

  • Message queue: where messages are stored until they reach the consumer, and where the exchange machine delivers the messages.
  •  ack acknowledgement: an acknowledgement that a message has been consumed after it has been received

 amqplib module


A recommended npm module is amqplib .


Github: github.com/squaremo/am…

$ npm install amqplib

 Producer code product.js

const amqp =require('amqplib');

async function  product(params) {

    const connect =await amqp.connect('amqp://localhost:5672');
  const connection = await amqp.connect('amqp://localhost:5672');

     const channel = await connection.createChannel();
 

     const routingKey = 'helloKoalaQueue';
     const msg = 'hello koala';
 
     for (let i=0; i<10000; i++) {

         await channel.publish('', routingKey, Buffer.from(`${msg} ${i}`));
     }

     await channel.close();

     await connect.close();
}
product();

 Producer Code Interpretation and Running Results

 Consumer Code consumer.js


const amqp = require('amqplib');

async function consumer() {

    const connection = await amqp.connect('amqp://localhost:5672');


    const channel = await connection.createChannel();


    const queueName = 'helloKoalaQueue';

    await channel.assertQueue(queueName);


    await channel.consume(queueName, msg => {
        console.log('Consumer:', msg.content.toString());
        channel.ack(msg);
    });
}
consumer();

 Producer Code Interpretation and Running Results

 node consumer.js
  •  Execution results after running


  • At this point I change the name of the queue in the code to helloKoalaQueueHaHa ,at this point go to the Rabbitmq visualization interface, the queue module, and create this queue


    See here again proves that the message queue excellent there is no connection between the producer to create this helloKoalaQueueHaHa route name again, will also be consumed normally, and will print the message, you can actually do to try.


  • At this point I change the name of the queue in the code to helloKoalaQueueHaHa ,at this point go to the Rabbitmq visualization interface, the queue module, and create this queue


    See here again to prove that the message queue excellent  there is no connection between the producer to create this helloKoalaQueueHaHa route name again, will also be consumed normally, and will print the message, you can actually do to try.

 How to release the message queue

 Delete the message queue directly in the visualization interface


  1. Visit http://{rabbitmq installation IP}:15672 to log in.

  2. Click on queues and here you can see all the Queues you have created.

  3. Select a Queue, then you will enter a list interface, there is a Delete button at the bottom, confirm Queue to delete the queue/Purge Message to clear the message can be.


Cons: This only deletes one queue at a time, which can be particularly slow if there are too many messages in the queue.

 Message queue release (deletion) through code

 Message Queuing Switch Explained

 Remember one thing first.


The producer must specify an exchange when sending a message, otherwise the message cannot reach the message queue directly, and the exchange routes the message to one or more Queues (or discards it)

 Then we begin this chapter on switches


If you do not specify exchange (null), it defaults to the AMQP default switch. The AMQP default routing rule matches routes based on the routingKey and whether there is a queue with the same name on the mq.

 Types of switches

 Four types are commonly used

  • fanout

  • direct

  • topic

  • headers


Regardless of the type of switch, there is a bindingbinding operation, except that there are different route binding strategies depending on the type of switch. The different types do what is shown in the red box below.

 Fanout.


Exchange routing rules of type fanout are very simple, it will route all messages sent to that Exchange to all Queues bound to it, no routing keys need to be set.


Explanation:All messages are routed to both Queues, is it possible for both consumers to receive all the exact same messages? The answer is yes, the queue messages received by both consumers should normally be identical. This type is often used for broadcast type requirements, or it can also be consumer 1 to record logs, consumer 2 to print the logs.

 Corresponding code implementation:

 Producer.

const amqp = require('amqplib');

async function producer() {

    const connection = await amqp.connect('amqp://localhost:5672');

    const channel = await connection.createChannel();


    const exchangeName = 'fanout_koala_exchange';
    const routingKey = '';
    const msg = 'hello koala';


    await channel.assertExchange(exchangeName, 'fanout', {
        durable: true,
    });

    await channel.publish(exchangeName, routingKey, Buffer.from(msg));


    await channel.close();
    await connection.close();
}
producer();

 Consumers:

const amqp = require('amqplib');

async function consumer() {

    const connection = await amqp.connect('amqp://localhost:5672');


    const channel = await connection.createChannel();


    const exchangeName = 'fanout_koala_exchange';
    const queueName = 'fanout_kaola_queue';
    const routingKey = '';


    await channel.assertExchange(exchangeName, 'fanout', { durable: true });


    await channel.assertQueue(queueName);


    await channel.bindQueue(queueName, exchangeName, routingKey);

   
    await channel.consume(queueName, msg => {
        console.log('Consumer:', msg.content.toString());
        channel.ack(msg);
    });

}
consumer();


Note: Other types of code have been put on github at: github.com/koala-codin… Welcome star exchange.

direct


direct Routes the message to the Queue whose binding key exactly matches the routing key.

topic


The producer specifies the RoutingKey messages are forwarded accordingly by fuzzy matching according to the queue specified by the consumer, two wildcard patterns: #: can match one or more keywords *: can only match one keyword

headers


header exchange(header exchange) is a bit similar to topic exchange, but unlike topic exchange whose routing is based on routing key, header exchange’s routing value is based on the header data of the message. Theme exchange routing key is only string, while header exchange can be integer and hash value header Exchange type is less used, you can google yourself to understand.

 Message Queuing Thoughts and Deeper Explorations

 Message queue implementation of rpc

 (This subparagraph is taken from the Internet; refer to the article for clarification.)


  1. The client is both a producer and a consumer, sending RPC call messages to the RPC request queue and listening to the RPC response queue.

  2. The server side listens to the RPC request queue and executes the server side method after receiving the message
  3.  The server sends the result of the method execution to the RPC response queue


(Note that this is just to mention the RPC knowledge, because a single RPC an article are not necessarily said to finish, interested in the queue can be used to try RPC)

 Is message persistence necessary?


It is in memory, and if it hangs, the messages in the message queue will be lost. RabbitMQ can enable persistence. RabbitMQ can enable persistence. Different development languages can set persistence parameters.


Here is an example of Node.js, other languages can be searched on their own

    await channel.assertExchange(exchangeName, 'direct', { durable: true });


Also recommend a good article on persistence: juejin.cn/post/684490…

 Is there a need for message answering after the consumer has finished?


The simple explanation of message answering is that notifies the message queue when it has finished consuming.


I think it is necessary to turn on this configuration, the consumer to complete the task in the message queue, the consumer may fail or hang in the middle, once RabbitMQ sends a message to the consumer and then quickly remove the message from  , in this case, the consumer corresponding to the work process failure or hang, the process is being processed by the message will be lost. In this case, if the consumer’s worker process fails or hangs, all messages being processed by that process will be lost.


To ensure that messages are never lost, RabbitMQ supports a message answering mechanism. When a message is accepted and processed, an answer is sent back from the consumer to the sender, and then RabbitMQ deletes it.


If a consumer hangs (channel, link closure, or tcp link loss) and doesn’t send an ack response, RabbitMQ assumes that the message wasn’t processed completely and puts it back on the queue. In this way you can ensure that messages are never lost, even if a worker process hangs by chance.


By default message answering is turned off. It’s time to turn it on with the false (auto-ack configuration item) parameter


Node.js is used as an example, but other languages can be searched for.


await channel.consume(queueName, msg => {
    console.log('koala:', msg.content.toString());
    
    channel.ack(msg);
}, { noAck: false });

 How do we achieve fair scheduling?


You can configure the value of the prefetch count entry to be 1, which instructs RabbitMQ not to send more than one message to each consumer at a time. In other words, no message will be sent to that consumer until the message has been processed and answered. Instead, it will send a message to the next available consumer or worker.


Here’s an example from Node.js of the interface method prefetch provided by the amqplib library for stream-limited implementations.

 prefetch Parameter Description:


  • count: the number of N messages to be pushed to the consumer at a time, if these N messages are not acked, the producer will not push again until these N messages are consumed.

  • global: at which level to restrict, ture for channel, false for consumer, default is false.

await channel.prefetch(1, false);


How to implement a switch to send messages to multiple consumers in sequence, and which switch to choose?


If a producer, two consumers, issuing messages, I want the queue to send to consumer 1 first, after sending consumer 1 to send consumer 2, so that there is a sequential interaction to send, which switch should be now? Note the interaction, think about it after you read it? There is also no need to manually call back the message queue to complete after the consumer is done? Is message persistence necessary, and what are the benefits of persistence?


(After looking at message passing in message queues, you’ll have questions about how messages in the pipeline (producers) are consumed by consumers put into the queue and then taken out of the queue)

 Message Queuing Application Scenarios

  1.  Double eleven goods spike / ticket function realization


    We in the double 11, when we early in the morning a lot of seconds and snatch goods, and then go to the checkout, will find that the interface will remind us to let us wait a moment, as well as some friendly picture text reminders. Instead of the era of the previous few years, moving the page is stuck, reporting errors, etc. to present to the user.


    A diagram to explain the use of message queues in the second kill ticket and other scenarios: (Note: before looking down, if you have done e-commerce class second kill, you can think about how you implement, we can discuss oh. Here just want to say the role of the message queue, not the final optimization of the results, such as using redis to control the total cache, etc.)

  2.  Points redemption (points can be used on multiple platforms)


    Points redemption module, there is a company with multiple departments have to use this module, this time you can decouple this feature through the message queue to realize. Each departmental system to do the department’s business, but they can use the points system for the exchange of goods and so on. Other modules are completely decoupled from the points module.

  3.  Send emails, user big data analysis, etc. Synchronous to asynchronous functionality implementation


    There’s more to say about this feature, starting with user registration for a platform.

    •  User Registration

    • The user registers to select several interest tags, and at this point it is necessary to calculate the recommended content based on the user’s attributes, user analytics
    •  An email may need to be sent to the user after registration
    •  Sends the user a system notification with instructions.

     Normal registration without high concurrency:


    For the user, he just wants to register to use the program, and as long as the server stores his account information in the database, he can log in and do what he wants. The user does not care about these things, the server can put other operations into the corresponding 消息队列 and then immediately return the results to the user, by the message queue for these operations.

     Suppose a large number of users register and high concurrency occurs:


    Mail interface can not withstand, or analyze the information of a large number of calculations to make the cpu overloaded, which will appear although the user data records quickly added to the database, but stuck in the mail or analyze the information, resulting in a substantial increase in the response time of the request, or even timeout, which is a bit uneconomical. Faced with this situation is also generally put these operations into the message queue (producer-consumer model), the message queue slowly for processing, while the registration request can be completed very quickly, will not affect the user to use other features.


  4. RabbitMQ-based Node.js communication with Phython or other language implementations


    The decoupling feature of RabbitMQ is also utilized here, which allows you to communicate not only with Phython, but also with many other languages, so I won’t go into details.

By lzz

Leave a Reply

Your email address will not be published. Required fields are marked *