C#编程之c#生产/消费RabbitMQ
小标 2018-10-22 来源 : 阅读 2246 评论 0

摘要:本文主要向大家介绍了C#编程之c#生产/消费RabbitMQ,通过具体的内容向大家展示,希望对大家学习C#编程有所帮助。

本文主要向大家介绍了C#编程之c#生产/消费RabbitMQ,通过具体的内容向大家展示,希望对大家学习C#编程有所帮助。


public sealed class JsonSerializer  
    {
        public static byte[] Serialize(object message)
        {
            return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
        }

        public static object Deserialize(byte[] bytes)
        {
            return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(bytes));
        }
    }
    public sealed class BinarySerializer  
    {
        public static byte[] SerializeToBytes(object obj)
        {
            var formatter = new BinaryFormatter();
            using (var stream = new MemoryStream())
            {
                formatter.Serialize(stream, obj);

                return StreamUtil.ReadAllBytes(stream);
            }
        }
       
        public static object DeserializeFromBytes(byte[] bytes)
        {
            var formatter = new BinaryFormatter();
            using (var stream = new MemoryStream(bytes))
            {
                return formatter.Deserialize(stream);
            }
        }
    }


private static bool RawPublishMessage()
        {
            var exchange = "TestExchangeRouting...";
            var routingKey = "rk";
            Uri uri = new Uri("amqp://192.168.1.1:1234/");
            ConnectionFactory factory = new ConnectionFactory();

            factory.UserName = "guest";  
            factory.Password = "guest";
            factory.VirtualHost = "/";  
            factory.RequestedHeartbeat = 0;
            factory.Endpoint = new AmqpTcpEndpoint(uri);

            //创建一个连接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建一个通道
                using (IModel channel = connection.CreateModel())
                {
                    //声明一个路由
                    channel.ExchangeDeclare(exchange, "direct");
                    var queueOk = channel.QueueDeclare("testQueue", true, false, false, null);
                    channel.QueueBind(queueOk.QueueName, exchange, routingKey);

                    var model = new Order
                    {
                        Id = 100021,
                        Title = "工一一个测试Test"
                    }; //这个才是具体的发送内容  

                    var body = JsonSerializer.Serialize(model);

                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);
                    properties.ContentType = typeof(Order).AssemblyQualifiedName;
                    properties.ContentEncoding = "JSON";

                    //写入  
                    channel.BasicPublish(exchange, routingKey, properties, body);
                    Console.WriteLine("写入成功");
                }

            }
            return false;
        }

        private static bool RawGetMessage()
        {
            var exchange = "TestExchangeRouting...";
            var routingKey = "rk";
            Uri uri = new Uri("amqp://192.168.1.1:1234/");
            ConnectionFactory factory = new ConnectionFactory();

            factory.UserName = "guest";  
            factory.Password = "guest";
            factory.VirtualHost = "/";  
            factory.RequestedHeartbeat = 0;
            factory.Endpoint = new AmqpTcpEndpoint(uri);

            //创建一个连接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建一个通道
                using (IModel channel = connection.CreateModel())
                {
                    var basicConsumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume("testQueue", false, basicConsumer);

                    while (true)
                    {
                        try
                        {
                            BasicDeliverEventArgs basicDeliverEventArgs  ;
                            basicConsumer.Queue.Dequeue(1000, out basicDeliverEventArgs);
                            if (basicDeliverEventArgs == null)
                            {
                                break;
                            }

                            Task.Run(() =>
                            {
                                channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false);
                            }).Wait();
                            
                             
                            var body = JsonSerializer.Deserialize(basicDeliverEventArgs.Body);
                            Console.WriteLine(string.Format("RoutingKey:{0},Body:{1}", basicDeliverEventArgs.RoutingKey,
                                 JsonConvert.SerializeObject(body, Formatting.Indented)));
                        }
                        catch (Exception)
                        {
                            break;
                        }
                    }

                    channel.Close();
                }
                connection.Close();
            }

            return false;
        }

 

public static byte[] ReadAllBytes(Stream stream)
        {           

            var bytes = new byte[stream.Length];

            stream.Seek(0, SeekOrigin.Begin);

            for (var i = 0; i < stream.Length; i++)
            {
                bytes[i] = (byte)stream.ReadByte();
            }

            return bytes;
        }

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标编程语言C#.NET频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程