Development with RabbitMQ
Bash
curl
curl -u login:pass -i -H "content-type:application/json" -X POST http://localhost:15672/api/exchanges/%2Fvhost/exchange/publish \
-d '{"properties":{},"routing_key":"","payload":"your message","payload_encoding":"string"}'
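The same request can be sketched in Python with only the standard library. This is a minimal sketch, not a definitive client: the function names `build_publish_request` and `publish` are hypothetical, and the host, vhost, exchange, and credentials simply mirror the placeholders in the curl command above.

```python
import base64
import json
import urllib.parse
import urllib.request


def build_publish_request(base_url, vhost, exchange, routing_key, payload, user, password):
    """Build (but do not send) a POST request for the management API publish endpoint."""
    # The vhost and exchange names are URL path segments, so "/" must be encoded as "%2F".
    path = "/api/exchanges/{}/{}/publish".format(
        urllib.parse.quote(vhost, safe=""),
        urllib.parse.quote(exchange, safe=""),
    )
    body = json.dumps({
        "properties": {},
        "routing_key": routing_key,
        "payload": payload,
        "payload_encoding": "string",
    }).encode("utf-8")
    # Basic HTTP authentication, the same mechanism curl uses with -u login:pass.
    token = base64.b64encode("{}:{}".format(user, password).encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        base_url.rstrip("/") + path,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": "Basic " + token,
        },
    )


def publish(request):
    """Send the request; needs a reachable broker with the management plugin enabled."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

For example, `publish(build_publish_request("http://localhost:15672", "/vhost", "exchange", "", "your message", "login", "pass"))` would issue the same call as the curl command, assuming the broker is reachable at that address.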
rabbitmqadmin
rabbitmqadmin uses the HTTP API authentication mechanism (basic HTTP authentication).
Install
# Install the dependency
yum install python3
# Download the script from the RabbitMQ Server
wget http://<rabbitmq-server-hostname>:15672/cli/rabbitmqadmin
# Make the script executable and move it onto the PATH
chmod 0755 rabbitmqadmin
mv rabbitmqadmin /usr/local/bin
rabbitmqadmin -h
# Verify the connection via HTTP API
rabbitmqadmin -H <rabbitmq-server-hostname> -u <username> -p <password> list vhosts
Basic Operation
## Test from another Linux host
# Declare a queue
rabbitmqadmin -H <rabbitmq-server-hostname> -u <username> -p <password> -V <vhost-name> declare queue name=my-testq durable=true
# Publish a message
rabbitmqadmin -H <rabbitmq-server-hostname> -u <username> -p <password> -V <vhost-name> publish exchange=amq.default routing_key=my-testq payload="This is Alang"
# Get a message
rabbitmqadmin -H <rabbitmq-server-hostname> -u <username> -p <password> -V <vhost-name> get queue=my-testq ackmode=ack_requeue_false
# Get a message, formatted as TSV
rabbitmqadmin -H <rabbitmq-server-hostname> -u <username> -p <password> -V <vhost-name> -f tsv get queue=my-testq ackmode=ack_requeue_false
# Get 5 messages at once
rabbitmqadmin -H <rabbitmq-server-hostname> -u <username> -p <password> -V <vhost-name> get queue=my-testq count=5 ackmode=ack_requeue_false
# Publish each line of a file as a separate message (the payload is read from stdin)
while read -r line; do
  printf '%s\n' "$line" | rabbitmqadmin publish exchange=amq.default routing_key=my_queue
done < messages
rabbitmqadmin publish exchange=amq.default routing_key=test payload="hello, world"
# With GNU parallel (up to 100 concurrent jobs; {} is replaced by each input line)
cat messages | parallel -j 100 \
./rabbitmqadmin -H "$RABBITMQ_HOST" \
-u "$RABBITMQ_USERNAME" \
-p "$RABBITMQ_PASSWORD" \
publish exchange=amq.default \
routing_key=myqueue \
payload="{}"
Python
Tutorials
- python rabbitmq Code Example (codegrepper.com)
- Part 2.3: Getting started with RabbitMQ and Python - CloudAMQP
- Tutorial: Working with the RabbitMQ Message Queue in Python | 程式前沿 (codertw.com)
- Six RabbitMQ Working Modes Implemented in Python (with Python Code), AlwaysBeta, InfoQ Writing Community
- RedHat AMQ Python Client
RedHat AMQ Python Client
Install via RHN
[root@dotnetdev ~]# dnf repolist all | grep amq-client
amq-clients-2-for-rhel-8-x86_64-debug-rpms disabled
amq-clients-2-for-rhel-8-x86_64-rpms disabled
amq-clients-2-for-rhel-8-x86_64-source-rpms disabled
amq-clients-2.9-for-rhel-8-x86_64-debug-rpms disabled
amq-clients-2.9-for-rhel-8-x86_64-rpms disabled
amq-clients-2.9-for-rhel-8-x86_64-source-rpms disabled
[root@dotnetdev ~]# subscription-manager repos --enable=amq-clients-2-for-rhel-8-x86_64-rpms
Repository 'amq-clients-2-for-rhel-8-x86_64-rpms' is enabled for this system.
[root@dotnetdev ~]# dnf repolist
Updating Subscription Management repositories.
repo id repo name
amq-clients-2-for-rhel-8-x86_64-rpms Red Hat AMQ Clients 2 for RHEL 8 x86_64 (RPMs)
rhel-8-for-x86_64-appstream-rpms Red Hat Enterprise Linux 8 for x86_64 - AppStream (RPMs)
rhel-8-for-x86_64-baseos-rpms Red Hat Enterprise Linux 8 for x86_64 - BaseOS (RPMs)
[root@dotnetdev ~]# yum install python3-qpid-proton python-qpid-proton-docs
PHP
Tutorials
dotNet
Tutorials
- [GitHub] AMQP.Net Lite
- RedHat AMQ .Net Client
- C#: Connecting to a RabbitMQ Cluster Using the RabbitMQ .Net Client and EasyNetQ
- [GitHub] EasyNetQ
Case: Console RabbitMQ Client
Package Required:
- RabbitMQ.Client
Target Framework:
- net6.0
Platform Supported:
- Windows (64-bit)
- Red Hat Enterprise Linux 8, 64-bit (.NET 6 runtime required)
AMQP Client
Terms & Concepts
Queue
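- prefetch: A consumer parameter. When a consumer receives messages in push mode and publishers write a large volume of messages, push mode hands the consumer a large batch at once (hundreds or thousands of messages). If the consumer cannot process them all in time, the pending messages stay in the Unacked state, and once too many messages are Unacked the server terminates the connection.
  To solve this, the consumer must set prefetch to limit how many messages the server pushes to it at a time.
- Lazy Queue: A queue parameter. This type of queue suits long (large) messages: it reduces memory usage but increases disk usage.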
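The prefetch limit described above can be sketched in Python. This is a minimal sketch that assumes the pika client library, which the original notes do not name; `start_limited_consumer` and `handle` are hypothetical names. The function only needs a channel object that offers `basic_qos`, `basic_consume`, and `basic_ack`, as pika's `BlockingChannel` does.

```python
def start_limited_consumer(channel, queue, handle, prefetch_count=10):
    """Register a push-mode consumer that holds at most `prefetch_count` unacked messages."""
    # basic_qos caps how many unacknowledged deliveries the server keeps in flight
    # for consumers on this channel.
    channel.basic_qos(prefetch_count=prefetch_count)

    def on_message(ch, method, properties, body):
        handle(body)
        # Acknowledge only after processing; this frees one slot in the prefetch window.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # Manual acknowledgements are required: with auto_ack=True the prefetch limit is ignored.
    channel.basic_consume(queue=queue, on_message_callback=on_message, auto_ack=False)
    return on_message
```

With pika, a channel could be obtained from `pika.BlockingConnection(pika.ConnectionParameters(host="<rabbitmq-server-hostname>")).channel()`, and `channel.start_consuming()` would then run the consumer loop.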
Consume Message
- A consumer can be written to receive messages in one of two ways: Push or Poll (pull).
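Poll mode can be sketched in Python as a loop that asks the server for one message at a time. This is a minimal sketch assuming the pika client library, which the original notes do not name; `poll_messages` is a hypothetical helper. It relies on pika's `basic_get` returning `(None, None, None)` when the queue is empty. In push mode, by contrast, the client registers a callback with `basic_consume` and the server delivers messages as they arrive.

```python
def poll_messages(channel, queue, max_messages=5):
    """Pull up to `max_messages` from `queue`, one request per message (poll mode)."""
    bodies = []
    for _ in range(max_messages):
        # Each basic_get is a separate round trip to the server.
        method, properties, body = channel.basic_get(queue=queue, auto_ack=False)
        if method is None:
            break  # the queue is empty
        bodies.append(body)
        # Acknowledge so the message is removed from the queue.
        channel.basic_ack(delivery_tag=method.delivery_tag)
    return bodies
```

Polling is simple to reason about but costs one round trip per message, so push mode is usually the better fit for a long-running consumer.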
Publish Message
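- Publishing from an application does not involve the many tunable parameters and receive modes that consuming does; the only things to watch are server performance and load. In a multi-node cluster, clients should be spread evenly across the nodes to balance the load.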
- Publish Confirm:
TTL (Time to Live)
To have messages removed automatically when they are not consumed within a set time, configure a TTL. In RabbitMQ, a TTL can be applied to messages or to the queue itself.
Message TTL
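- Purpose: messages older than the configured time are removed automatically.
- If a message is routed to multiple queues, each queue applies its own TTL, so the message may be removed at different times in different queues. In a queue with no TTL set, the message is unaffected.
- Value: milliseconds (60 seconds = 60000)
Method 1: Policy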
rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues
Method 2: Queue argument
- x-message-ttl: 60000
Sample code in C#
var args = new Dictionary<string, object>();
args.Add("x-message-ttl", 60000);
model.QueueDeclare("myqueue", false, false, false, args);
Queue TTL
- Purpose: a queue that has gone unused for longer than the configured time is deleted automatically.
- Value: milliseconds (30 mins = 1800000)
Method 1: Policy
rabbitmqctl set_policy expiry ".*" '{"expires":1800000}' --apply-to queues
Method 2: Queue argument
- x-expires: 1800000
Sample code in Java
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);
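Both TTL arguments take milliseconds, which is easy to get wrong when thinking in seconds or minutes. The Python sketch below builds the arguments dictionary from seconds; `ttl_arguments` and `declare_queue_with_ttl` are hypothetical helpers, and the `queue_declare` call assumes a channel from the pika client library, which the original notes do not name.

```python
def ttl_arguments(message_ttl_s=None, queue_expires_s=None):
    """Build queue arguments for message TTL and queue TTL, converting seconds to milliseconds."""
    arguments = {}
    if message_ttl_s is not None:
        # x-message-ttl: how long a message may wait in the queue before it is removed.
        arguments["x-message-ttl"] = int(message_ttl_s * 1000)
    if queue_expires_s is not None:
        # x-expires: how long the queue may stay unused before it is deleted.
        arguments["x-expires"] = int(queue_expires_s * 1000)
    return arguments


def declare_queue_with_ttl(channel, queue, message_ttl_s=None, queue_expires_s=None):
    """Declare a non-durable queue carrying the requested TTL arguments."""
    return channel.queue_declare(
        queue=queue,
        durable=False,
        exclusive=False,
        auto_delete=False,
        arguments=ttl_arguments(message_ttl_s, queue_expires_s),
    )
```

For instance, `ttl_arguments(message_ttl_s=60, queue_expires_s=1800)` yields the same values as the C# and Java samples above. Redeclaring an existing queue with different arguments is rejected by the server, so the arguments have to match on every declaration.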
rabbitmqadmin
Usage
# Publish a message
rabbitmqadmin -H <rabbitmq-server-ip> -u <user-name> -p <secret> -V <virtual-host> publish exchange=amq.default routing_key=my-testq payload="This is Alang"
# Consume/Get a message
rabbitmqadmin -H <rabbitmq-server-ip> -u <user-name> -p <secret> -V <virtual-host> get queue=my-testq ackmode=ack_requeue_false
amqp-tools
A set of CLI tools available from the Ubuntu package repositories.
Install
sudo apt update
sudo apt install amqp-tools
Usage
# Declare a queue
amqp-declare-queue --url="amqp://<user-name>:<secret>@<rabbitmq-server-ip>:<rabbitmq-server-port>/<virtual-server>" -d -q "my-testq"
# Publish a message
amqp-publish --url="amqp://<user-name>:<secret>@<rabbitmq-server-ip>:<rabbitmq-server-port>/<virtual-server>" --routing-key="my-testq" -b "Hello,World"
# Get the messages (Poll mode)
amqp-get --url="amqp://<user-name>:<secret>@<rabbitmq-server-ip>:<rabbitmq-server-port>/<virtual-server>" --queue="my-testq"
# Get the messages (Push mode)
amqp-consume --url="amqp://<user-name>:<secret>@<rabbitmq-server-ip>:<rabbitmq-server-port>/<virtual-server>" --queue="my-testq" -p 2 ./show.sh
show.sh:
#!/usr/bin/env bash
read -r line
echo "Message: $line"
sleep 1