# 多线程并发解决方案:单线程执行解决复杂的并发场景
## 背景:如何对复杂的业务环境加锁?
在JAVA体系中,多线程是一个比较重要的模块,同时也是一个饱受争议的模块。
一方面,合理利用多线程确实能提高计算性能,确保程序合理运行。但并非所有人都能深入理解多线程,可能不恰当的使用反而导致业务复杂,难以维护,bug滋生,这在业务复杂的环境下尤为明显。
以SLG游戏《战火与秩序》为例:
![多线程并发解决方案:单线程执行](https://oss.yeas.fun/halo-yeas/singleton-module1.jpg)
假设有1个城池,它有如下业务逻辑:
- 对外出兵,则从当前城池中扣减兵力,增加出发部队的兵力
- 被外部城池攻打,则战斗结算时,需要扣除自身城池损耗兵力和攻击部队的损失兵力
- 盟军增援,则增援部队到达时,需要实时增援到战场中,并在战斗结束后参与结算
当然实际的游戏逻辑远比上述复杂的多,一个城池上可能同时发生的事很多,那如何保证逻辑不出错?
直观的想法就是加锁,但在一个城池上不只是本城池的兵力在变化,还涉及到其他城池的攻打部队、增援部队等。如果直接加锁,一方面可能需要对多个对象同时加锁,另一方面还会影响效率。
## 一种解决思路:整游戏单线程执行
在JAVA中,如果想要保证线程安全,就必须是同一时刻仅允许一个线程执行逻辑,不管是加锁也好,放入队列慢慢消费也罢,最终都还是单线程执行。
因此像游戏这种复杂逻辑的环境,有的框架在设计的时候就会牺牲一部分性能,从而降低业务复杂度。比如说:整个游戏就是单线程执行。
游戏毕竟不是像淘宝那种超高并发的业务,有时这种以性能换简洁的做法也是可以的。目前市面上不少游戏基本就是纯单线程执行。
整个游戏单线程执行,其实现有利有弊,这不是我们今天讨论的重点。我们需要考虑的是,在多线程的架构下,是否也能够像单线程一样编写业务逻辑而不出错?
## 另一种思路:将多线程业务转为单线程
就上述SLG的例子,因为业务都是在城池上触发的,那假设我们可以把所有的触发事件都排队放到一个单线程里面依次执行,不管是对外出兵、被攻打还是盟军增援,只要是涉及到该城池的事件,
都是按次序一个个执行,那就可以避免加锁,且逻辑也比较容易控制。于是上面的问题就转化为怎么将多线程转为单线程执行?
### 大概的思路
1. 将多线程转为单线程,我们要把不同的任务分发到不同队列(保证同名的任务一定是分发到同一个队列)。
2. 其次,队列可能设几十个,如果为每个队列单独创建一个线程消耗太大,最好是后台由一个线程池去消费队列(需控制线程池同一时刻仅有一个线程消费同一个队列)。
3. 第三,如果我们想要同步获取到异步执行的结果,这里就需要支持Future方式返回结果
**整体方案如下:**
![多线程并发解决方案:单线程执行](https://oss.yeas.fun/halo-yeas/singleton-module2.png)
### 核心点1:
关于点1,我们需要把所有任务分配到不同的队列中,其中如果同名的,一定是映射到同一个队列上。这个需求就是基本的hash映射。
于是我们可以基于任务的名字,计算hash,然后拿hash与队列数量-1取模,即可得到 0到SIZE-1 之间的一个数,这就是队列的下标。
```java
public class SingletonModule{
private <T> FutureWrapper<T> addTask(String key, Callable<T> callable) {
// 基于任务的名字,计算hash
SingletonTask<T> task = new SingletonTask<>(key, callable);
int rehash = rehash(task.getHash().hashCode());
// hash与队列数量-1按位与运算,即可得到 0到SIZE-1 之间的一个数
int idx = rehash & (SIZE - 1);
// 根据idx一定可以获取到一个队列
SingletonTaskQueue queue = queues[idx];
// 增加任务到对应的队列
queue.addTask(task);
// 每个事件,都有对应的future
return new FutureWrapper<T>(task.getFuture());
}
}
```
### 核心点2:
点2就是要控制线程池消费队列,但同一时刻仅允许一个线程消费同一个线程,这个状态是由isDealing变量控制。 SingletonTaskQueue就是线程池消费队列的线程执行逻辑。
- 在向Queue添加任务时,如果当前队列不在被其他线程消费,则往线程池中提交消费任务
- 线程的run()方法就是执行的execute()。其中有一个while循环,持续从队列获取任务进行消费,一直把当前队列消费空。最后finally会把isDealing状态设为false,代表当前队列没有线程正在消费。
- 要注意finally的最后一段:有可能在 isDealing=true的时候,又往队列中新增了任务,所以需要再次检测队列是否为空,不为空则要继续消费。否则可能出现最后一个任务碰巧无法执行的情况。
```java
public class SingletonTaskQueue implements Runnable {
public void addTask(SingletonTask task) {
boolean success = queue.offer(task);
// 添加队列失败
if (!success) {
task.onAddQueueFailed();
return;
}
// 如果当前队列不在被其他线程消费,则往线程池中提交消费任务
if (executable()) {
ThreadPoolService.getInstance().runTask(this, threadPoolName);
}
}
@Override
public void run() {
execute();
}
private void execute() {
try {
while (true) {
try {
SingletonTask task = this.queue.poll();
// 没有对象,则结束循环
if (task == null) {
// 并发问题:如果有断点在这里
// 另一个线程把event放入队列中,且因为没有获取到锁,则快速失败,当前线程也break了,则有event无法消耗
// 所以:在finally段队列二次检查
break;
}
if (LOG.isWarnEnabled()) {
LOG.warn("Singleton task[{}] triggered", task.getHash());
}
ListenableFutureTask future = task.getFuture();
// 同步执行
future.run();
} catch (Exception e) {
LOG.error("SingletonTaskQueue[{}] error", SingletonTaskQueue.this.idx, e);
}
}
} finally {
// 最后一定要把标识位修改为false
this.isDealing.set(false);
// 因为上面已经已经把状态isDealing重置了,如果队列里有对象,则继续放到线程池执行
if (!this.queue.isEmpty() && executable()) {
execute();
}
}
}
}
```
### 核心点3:
可能有业务中需要同步或异步获取单线程执行后的结果,这是典型的Future应用场景。所以在Task外部封装了一个future,这样task被执行后,通过Future就可以获取到结果。
```java
public class SingleonTask{
public SingletonTask(String hash, Callable<T> callable) {
this.hash = hash;
this.future = ListenableFutureTask.create(() -> {
try {
return new FutureResult<>(callable.call());
} catch (Exception e) {
// 非错误码异常:则异常上报
log.error("SingletonTask error", e);
return FutureResult.newFutureResultWithException(e);
}
});
}
}
```
### 示例代码github:
[https://github.com/cm4j/cm4j-all](https://github.com/cm4j/cm4j-all)
#### 单元测试代码
NormalSingletonModuleTest:运行可看到,我们提交了3个任务,其中2个同名的任务会单线程执行,属于同一个线程执行。而另一个不同名的任务是由单独的一个线程执行。
## 总结
通过将多线程转变为单线程执行,我们在业务中可以不进行加锁也可以控制业务逻辑高并发执行,这是应对复杂业务的一种解决思路。
## --- END ---
多线程并发解决方案:单线程执行解决复杂的并发场景