8人参与 • 2025-07-24 • Asp.net
概念 | 特点 | 适用场景 |
---|---|---|
并行 | 同时执行多个任务(多核) | cpu 密集型计算 |
并发 | 交替执行多个任务(单核伪并行) | i/o 阻塞型任务 |
异步 | 非阻塞执行任务 | 网络/文件操作 |
// 矩阵乘法加速(使用 simd 指令) void multiplymatrices(float[,] mata, float[,] matb, float[,] result) { int size = mata.getlength(0); // 使用硬并行度 (物理核心数) var paralleloptions = new paralleloptions { maxdegreeofparallelism = environment.processorcount }; parallel.for(0, size, paralleloptions, i => { for (int j = 0; j < size; j++) { vector<float> sum = vector<float>.zero; for (int k = 0; k < size; k += vector<float>.count) { vector<float> avec = new vector<float>(mata, i, k); vector<float> bvec = new vector<float>(matb, k, j); sum += avec * bvec; // simd 并行计算 } result[i, j] = vector.dot(sum, vector<float>.one); } }); }
// 多源数据聚合计算 async task processmultisourceasync() { var source1task = fetchdatafromapi("https://source1.com"); var source2task = loaddatabasedataasync("datasource=..."); var source3task = readlocalfileasync("data.json"); // 并行等待所有任务 await task.whenall(source1task, source2task, source3task); // 安全合并结果(避免锁机制) var results = new [] { source1task.result, source2task.result, source3task.result }; var finalresult = combinedata(results); }
// 高性能并发通道 async task runconcurrentpipelineasync() { // 优化选项:减少内存分配 var options = new unboundedchanneloptions { allowsynchronouscontinuations = false, singlereader = false, // 支持多消费者 singlewriter = false // 支持多生产者 }; var channel = channel.createunbounded<dataitem>(options); var producertasks = new list<task>(); // 启动 3 个生产者 for (int i = 0; i < 3; i++) { producertasks.add(produceitemsasync(channel.writer)); } // 启动 4 个消费者 var consumertasks = enumerable.range(1, 4) .select(_ => consumeitemsasync(channel.reader)) .toarray(); // 等待生产完成 await task.whenall(producertasks); channel.writer.complete(); // 等待消费完成 await task.whenall(consumertasks); }
// 分页数据的并发批处理 (.net 6+) async task batchprocessasync(ienumerable<int> allitems) { // 使用 parallel.foreachasync 限流 await parallel.foreachasync( source: allitems, paralleloptions: new paralleloptions { maxdegreeofparallelism = 10, // 限制并发度 cancellationtoken = _cts.token }, async (item, ct) => { await using var semaphore = new semaphoreslimdisposable(5); // 细粒度控制 await semaphore.waitasync(ct); try { await processitemasync(item, ct); } finally { semaphore.release(); } }); } // 自动释放的信号量包装器 struct semaphoreslimdisposable : iasyncdisposable { private readonly semaphoreslim _semaphore; public semaphoreslimdisposable(int count) => _semaphore = new semaphoreslim(count); public valuetask waitasync(cancellationtoken ct) => _semaphore.waitasync(ct).asvaluetask(); public void release() => _semaphore.release(); public valuetask disposeasync() => _semaphore.disposeasync(); }
// 避免伪共享(false sharing) class falsesharingsolution { [structlayout(layoutkind.explicit, size = 128)] struct paddedcounter { [fieldoffset(64)] // 每个计数器独占缓存行 public long counter; } private readonly paddedcounter[] _counters = new paddedcounter[4]; public void increment(int index) => interlocked.increment(ref _counters[index].counter); }
// 为高优先级任务创建专用线程池 static taskfactory highprioritytaskfactory { get { var threadcount = environment.processorcount / 2; var threads = new thread[threadcount]; for (int i = 0; i < threadcount; i++) { var t = new thread(() => thread.currentthread.priority = threadpriority.highest) { isbackground = true, priority = threadpriority.highest }; threads[i] = t; } var taskscheduler = new concurrentexclusiveschedulerpair( taskscheduler.default, threadcount).concurrentscheduler; return new taskfactory(cancellationtoken.none, taskcreationoptions.denychildattach, taskcontinuationoptions.none, taskscheduler); } } // 使用示例 highprioritytaskfactory.startnew(() => executecriticaltask());
反模式 | 性能影响 | 优化方案 |
---|---|---|
过度并行化 | 线程上下文切换开销 | 设置 maxdegreeofparallelism |
共享状态竞争 | 缓存行伪共享 | 使用填充结构或局部变量 |
忽视 task.run 开销 | 线程池调度延迟 | 直接执行短任务 |
blockingcollection 滥用 | 并发阻塞性能下降 | 改用 channel<t> |
忘记 cancellationtoken | 僵尸任务消耗资源 | 在所有任务中传递 cancellationtoken |
方法 | 耗时 (ms) | 加速比 |
---|---|---|
单线程循环 | 52,800 | 1.0× |
parallel.foreach | 14,600 | 3.6× |
simd+parallel | 4,230 | 12.5× |
方案 | qps | cpu使用率 |
---|---|---|
同步阻塞 | 42,000 | 100% |
原生 task | 210,000 | 78% |
通道+限流 | 480,000 | 65% |
// 使用 concurrencyvisualizer async task trackparallelism() { using (var listener = new concurrencyvisualizertelemetry()) { // 标记并行区域 listener.beginoperation("parallel_core"); await processbatchparallelasync(); listener.endoperation("parallel_core"); // 标记串行区域 listener.beginoperation("sync_operation"); runsynccalculation(); listener.endoperation("sync_operation"); } }
# 查看线程池使用情况 dotnet-counters monitor -p pid system.threading.threadpool # 检测锁竞争 dotnet-dump collect --type hang -p pid
并行选择策略
黄金规则
≤ environment.processorcount
channel<t>
避免阻塞interlocked
而非 lock
iasyncdisposable
高级策略
parallel.foreachasync
处理混合负载system.numerics.tensors
nativeaot
减少并行延迟实测成果:
使用上述技术后,某金融数据分析系统:
- 结算时间从 47 分钟压缩至 3.2 分钟
- 单节点吞吐量提升 8.6 倍
- cpu 利用率稳定在 85%-95%
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论