region 准备参数
var connStr = "localhost:6379,password=";
var db = 2;
SiteRedisHelper redisHelper = new SiteRedisHelper(connStr, "monster", db);
var key = "MessageQueue";
var msg = string.Empty;
#endregion
消息写入+读取(入门版)
#region 添加消息
while (true)
{
Console.WriteLine("请输入你需要发送的消息");
msg = Console.ReadLine();
if (!string.IsNullOrWhiteSpace(msg))
{
// var listLeftPush = redisHelper.ListLeftPush(key, msg);//添加一条消息并返回已添加消息数量
var listLeftPushAsync = redisHelper.ListLeftPushAsync(key, msg);//异步添加
//追加事件
listLeftPushAsync.ContinueWith((task =>
{
if (task.IsCompletedSuccessfully)
{
Console.WriteLine($"消息添加完毕,此消息队列共有{task.Result}条信息");
}
}));
}
else
{
Console.WriteLine("停止发送消息");
break;
}
};
#endregion
#region 读取消息
while (!string.IsNullOrWhiteSpace(msg = redisHelper.ListLeftPop(key)))
{
Console.WriteLine("消息出列:" + msg);
Debug.WriteLine("消息出列:" + msg);
FileLogTools.Write(msg, "RedisMSMQ.Try");
}
#endregion
运行结果
相对来说还是挺简单的,也没有遇上什么奇怪的异常,此处便不做什么太多说明
将实体做为消息进行写入/读取
稍微改造了一下使用对象做为消息进行写入/读取
<实体类>
public class MsgEntity
{
public string Content { get; set; }
public DateTimeOffset CreateTime { get; set; }
}
<添加相关>
var msgCount = redisHelper.ListLeftPush<MsgEntity>(key,new MsgEntity()
{
Content = msg,
CreateTime = DateTimeOffset.Now
});
Console.WriteLine($"添加成功,消息站已有{msgCount}条消息");
<读取的消息>
1.原始:
???? DRedisMSMQ.Try, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null RedisMSMQ.Try.Entity.MsgEntity <Content>k__BackingField<CreateTime>k__BackingFieldSystem.DateTimeOffset hello????System.DateTimeOffset DateTime
OffsetMinutes
'NCN???
... 一串乱码 + 一堆命名空间 看来写入需要调整
2.调整 :
<old>
private static byte[] Serialize(object obj)
{
try
{
if (obj == null)
return null;
var binaryFormatter = new BinaryFormatter();
using (var memoryStream = new MemoryStream())
{
binaryFormatter.Serialize(memoryStream, obj);
var data = memoryStream.ToArray();
return data;
}
}
catch (SerializationException ex)
{
throw ex;
}
}
<new>
JsonConvert.SerializeObject(redisValue)
2.1 读取同样处理
<异常记录>
1.添加时异常:System.Runtime.Serialization.SerializationException:“Type 'RedisMSMQ.Try.Entity.MsgEntity' in Assembly 'RedisMSMQ.Try, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null' is not marked as serializable.”
说明:此对象在程序集中不可进行序列化
处理:给类添加特性[Serializable]
使用实体时,处理也是非常简单的,主要是注意一下转string的方式,
个人使用的是JsonConvert 进行序列化/反序列化(json比较简洁,工具类功能也比较齐全) 其次就是编码一致
模拟简单的发布/订阅
#region 准备参数
var connStr = "localhost:6379,password=";
var db = 2;
SiteRedisHelper redisHelper = new SiteRedisHelper(connStr, "monster", db);
var key = "entrepot";
var model = default(Produce);
#endregion
#region 审核线程,订阅申请消息,给予相应的处理
ThreadPool.QueueUserWorkItem((state =>
{
Thread.Sleep(1000);
while (IsDealValid)
{
var validMsg = redisHelper.ListRightPop<Produce>(key);
if (validMsg != null && !validMsg.IsNull())
{
Console.WriteLine($"正在审核产品:{JsonConvert.SerializeObject(validMsg)}");
}
}
}));
#endregion
#region 主线程_添加产品
Console.WriteLine("欢迎来到产品中心,请填写产品注册资料");
IsDealValid = true;
while ((model = Produce.RegisterProduce())!= null)
{
var validCount = redisHelper.ListLeftPush<Produce>(key, model);//将注册资料添加到消息队列中
Console.WriteLine($"产品注册申请正在处理中……,在您之前共有{validCount-1}个产品正在处理,请耐心等待审核结果");
}
#endregion
发布/订阅
#region 订阅消息
ThreadPool.QueueUserWorkItem((state =>
{
redisHelper.Subscribe(channel, ((redisChannel, value) =>
{
//Console.WriteLine($"订阅方收到一条消息:{JsonConvert.SerializeObject(value)}");
if (!value.IsNullOrEmpty)
{
Console.WriteLine($"订阅方收到一条消息:{value.ToString()}");
}
}));
Console.WriteLine("子线程已订阅消息");
}));
#endregion
#region 主线程发布消息
while ((model = Produce.RegisterProduce()) != null)
{
var receiveCount = redisHelper.Publish(channel, model);
Console.WriteLine($"此条消息已被{receiveCount}个人订阅");
}
#endregion
发布订阅 vs 消息队列
1. 消息队列中的消息不能重复读取,发布订阅中的消息由订阅方共享
2. 若发布时没有订阅方,后续加入的订阅方将不能收到此条消息。在消息队列中,若消息没有及时出列,消息将会继续保存在消息队列中
总结
总体来说,redis的操作都是比较简单的,因为官方已经有集成api供我们调用,所以操作起来还是没什么难度,只需要了解方法的应用就可以了,复杂一点的,应该就是业务流程的一些具体应用,应用场景的使用,效率的提升
相关类说明:
SiteRedisHelper
参考博文:http://www.cnblogs.com/liqingwen/archive/2017/04/06/6672452.html
《构造方法》
public SiteRedisHelper(string connStr, string defaultKey, int db = -1)
{
//连接字符串
ConnectionString = connStr;
//建立连接
_connMultiplexer = ConnectionMultiplexer.Connect(ConnectionString);
//默认前缀【无实用】
DefaultKey = defaultKey;
//注册相关事件 【未应用】
RegisterEvent();
//获取Database操作对象
_db = _connMultiplexer.GetDatabase(db);
}
Copyright © 广州京杭网络科技有限公司 2005-2025 版权所有 粤ICP备16019765号
广州京杭网络科技有限公司 版权所有