1. 程式人生 > >.Net下RabbitMQ訊息佇列的使用

.Net下RabbitMQ訊息佇列的使用

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;


using System.IO;
using System.Threading;
using RabbitMQ.Client;




namespace RabbitmqClient
{
    /// <summary>
    /// P2P模式,即一個生產者一個消費者
    /// </summary>
    public class MyRabbitMq
    {
        private readonly ConnectionFactory rabbitMqFactory;
        const string ExchangeName = "test.exchange";
        const string QueueName = "test.queue";


        public MyRabbitMq(bool isLocal = true, string remoteAddress = "localhost")
        {
            if (isLocal)
                rabbitMqFactory = new ConnectionFactory { HostName = "localhost" };
            else 
                rabbitMqFactory = new ConnectionFactory { HostName = remoteAddress };
        }


        public void Register_durable_Exchange_and_Queue()
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange: ExchangeName, type: "direct", durable: true, autoDelete: false, arguments: null);
                channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                channel.QueueBind(queue: QueueName, exchange: ExchangeName, routingKey: QueueName);
            }
        }


        /// <summary>
        /// 生產者,插入訊息
        /// </summary>
        /// <param name="message">訊息</param>
        /// <param name="persistent">是否持久化</param>
        public void SendMessage(string message, bool persistent = true)
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                var props = channel.CreateBasicProperties();
                if (persistent)
                {
                    props.Persistent = true;                
                }


                var msgBody = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody);


                Console.WriteLine(" [x] Sent {0}", message);
            }
        }


        /// <summary>
        /// 消費者,取出訊息
        /// </summary>
        /// <returns></returns>
        public string GetMessage()
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                BasicGetResult msgResponse = channel.BasicGet(queue: QueueName, noAck: true);


                string msgBody = Encoding.UTF8.GetString(msgResponse.Body);


                return msgBody;
            }
        }




        /// <summary>
        /// 一次都消費光,清空佇列,沒有訊息會阻塞等
        /// </summary>
        /// <returns></returns>
        public string Consume_messages_from_Queue_Subscription()
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                var consumer = new QueueingBasicConsumer(channel);


                channel.BasicConsume(QueueName, noAck: true, consumer: consumer);


                var msgResponse = consumer.Queue.Dequeue(); //blocking


                var msgBody = Encoding.UTF8.GetString(msgResponse.Body);


                return msgBody;
            }
        }




        public void Publish_5_messages_to_test_exchange()
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                for (var i = 0; i < 5; i++)
                {
                    var props = channel.CreateBasicProperties();
                    //props.SetPersistent(true);
                    props.Persistent = true;
                    string msg = "Hello, World!" + i;
                    var msgBody = Encoding.UTF8.GetBytes(msg);
                    channel.BasicPublish(ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody);
                }
            }
        }


    }
}

在main()中往佇列插入訊息

相關推薦

.NetRabbitMQ訊息佇列的使用

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.IO; using System.Threadi

.net core RabbitMQ 訊息佇列

上篇我們說到erlang的安裝,現在有了基礎前提,就可以繼續安裝RabbitMQ了! 這裡我選用的RabbitMQ版本是: PS:這個RabbitMQ版本是要對應前面erlang版本,所以前面我們安裝的版本是21.2,因為需要提供21.X版本的erlang才能安裝 rabbitmq-server-3.7.1

RabbitMQ如何在命令列清除訊息佇列中的所有資料

最近在研究 RabbitMQ 訊息佇列, 安裝好進行測試的時候發覺在一個名為 MyRabbitMQ 的訊息佇列中已經插入了大量的資料。 最後不得不找方法清除。 首先定位到 rabbitMQ 安裝目錄的sbin 目錄下。 然後shift+右鍵 。 調出右鍵選單。 選擇在此處開啟

CentOS6.9安裝rabbitmq訊息佇列

宣告:此文字非本人原創,而是一個名叫西安-晁州的作者在部落格園釋出的,本人也是比較尊重原創的,轉載只是為了方便檢視而已。 廢話不多說,需要安裝的朋友按照以下步驟就可以簡單按照了。。。。。。。 安裝如下步驟: 首先安裝erlang 1 yu

訊息佇列系列(一):.Net平臺訊息佇列介紹

文章來源:http://www.cnblogs.com/gossip/p/4373547.html          特點:     一、跨平臺:包括Mac、Windows、Linux等作業系統             二、支援主流語言呼叫:包括Java、.N

RabbitMQ訊息佇列系列教程(二)Windows安裝和部署RabbitMQ

摘要 本篇經驗將和大家介紹Windows下安裝和部署RabbitMQ訊息佇列伺服器,希望對大家的工作和學習有所幫助! 目錄 一、Erlang語言環境的搭建 RabbitMQ開源訊息佇列服務是使用Erlang語言開發的,因此我們要使用他就必須先進行Erlang語言環境的搭建,其實是非常簡

