会写代码的产品经理

.net core 和 WPF 开发升讯威在线客服系统:使用线程安全的 BlockingCollection 实现高性能的数据处理

本系统文章详细介绍使用 .net core 和 WPF 开发升讯威在线客服与营销系统的过程。本篇简要介绍了如何使用线程安全的 BlockingCollection 实现高性能的数据处理。
分类:
标签:
2021/2/2 23:33:47

本系列文章详细介绍使用 .net core 和 WPF 开发 升讯威在线客服与营销系统 的过程。本产品已经成熟稳定并投入商用。

免费使用 & 私有化部署:https://kf.shengxunwei.com


文章目录列表请点击这里


视频实拍:演示升讯威在线客服系统在网络中断,直接禁用网卡,拔掉网线的情况下,也不丢消息,不出异常。
https://blog.shengxunwei.com/Home/Post/fe432a51-337c-4558-b9e8-347b58cbcd53


对于在线客服与营销系统,客服端指的是后台提供服务的客服或营销人员,他们使用客服程序在后台观察网站的被访情况,开展营销活动或提供客户服务。在本篇文章中,我将详细介绍如何在 .net core 环境下使用 TCP 通信技术实现稳定高效与安全的客服端程序。

这里存在几个技术难点需要注意:

  • 需要使客服端程序具备 24 小时不间断运行的能力,在处理网络通信时,必须100%的稳定。
  • 必须具备应对网络波动的能力,不能网络稍有波动就断线。即使出现了短暂的网络中断,客服程序也不能做掉线处理,而是要具备保持和自动重连的能力。
  • 要考虑安全性问题,服务端的端口监听,要能识别正常客服端连接,还是来自攻击者的连接。

访客端实现的效果:

访客端在手机上的效果:

后台客服的实现效果:


线程安全集合

System.Collections.Concurrent 命名空间,其中包含多个线程安全且可缩放的集合类。 多个线程可以安全高效地从这些集合添加或删除项,而无需在用户代码中进行其他同步。 编写新代码时,只要将多个线程同时写入到集合时,就使用并发集合类。

细粒度锁定和无锁机制

某些并发集合类型使用轻量同步机制,如 SpinLock、SpinWait、SemaphoreSlim 和 CountdownEvent。 这些同步类型通常在将线程真正置于等待状态之前,会在短时间内使用 忙旋转。 预计等待时间非常短时,旋转比等待所消耗的计算资源少得多,因为后者涉及资源消耗量大的内核转换。 对于使用旋转的集合类,这种效率意味着多个线程能够以非常快的速率添加和删除项。

BlockingCollection

BlockingCollection 是一个线程安全集合类,可提供以下内容:

  • 生成者/使用者模式的实现; BlockingCollection 是接口的包装 IProducerConsumerCollection 。
  • 利用和方法并发添加和移除多个线程中的项 Add Take 。
  • Add Take 当集合已满或为空时阻止和操作的绑定集合。
  • Add Take 使用 CancellationToken 或方法中的对象取消或操作 TryAdd TryTake 。

下面的示例演示如何使用:


using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class BlockingCollectionDemo
{
    static async Task Main()
    {
        await AddTakeDemo.BC_AddTakeCompleteAdding();
        TryTakeDemo.BC_TryTake();
        FromToAnyDemo.BC_FromToAny();
        await ConsumingEnumerableDemo.BC_GetConsumingEnumerable();
        Console.WriteLine("Press any key to exit.");
        Console.ReadKey();
    }
}
class AddTakeDemo
{
    // Demonstrates:
    //      BlockingCollection<T>.Add()
    //      BlockingCollection<T>.Take()
    //      BlockingCollection<T>.CompleteAdding()
    public static async Task BC_AddTakeCompleteAdding()
    {
        using (BlockingCollection<int> bc = new BlockingCollection<int>())
        {
            // Spin up a Task to populate the BlockingCollection
            Task t1 = Task.Run(() =>
            {
                bc.Add(1);
                bc.Add(2);
                bc.Add(3);
                bc.CompleteAdding();
            });

            // Spin up a Task to consume the BlockingCollection
            Task t2 = Task.Run(() =>
            {
                try
                {
                    // Consume consume the BlockingCollection
                    while (true) Console.WriteLine(bc.Take());
                }
                catch (InvalidOperationException)
                {
                    // An InvalidOperationException means that Take() was called on a completed collection
                    Console.WriteLine("That's All!");
                }
            });

            await Task.WhenAll(t1, t2);
        }
    }
}

