并发处理笔记使用并发工具类库,就安全了吗?

编程

没有意识到线程重用导致用户信息错乱的bug

我们知道,ThreadLocal适用于变量在线程间隔离,而在方法和类间共享的场景. 如果信息获取比较昂贵(比如从数据库中查询用户信息),那么在ThreadLocal中缓存数据是比较合适的做法.那什么时候会出现用户信息错乱的bug呢?

来看一个例子,使用spring boot创建一个web应用程序,使用threadlocal存放一个integer的值表示用户id,最后输出两次获取的值和线程名称.

private ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);

@GetMapping("wrong")

pubilc Map wrong(@RequestParam("userId") Integer userId) {

String before = Thread.currentThread().getName() + ":" + currentUser.get();

currentUser.set(userId);

String after = Thread.currentThread().getName() + ":" + currentUser.get();

Map result = new HashMap();

result.put("before", before);

result.put("after", after);

return result;

}

正常来说,在设置用户信息之前获取的值(即before)应该始终是null,但我们要意识到,程序是运行在Tomcat中,执行线程是在Tomcat的工作现场,而Tomcat的工作线程是基于线程池的. 顾名思义,线程池会重用几个固定的线程,一旦线程重用,那么很可能首次从ThreadLocal获取的是之前其他用户的请求遗留的值.这时,ThreadLocal中的用户信息就是其他用户的信息了。

这个例子告诉我们,在写业务代码时,首先要理解代码会跑在什么线程上.

  • 可能我们会因为没有显示的使用多线程也觉得学习多线程没用,但我们并不能认为没有显示开启多线程就不会用现成问题.(如上例子,在tomcat跑单线程处理代码)
  • 使用类似ThreadLocal工具存在一些数据时,需特别注意在代码运行完后,显式的去清空设置的数据。如果代码中使用了自定义线程池也需同样注意。

理解了这个知识点后,修改代码就变得很简单了,只需要在代码的finall代码块中,显式清除ThreadLocal中的数据。

private ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);

@GetMapping("wrong")

pubilc Map wrong(@RequestParam("userId") Integer userId) {

String before = Thread.currentThread().getName() + ":" + currentUser.get();

currentUser.set(userId);

try{

String after = Thread.currentThread().getName() + ":" + currentUser.get();

Map result = new HashMap();

result.put("before", before);

result.put("after", after);

return result;

} finally {

//在finally代码块中删除ThreadLocal中的数据,确保数据不串

currentUser.remove();

}

使用了线程安全的并发工具并不代表解决了所有线程安全问题

ConcurrentHashMap只能保证提供的原子性读写操作是安全的。

我们来看一个例子:

//线程个数

private static int THREAD_COUNT = 10;

//总元素数量

private static int ITEM_COUNT = 1000;

//帮助方法,用来获得一个指定元素数量模拟数据的ConcurrentHashMap

private ConcurrentHashMap<String, Long> getData(int count) {

return LongStream.rangeClosed(1, count)

.boxed()

.collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(), Function.identity(),

(o1, o2) -> o1, ConcurrentHashMap::new));

}

@GetMapping("wrong")

public String wrong() throws InterruptedException {

ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);

//初始900个元素

log.info("init size:{}", concurrentHashMap.size());

ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);

//使用线程池并发处理逻辑

forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {

//查询还需要补充多少个元素

int gap = ITEM_COUNT - concurrentHashMap.size();

log.info("gap size:{}", gap);

//补充元素

concurrentHashMap.putAll(getData(gap));

}));

//等待所有任务完成

forkJoinPool.shutdown();

forkJoinPool.awaitTermination(1, TimeUnit.HOURS);

//最后元素个数会是1000吗?

log.info("finish size:{}", concurrentHashMap.size());

return "OK";

}

这段代码中,concurrentHashMap.size() 和 concurrentHashMap.putAll() (注意putAll本身也不是线程安全的方法,因为其存在复制的操作)是两个操作,工具类ConcurrentHashMap并不保证多个操作之间的状态一致,所以需要加锁才能保证线程安全,

//使用线程池并发处理逻辑

forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {

synchronized(concurrentHashMap){

//查询还需要补充多少个元素

int gap = ITEM_COUNT - concurrentHashMap.size();

log.info("gap size:{}", gap);

//补充元素

concurrentHashMap.putAll(getData(gap));

}

}));

可能有人会觉得全程加锁,那还不如使用普通的HashMap呢,其实,ConcurrentHashMap也有一些原子性的简单复合逻辑方法,例如computeIfAbsent:用来判断key是否存在value,如果不存在则把Lambda表达式运行后的结果放入Map作为Value,也就是新创建一个LongAdder对象,最后返回Value,由于LongAdder是一个线程安全的累加器,一次可以直接调用increment方法进行累加。

private Map<String, Long> gooduse() throws InterruptedException {

ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT);

ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);

forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {

String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);

//利用computeIfAbsent()方法来实例化LongAdder,然后利用LongAdder来进行线程安全计数

freqs.computeIfAbsent(key, k -> new LongAdder()).increment();

}

));

forkJoinPool.shutdown();

forkJoinPool.awaitTermination(1, TimeUnit.HOURS);

//因为我们的Value是LongAdder而不是Long,所以需要做一次转换才能返回

return freqs.entrySet().stream()

.collect(Collectors.toMap(

e -> e.getKey(),

e -> e.getValue().longValue())

);

}

computeIfAbsent为什么如此高效呢

答案就在源码最核心的部分,也就是Java自带的Unsafe实现的CAS。它在虚拟机层面确保了写入数据的原子性,比加锁的效率高的多:

static final <K,V> boolean casTabAt(Node<K,V> tab, int i, Node<K,V> c, Node<K,V> v) {

return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);

}

没有了解清楚工具的适用场景

比如没有理解CopyOnWriteArrayList的适用场景,把它用在了读写均衡或者大量写操作的场景下,导致性能问题,对于这种场景,你可以考虑使用普通的List。

CopytOnWriteArrayList虽然是一个线程安全的ArrayList,但因为其实现方式是,每次修改数据时都会复制一份数据处理,所以有明显的适用场景,即读多写少或者希望无锁读的场景。

以上是 并发处理笔记使用并发工具类库,就安全了吗? 的全部内容, 来源链接: utcz.com/z/514874.html

回到顶部