RabbitMQ訊息佇列在PHP的應用

訊息佇列的實現中,RabbitMQ以其健壯和可靠見長.公司的專案中選擇了它作為訊息佇列的實現.關於MQ的機制和原理網上有很多文章可以看,這裡就不再贅述,只講幾個比較容易混淆的問題 1,binding key和routing key   binding key和routing key是都不過是自己設定的一組字

linux(deepin15.4)部署叢集RabbitMQ訊息佇列映象模式(三)

第三天 一、映象佇列策略 1、映象佇列可以防止主節點掛掉,整個佇列就癱瘓了。所以要想在主節點掛掉或故障也能正常應用,就要複製佇列內容到叢集裡的每個節點,須要建立映象佇列。 2、映象模式配置完成之後,會存在一個主節點和多個映象節點(或稱為熱備佇列,Slave

Linux 開源訊息佇列系統 RabbitMQ 安裝使用

RabbitMQ 是最流行的開源訊息佇列系統,它是在在 AMQP (一個提供統一訊息服務的應用層標準高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計) 基礎上完成的,可複用的企

.NET Core微服務之基於EasyNetQ使用RabbitMQ訊息佇列

一、訊息佇列與RabbitMQ 1.1 訊息佇列   “訊息”是在兩臺計算機間傳送的資料單位。訊息可以非常簡單,例如只包含文字字串;也可以更復雜,可能包含嵌入物件。訊息被髮送到佇列中,“訊息佇列”是在訊息的傳輸過程中儲存訊息的容器。   訊息佇列(Message Queue),是分散式系統中重要

RabbitMQ 訊息佇列之 Exchange Types

寫在前面 RabbitMQ遵循AMQP 0-9-1協議 複製程式碼 AMQP 0-9-1協議簡介 訊息釋出到交換站,這通常被比作郵局或郵箱。然後交換器使用稱為繫結的規則將訊息副本分發到佇列。然後,AMQP代理將訊息傳遞給訂閱佇列的消費者,或者根據需要從佇列中獲取訊息。 釋出訊息時,釋出者可以指定各種

python 64式: 第3式、rabbitmq訊息佇列使用

topicProductor.py內容如下 #!/usr/bin/env python # -*- coding: utf-8 -*- import pika import sys ''' 問題: 實現基於rabbitmq的生產者和消費者,消費者可以支援繫結路由鍵為notification.

python之RabbitMQ訊息佇列

RabbitMQ:訊息佇列 PY裡的佇列有:執行緒QUEUE、程序QUEUE 程序queue可以用於父程序與子程序進行互動,或者同屬於一父程序下多個子程序進行互動,但如果是兩個獨立的程式,是不能用這個QUEUE進行通訊的。 兩個獨立的程式之間,要找一箇中間代理,比如可以用socket通訊

RabbitMQ訊息佇列的基本原理

1.背景 RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue)的開源實現。 2.應用場景 2.1非同步處理 場景說明:使用者註冊後,需要發註冊郵件和註冊簡訊,傳統的做法有兩種1.序列的方式;2.並行的方

Rabbitmq---訊息佇列

一 . MQ:message queue   訊息佇列的作用:   1 通訊解耦   2 高峰限流 原理分析: 一開始,認證系統是強耦合的,A系統傳遞認證系統訊息接收計算結果的過程中   1 傳給認證系統   2 認證系統計算   3 返回計算結果   4 讀取A系統邏輯 只要當前計算

RabbitMQ 訊息佇列 - topic 模式分發訊息

推薦閱讀 https://blog.csdn.net/column/details/15500.html topic 模式 根據 Binding 指定的 RoutingKey, Exchange 對 key 進行模式匹配後投遞到相應的 Queue, 模式匹配時符號

RabbitMQ 訊息佇列 - fanout 模式分發訊息

推薦閱讀 https://blog.csdn.net/column/details/15500.html fanout 模式 將同一個 message 傳送到所有同該 Exchange 繫結的 queue, 只要 RoutingKey 是一樣, 這條訊息都會被投遞

RabbitMQ 訊息佇列 - direct 模式分發訊息

推薦閱讀 https://blog.csdn.net/column/details/15500.html direct 模式 根據 Binding 指定的 Routing Key, 將符合Key的訊息傳送到 Binding 的 Queue p_direc

rabbitmq訊息佇列配置】

      #erlang語言支援包     #rabbitmq-server安裝支援   #新增使用者     #刪除使用者   #使用者角色   #啟動 &nbs

rabbitmq訊息佇列設定過期時間和過期訊息處理

rabbitmq訊息佇列設定過期時間和過期訊息處理 適用場景 電商秒殺搶購活動中處理使用者下單和付款時間不一致,設定過期時間,過期則不允許付款 參考 https://blog.csdn.net/zhu_tianwei/article/details/53563