class TryTakeDemo
{
    // Demonstrates:
    //      BlockingCollection<T>.Add()
    //      BlockingCollection<T>.CompleteAdding()
    //      BlockingCollection<T>.TryTake()
    //      BlockingCollection<T>.IsCompleted
    public static void BC_TryTake()
    {
        // Construct and fill our BlockingCollection
        using (BlockingCollection<int> bc = new BlockingCollection<int>())
        {
            int NUMITEMS = 10000;
            for (int i = 0; i < NUMITEMS; i++) bc.Add(i);
            bc.CompleteAdding();
            int outerSum = 0;

            // Delegate for consuming the BlockingCollection and adding up all items
            Action action = () =>
            {
                int localItem;
                int localSum = 0;

                while (bc.TryTake(out localItem)) localSum += localItem;
                Interlocked.Add(ref outerSum, localSum);
            };

            // Launch three parallel actions to consume the BlockingCollection
            Parallel.Invoke(action, action, action);

            Console.WriteLine("Sum[0..{0}) = {1}, should be {2}", NUMITEMS, outerSum, ((NUMITEMS * (NUMITEMS - 1)) / 2));
            Console.WriteLine("bc.IsCompleted = {0} (should be true)", bc.IsCompleted);
        }
    }
}

class FromToAnyDemo
{
    // Demonstrates:
    //      Bounded BlockingCollection<T>
    //      BlockingCollection<T>.TryAddToAny()
    //      BlockingCollection<T>.TryTakeFromAny()
    public static void BC_FromToAny()
    {
        BlockingCollection<int>[] bcs = new BlockingCollection<int>[2];
        bcs[0] = new BlockingCollection<int>(5); // collection bounded to 5 items
        bcs[1] = new BlockingCollection<int>(5); // collection bounded to 5 items

        // Should be able to add 10 items w/o blocking
        int numFailures = 0;
        for (int i = 0; i < 10; i++)
        {
            if (BlockingCollection<int>.TryAddToAny(bcs, i) == -1) numFailures++;
        }
        Console.WriteLine("TryAddToAny: {0} failures (should be 0)", numFailures);

        // Should be able to retrieve 10 items
        int numItems = 0;
        int item;
        while (BlockingCollection<int>.TryTakeFromAny(bcs, out item) != -1) numItems++;
        Console.WriteLine("TryTakeFromAny: retrieved {0} items (should be 10)", numItems);
    }
}

class ConsumingEnumerableDemo
{
    // Demonstrates:
    //      BlockingCollection<T>.Add()
    //      BlockingCollection<T>.CompleteAdding()
    //      BlockingCollection<T>.GetConsumingEnumerable()
    public static async Task BC_GetConsumingEnumerable()
    {
        using (BlockingCollection<int> bc = new BlockingCollection<int>())
        {
            // Kick off a producer task
            await Task.Run(async () =>
            {
                for (int i = 0; i < 10; i++)
                {
                    bc.Add(i);
                    await Task.Delay(100); // sleep 100 ms between adds
                }

                // Need to do this to keep foreach below from hanging
                bc.CompleteAdding();
            });

            // Now consume the blocking collection with foreach.
            // Use bc.GetConsumingEnumerable() instead of just bc because the
            // former will block waiting for completion and the latter will
            // simply take a snapshot of the current state of the underlying collection.
            foreach (var item in bc.GetConsumingEnumerable())
            {
                Console.WriteLine(item);
            }
        }
    }
}

本文对使用线程安全的 BlockingCollection 实现高性能的数据处理进行了简要的介绍,在接下来的文章中,我将具体解构服务端程序的结构和设计、客服端程序的结构和设计,敬请关注。


免费使用 & 私有化部署:https://kf.shengxunwei.com

联系QQ: 279060597
联系E-mail:cao.silhouette@msn.com

联系:

E-mail: cao.silhouette@msn.com

QQ: 279060597

联系:

E-mail: cao.silhouette@msn.com

QQ: 279060597

@南京

关于我 我的免费产品 我的开源项目

ICP证:皖ICP备14020687号-1

公安备案:34110202000256