AMQP Client
Terms & Concepts
Queue
- prefetch: consumer 的參數。當 consumer 接收訊息是採 PUSH 模式,且發送端會有大量訊息寫入,由於 consumer 的 PUSH 模式,會一次接收大量(一次幾百或幾千)的訊息,假使 consumer 未能及時處理所有訊息,那些待處理的訊息會處於 Unacked 狀態,一旦 Unacked 的訊息量過多,主機就會終止連線。
要解決這個問題,consumer 必須使用 prefetch 限制主機每次推送至 consumer 的訊息量。 - Lazy Queue: Queue 的參數。對於較長(大)的訊息,可以使用這類 queue,記憶體的耗用可以改善,但會增加磁碟空間的使用率。
Consume Message
- Consumer 接收訊息的開發方式有分 Push (推送) 與 Poll (拉) 兩種方式。
Publish Message
- 程式端發佈訊息,不像接收端那樣有多種客製參數與不同接收模式,唯一要注意的是主機效能與負載。多 nodes 的 Cluster 模式下,應該要將不同用戶端平均連接到不同的 nodes,達到負載均衡的效益。
- Publish Confirm:
rabbitmqadmin
Usage
# Publish a message
rabbitmqadmin -H <rabbitmq-server-ip> -u <user-name> -p <secret> -V <virtual-server> publish exchange=amq.default routing_key=my-testq payload="This is Alang"
# Consume/Get a message
rabbitmqadmin -H <rabbitmq-server-ip> -u <user-name> -p <secret> -V <virtual-host> get queue=my-testq ackmode=ack_requeue_false
amqp-tools
A CLI tool is built-in Ubuntu.
Install
sudo apt update
sudo apt install amqp-tools
Usage
# Declare a queue
amqp-declare-queue --url="amqp://<user-name>:<secret>@<rabbitmq-server-ip>:<rabbitmq-server-port>/<virtual-server>" -d -q "my-testq"
# Publish a message
amqp-publish --url="amqp://<user-name>:<secret>@<rabbitmq-server-ip>:<rabbitmq-server-port>/<virtual-server>" --routing-key="my-testq" -b "Hello,World"
# Get the messages (Poll mode)
amqp-get --url="amqp://<user-name>:<secret>@<rabbitmq-server-ip>:<rabbitmq-server-port>/<virtual-server>" --queue="my-testq"
# Get the messages (Push mode)
amqp-consume --url="amqp://<user-name>:<secret>@<rabbitmq-server-ip>:<rabbitmq-server-port>/<virtual-server>" --queue="my-testq" -p 2 ./show.sh
show.sh:
#!/usr/bin/env bash
read line
echo "Message: $line"
sleep 1