개발일기장

Node.js에서 RabbitMQ(AMQP) 사용하기. Consume (3) 본문

node.js

Node.js에서 RabbitMQ(AMQP) 사용하기. Consume (3)

게슬 2021. 7. 28. 10:50
728x90

https://tlqckd0.tistory.com/18

 

Node.js에서 RabbitMQ(AMQP) 사용하기. Publish (2)

2021.07.28 - [node.js] - Node.js에서 RabbitMQ(AMQP) 사용하기. 브라우저에서 설정 (1) Node.js에서 RabbitMQ(AMQP) 사용하기. 브라우저에서 설정 (1) 이거 해보려고 이것저것 찾아봤는데 너무 단순하기도 하고,..

tlqckd0.tistory.com

여기서 이어지는 글입니다..

단순하게 글쓰고 싶었는데 너무 길어진다.

 

Consume은 진짜 간단하다.

Exchange랑 Routing key같은거 필요없고, 그냥 원하는 Queue에서 메시지를 받아오기만 하면 된다.

 

1. consume 함수를 만들자.

const consume =  ({connection, channel}) =>{
    return new Promise((resolve, reject)=>{
        // 원하는 Queue의 이름을 적어준다.
        channel.consume('MyTestRabbit',async (msg)=>{
            // 1. 받은 메시지를 파싱하고.
            const msgBody = msg.content.toString();
            const data = JSON.parse(msgBody);

            // 1-1. 뭘 받았는지 출력해보자.
            console.log('Received Data : ',data);

            // 2. 잘 받았으니 ACK를 보내자.
            await channel.ack(msg);
        })

        // Queue가 닫혔거나. 에러가 발생하면 reject
        connection.on('close',(err)=>{
            return reject(err);            
        })

        connection.on('error',(err)=>{
            return reject(err);            
        })
    })
}

여기서도 연결된 Channel을 받아서 consume 설정을 해주면 된다.

 

2. 채널을 연결

const listenForMessages = async ()=>{
    //채널을 연결
    const connection = await amqp.connect(amqpURL);
    const channel = await connection.createChannel();
    await channel.prefetch(1);

    await consume({connection, channel});
}

여기서 좀 그런게

consume을 위한 채널은 connection.createChannel을 하고

publish를 위한 채널은 connection.createConfirmChannel을 통해 만들어준다.

 

전체코드

const amqp = require('amqplib');

const amqpURL = 'amqp://guest:guest@localhost:5672';

const listenForMessages = async ()=>{
    //채널을 연결
    const connection = await amqp.connect(amqpURL);
    const channel = await connection.createChannel();
    await channel.prefetch(1);

    await consume({connection, channel});
}

const consume =  ({connection, channel}) =>{
    return new Promise((resolve, reject)=>{
        // 원하는 Queue의 이름을 적어준다.
        channel.consume('MyTestRabbit',async (msg)=>{
            // 1. 받은 메시지를 파싱하고.
            const msgBody = msg.content.toString();
            const data = JSON.parse(msgBody);

            // 1-1. 뭘 받았는지 출력해보자.
            console.log('Received Data : ',data);

            // 2. 잘 받았으니 ACK를 보내자.
            await channel.ack(msg);
        })

        // Queue가 닫혔거나. 에러가 발생하면 reject
        connection.on('close',(err)=>{
            return reject(err);            
        })

        connection.on('error',(err)=>{
            return reject(err);            
        })
    })
}

listenForMessages();

 

이거를 실행시켜보자.

실행시

아까 보냈던 Queue에 대해서 순서대로 메시지를 받아온다.

그러고 나서 브라우저로 돌아가면

이렇게 Queue에 쌓인 메시지를 다 빼고 Ready가 0으로 된것을 볼 수 있다.

 

 

 

 

대충 이렇게 끝이난다.

 

이번 포스트에서는 설정같은것도 기본으로 간단하게 했고 그랬다.

보안이라던가 추가적인 설정, 변수같은것도 어떻게 사용하는지 공부해봐야겠다.

728x90
Comments