it编程 > 编程语言 > C#

C#中高效的多线程并行处理实现方式详解

14人参与 2025-04-24 C#

前言

在处理大型数据集时,单线程处理往往成为性能瓶颈。通过将数据分割成多个小块,并利用多线程进行并行处理,可以显著提升程序的执行效率和响应速度。

本文将详细介绍几种高效的多线程并行处理实现方式,帮助开发者优化数据处理流程。

使用parallel.foreach进行并行处理

最简单的实现方式是使用c#内置的parallel.foreach方法。

namespace appparallel
{
    internal class program
    {
        static object lockobject = 
        new object();
        static void main(string[] args)
        {
            // 创建示例数据
            var largelist = 
            enumerable.range(1, 1000000).tolist();

            // 设置并行选项
            var paralleloptions = new paralleloptions
            {
                maxdegreeofparallelism = 
                environment.processorcount 
                // 使用处理器核心数量的线程
            };

            try
            {
                parallel.foreach(largelist, paralleloptions,
                (number) =>
                {
                    // 这里是对每个元素的处理逻辑
                    var result = complexcalculation(number);

                    // 注意:如果需要收集结果,要考虑线程安全
                    lock (lockobject)
                    {
                        // 进行线程安全的结果收集
                        console.writeline(result);
                    }
                });
            }
            catch (aggregateexception ae)
            {
                // 处理并行处理中的异常
                foreach (var ex in 
                ae.innerexceptions)
                {
                    console.writeline($"error: 
                    {ex.message}");
                }
            }
        }
        private static int
        complexcalculation(int number)
        {
            // 模拟复杂计算
            thread.sleep(100);
            return number * 2;
        }

    }
}

手动分块处理方式

有时我们需要更精细的控制,可以手动将数据分块并分配给不同的线程。

namespace appparallel
{
    internal class program
    {
        static void main(string[] args)
        {
            var largelist = enumerable.range(1, 1000000).tolist();
            processbychunks(largelist, 1000); 
            // 每1000个元素一个块
        }
        public static void processbychunks<t>(list<t> largelist, 
        int chunksize)
        {
            // 计算需要多少个分块
            int chunkscount = (int)math.ceiling((double)largelist.count / chunksize);
            var tasks = new list<task>();

            for (int i = 0; i < chunkscount; i++)
            {
                // 获取当前分块的数据
                var chunk = largelist
                    .skip(i * chunksize)
                    .take(chunksize)
                    .tolist();

                // 创建新任务处理当前分块
                var task = task.run(() => processchunk(chunk));
                tasks.add(task);
            }

            // 等待所有任务完成
            task.waitall(tasks.toarray());
        }

        private static void 
        processchunk<t>(list<t> chunk)
        {
            foreach (var item in chunk)
            {
                // 处理每个元素
                processitem(item);
            }
        }

        private static void 
        processitem<t>(t item)
        {
            // 具体的处理逻辑
            console.writeline
            ($"processing item: {item} on thread: {task.currentid}");
        }

    }
}

使用生产者-消费者模式

对于更复杂的场景,我们可以使用生产者-消费者模式,这样可以更好地控制内存使用和处理流程。

public class producerconsumerexample
{
    private readonly blockingcollection<int> _queue;
    private readonly 
    int _producercount;
    private readonly 
    int _consumercount;
    private readonly 
    cancellationtokensource _cts;

    public producerconsumerexample(int queuecapacity = 1000)
    {
        _queue = new blockingcollection<int>(queuecapacity);
        _producercount = 1;
        _consumercount = 
        environment.processorcount;
        _cts = new cancellationtokensource();
    }

    public async task processdataasync(list<int> largelist)
    {
        // 创建生产者任务
        var producertask = 
        task.run(() => producer(largelist));

        // 创建消费者任务
        var consumertasks = enumerable.range(0, _consumercount)
            .select(_ => task.run(() => consumer()))
            .tolist();

        // 等待所有生产者完成
        await producertask;

        // 标记队列已完成
        _queue.completeadding();

        // 等待所有消费者完成
        await task.whenall(consumertasks);
    }

