# Development with RabbitMQ

# Bash

##### curl

```shell
curl -u login:pass -i -H "content-type:application/json" -X POST http://localhost:15672/api/exchanges/%2Fvhost/exchange/publish \
  -d '{"properties":{},"routing_key":"","payload":"your message","payload_encoding":"string"}'
```

##### rabbitmqadmin

`rabbitmqadmin` authenticates through the HTTP API, which uses basic HTTP authentication.

- [Command Line Tools](https://rabbitmq.com/cli.html)
- [Management Command Line Tool](https://rabbitmq.com/management-cli.html)

Install

```bash
# Install the dependency
yum install python3

# Download the script from the RabbitMQ Server
wget http://<host>:15672/cli/rabbitmqadmin

# Make it executable and put it on the PATH
chmod 0755 rabbitmqadmin
mv rabbitmqadmin /usr/local/bin
rabbitmqadmin -h

# Verify the connection via HTTP API
rabbitmqadmin -H <host> -u <user> -p <password> list vhosts
```

Basic Operation

```bash
## Test from another Linux host
# Declare a queue
rabbitmqadmin -H <host> -u <user> -p <password> -V <vhost> declare queue name=my-testq durable=true

# Publish a message
rabbitmqadmin -H <host> -u <user> -p <password> -V <vhost> publish exchange=amq.default routing_key=my-testq payload="This is Alang"

# Get a message
rabbitmqadmin -H <host> -u <user> -p <password> -V <vhost> get queue=my-testq ackmode=ack_requeue_false

# Get a message, with output in TSV format
rabbitmqadmin -H <host> -u <user> -p <password> -V <vhost> -f tsv get queue=my-testq ackmode=ack_requeue_false

# Get 5 messages at once
rabbitmqadmin -H <host> -u <user> -p <password> -V <vhost> get queue=my-testq count=5 ackmode=ack_requeue_false
```

```shell
# Publish each line of the file "messages" (the payload is read from stdin)
while read -r line; do
  echo "$line" | rabbitmqadmin publish exchange=amq.default routing_key=my_queue
done < messages

rabbitmqadmin publish exchange=amq.default routing_key=test payload="hello, world"

# With parallel
cat messages | parallel -j 100 \
  ./rabbitmqadmin -H "$RABBITMQ_HOST" \
  -u "$RABBITMQ_USERNAME" \
  -p "$RABBITMQ_PASSWORD" \
  publish exchange=amq.default \
  routing_key=myqueue \
  payload="{}"
```

# Python

##### Tutorials

- [python rabbitmq Code Example (codegrepper.com)](https://www.codegrepper.com/code-examples/python/python+rabbitmq)
- [Part 2.3: Getting started with RabbitMQ and Python - CloudAMQP](https://www.cloudamqp.com/blog/part2-3-rabbitmq-for-beginners_example-and-sample-code-python.html)
- [Tutorial on working with the RabbitMQ message queue in Python (codertw.com)](https://codertw.com/%E7%A8%8B%E5%BC%8F%E8%AA%9E%E8%A8%80/364907/)
- [Implementing the six RabbitMQ working modes in Python, with Python code (AlwaysBeta, InfoQ)](https://xie.infoq.cn/article/02f4007cb22f991ee49882efa)
- [RedHat AMQ Python Client](https://access.redhat.com/documentation/en-us/red_hat_amq_clients/2.11/html/using_the_amq_python_client/index)

##### RedHat AMQ Python Client

Install via RHN

```
[root@dotnetdev ~]# dnf repolist all | grep amq-client
amq-clients-2-for-rhel-8-x86_64-debug-rpms        disabled
amq-clients-2-for-rhel-8-x86_64-rpms              disabled
amq-clients-2-for-rhel-8-x86_64-source-rpms       disabled
amq-clients-2.9-for-rhel-8-x86_64-debug-rpms      disabled
amq-clients-2.9-for-rhel-8-x86_64-rpms            disabled
amq-clients-2.9-for-rhel-8-x86_64-source-rpms     disabled

[root@dotnetdev ~]# subscription-manager repos --enable=amq-clients-2-for-rhel-8-x86_64-rpms
Repository 'amq-clients-2-for-rhel-8-x86_64-rpms' is enabled for this system.

[root@dotnetdev ~]# dnf repolist
Updating Subscription Management repositories.
repo id                                 repo name
amq-clients-2-for-rhel-8-x86_64-rpms    Red Hat AMQ Clients 2 for RHEL 8 x86_64 (RPMs)
rhel-8-for-x86_64-appstream-rpms        Red Hat Enterprise Linux 8 for x86_64 - AppStream (RPMs)
rhel-8-for-x86_64-baseos-rpms           Red Hat Enterprise Linux 8 for x86_64 - BaseOS (RPMs)

[root@dotnetdev ~]# yum install python3-qpid-proton python-qpid-proton-docs
```

# PHP

##### Tutorials

- [how to call a model from rabbitmq php consumer's callback in codeigniter? (google.com)](https://groups.google.com/g/codeigniter/c/KTNK4HZ77fw)

# dotNet

##### Tutorials

- \[GitHub\] [AMQP.Net Lite](https://github.com/Azure/amqpnetlite)
- [RedHat AMQ .Net Client](https://access.redhat.com/documentation/en-us/red_hat_amq_clients/2.11/html/using_the_amq_.net_client/index)
- [Connecting to a RabbitMQ Cluster from C# - using RabbitMQ .Net Client and EasyNetQ](https://blog.yowko.com/dotnet-client-rabbitmq-cluster/)
- \[GitHub\] [EasyNetQ](https://github.com/EasyNetQ/EasyNetQ)

##### Case: Console RabbitMQ Client

[ConsoleRabbitMQ.zip](https://osslab.tw/attachments/72)

Package Required:

- RabbitMQ.Client

Target Framework:

- net6.0

Platform Supported:

- Windows 64-bit
- RedHat Linux 8 64-bit (net6 runtime is required)

# AMQP Client

##### Terms & Concepts

Queue

- prefetch: A consumer parameter. When a consumer receives messages in Push mode and the publisher writes a large volume of messages, the consumer is handed a large batch (hundreds or thousands of messages) at once. If the consumer cannot process them all in time, the pending messages stay in the Unacked state, and once too many messages are Unacked the server will eventually terminate the connection. To avoid this, the consumer must set prefetch to limit how many messages the server pushes to it at a time.
- Lazy Queue: A queue parameter. This type of queue can be used for longer (larger) messages. Memory consumption improves, but disk usage increases.

Consume Message

- A consumer can be written to receive messages in one of two ways: Push or Poll (pull).
- [Quick guide to RabbitMQ (4): the two consumption modes and QoS implemented in C# (cnblogs.com)](https://www.cnblogs.com/wyy1234/p/10883568.html)

Publish Message

- Publishing from an application has fewer custom parameters than consuming and no alternative modes. The only thing to watch is server performance and load. In a multi-node cluster, clients should be spread evenly across the nodes so that the load is balanced.
- Publish Confirm:

##### TTL (Time to Live)

TTL is the setting that removes a message automatically when it has not been consumed within a configured time. In RabbitMQ, TTL can be applied to messages or to the queue itself.

Message TTL

- Purpose: messages older than the configured time are removed automatically.
- When a message is routed to several queues, its removal time can differ from queue to queue, depending on each queue's TTL setting. In a queue with no TTL set, the message is not affected.
- Value: milliseconds (60 seconds = 60000)

Method 1: Policy

```bash
rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues
```

Method 2: Queue argument

- x-message-ttl: 60000

Sample code in C#

```c#
using System.Collections.Generic;

// Declare "myqueue" with a 60-second message TTL
var args = new Dictionary<string, object>();
args.Add("x-message-ttl", 60000);
model.QueueDeclare("myqueue", false, false, false, args);
```

Queue TTL

- Purpose: a queue that has been unused for longer than the configured time is deleted automatically.
- Value: milliseconds (30 mins = 1800000)

Method 1: Policy

```bash
rabbitmqctl set_policy expiry ".*" '{"expires":1800000}' --apply-to queues
```

Method 2: Queue argument

- x-expires: 1800000

Sample code in Java

```java
import java.util.HashMap;
import java.util.Map;

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);
```

##### rabbitmqadmin

- [Management Command Line Tool - RabbitMQ](https://rabbitmq.com/management-cli.html)

Usage

```bash
# Publish a message
rabbitmqadmin -H <host> -u <user> -p <password> -V <vhost> publish exchange=amq.default routing_key=my-testq payload="This is Alang"

# Consume/Get a message
rabbitmqadmin -H <host> -u <user> -p <password> -V <vhost> get queue=my-testq ackmode=ack_requeue_false
```

##### amqp-tools

A set of CLI tools available from the Ubuntu package repositories.

Install

```bash
sudo apt update
sudo apt install amqp-tools
```

Usage

```bash
# Declare a queue
amqp-declare-queue --url="amqp://<user>:<password>@<host>:<port>/<vhost>" -d -q "my-testq"

# Publish a message
amqp-publish --url="amqp://<user>:<password>@<host>:<port>/<vhost>" --routing-key="my-testq" -b "Hello,World"

# Get the messages (Poll mode)
amqp-get --url="amqp://<user>:<password>@<host>:<port>/<vhost>" --queue="my-testq"

# Get the messages (Push mode), with a prefetch count of 2
amqp-consume --url="amqp://<user>:<password>@<host>:<port>/<vhost>" --queue="my-testq" -p 2 ./show.sh
```

show.sh:

```bash
#!/usr/bin/env bash
# Runs once per message; the message body arrives on stdin
read -r line
echo "Message: $line"
sleep 1
```
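
`amqp-consume` runs the given command once per message and passes the message body on standard input, so the handler does not have to be a shell script. As a minimal sketch, the Python script below mirrors `show.sh`. The file name `show.py` and the function name `format_message` are illustrative assumptions, not part of amqp-tools. Unlike `read line`, it reads the whole body, so multi-line payloads are kept intact.

```python
#!/usr/bin/env python3
"""Hypothetical show.py: a per-message handler for amqp-consume."""
import sys


def format_message(body: str) -> str:
    """Return the text to print for one message body."""
    # Drop trailing newlines only, so line breaks inside the payload survive.
    return "Message: " + body.rstrip("\n")


def main() -> None:
    # The message body is piped to this process on stdin; read all of it.
    body = sys.stdin.read()
    print(format_message(body), flush=True)


if __name__ == "__main__":
    main()
```

To try it, make the script executable with `chmod +x show.py` and pass `./show.py` to `amqp-consume` in place of `./show.sh`.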