    private void producer(list<int> items)
    {
        try
        {
            foreach (var item in items)
            {
                if (_cts.
                token.iscancellationrequested)
                    break;

                _queue.add(item);
            }
        }
        catch (exception ex)
        {
            console.writeline($"producer error: 
            {ex.message}");
            _cts.cancel();
        }
    }

    private void consumer()
    {
        try
        {
            foreach (var item in _queue.getconsumingenumerable())
            {
                if (_cts.token.iscancellationrequested)
                    break;

                // 处理数据
                processitem(item);
            }
        }
        catch (exception ex)
        {
            console.writeline($"consumer error: {ex.message}");
            _cts.cancel();
        }
    }

    private void processitem(int item)
    {
        // 具体的处理逻辑
        thread.sleep(100);
        // 模拟耗时操作
        console.writeline($"processed item {item} on thread {task.currentid}");
    }
}

// 使用示例
static async task main(string[] args)
{
    var processor = new producerconsumerexample();
    var largelist = enumerable.range(1, 10000).tolist();
    await processor.processdataasync(largelist);
}

注意事项

1、合适的分块大小:分块不宜过小,因为过多的线程切换会抵消并行处理的优势;也不宜过大,以免影响负载均衡。建议从每块1000到5000个元素开始测试,找到最优的分块大小。

2、异常处理:务必妥善处理并行处理中的异常情况。每个任务应使用try-catch语句包装,确保异常不会导致整个程序崩溃。同时,考虑使用cancellationtoken来优雅地终止所有任务。

3、资源管理:注意内存使用,避免一次性加载过多数据。合理控制并发线程的数量,通常不超过处理器核心数的两倍。对于实现了idisposable接口的资源,使用using语句进行管理,确保资源及时释放。

4、线程安全:访问共享资源时必须保证线程安全,可以使用适当的同步机制如锁(lock)、信号量(semaphore)等。考虑使用线程安全的集合类,例如concurrentdictionary或concurrentqueue。避免过度锁定,以免造成性能瓶颈。

通过遵循这些注意事项,可以确保在c#中高效且安全地进行大数据列表的并行处理。

总结

并行处理大数据列表是提升程序性能的有效手段,但需根据具体场景选择合适的实现方式。

本文介绍了三种主要方法,各有其适用场景和优势:

parallel.foreach:适用于简单场景,易于实现且代码简洁。适合快速原型开发或处理逻辑较为直接的任务。

手动分块处理:提供更精细的控制,适合中等复杂度场景。允许开发者优化分块大小和线程分配,以达到最佳性能。

生产者-消费者模式:适用于复杂场景,能够更好地管理资源使用和任务调度。特别适合需要高效处理大量数据流或涉及多个处理阶段的应用。

在实际应用中,建议首先进行性能测试,根据数据量大小、处理复杂度以及系统的硬件配置选择最合适的实现方式。

另外,务必重视异常处理和资源管理,确保程序的稳定性和可靠性。通过合理的并行处理策略,可以显著提高大型数据集的处理效率,为应用程序带来更好的用户体验。

以上就是c#中高效的多线程并行处理实现方式详解的详细内容,更多关于c#多线程并行处理的资料请关注代码网其它相关文章!

(0)
打赏 微信扫一扫 微信扫一扫

您想发表意见!!点此发布评论

推荐阅读

C# 配置文件app.config 和 web.config详解

04-24

使用C#进行音频处理的完整指南(从播放到编辑)

04-24

C# 多线程并发编程基础小结

04-24

C# WinForms存储过程操作数据库的实例讲解

04-24

C# 串口扫描枪读取数据的实现

04-24

C#和Java互相调用的方法小结

04-24

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论