7.4 读写锁原理
ReentrantReadWriteLock
当读操作远远高于写操作时,这时候使用读写锁
让读读
可以并发,提高性能。类似于数据库中select ... from ... lock in share mode
。
提供一个数据容器类
内部分别使用读锁保护数据的read()
,写锁保护数据的write()
方法。
@Slf4j(topic = "c.DataContainer")
class DataContainer {
private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
public Object read() {
log.debug("获取读锁...");
r.lock();
try {
log.debug("读取");
Thread.sleep(1000);
return data;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
log.debug("释放读锁...");
r.unlock();
}
}
public void write() {
log.debug("获取写锁...");
w.lock();
try {
log.debug("写入");
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
log.debug("释放写锁...");
w.unlock();
}
}
}
测试读锁-读锁
可以并发
public class TestReentrantReadWriteLock {
public static void main(String[] args) {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.read();
}, "t2").start();
}
}
输出结果,从这里可以看到 Thread-0 锁定期间,Thread-1 的读操作不受影响
17:41:10.773 [t2] - 获取读锁...
17:41:10.773 [t1] - 获取读锁...
17:41:10.774 [t2] - 读取
17:41:10.774 [t1] - 读取
17:41:11.776 [t1] - 释放读锁...
17:41:11.779 [t2] - 释放读锁...
测试读锁-写锁
相互阻塞
public class TestReentrantReadWriteLock {
public static void main(String[] args) throws InterruptedException {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
Thread.sleep(100);
new Thread(() -> {
dataContainer.write();
}, "t2").start();
}
}
输出结果
17:50:57.280 [t1] - 获取读锁...
17:50:57.281 [t1] - 读取
17:50:57.384 [t2] - 获取写锁...
17:50:58.287 [t1] - 释放读锁...
17:50:58.287 [t2] - 写入
17:50:59.291 [t2] - 释放写锁...
写锁-写锁
也是相互阻塞的,这里就不测试了
注意事项
- 读锁不支持条件变量
- 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
r.lock();
try {
// ...
w.lock();
try {
// ...
} finally {
w.unlock();
}
} finally {
r.unlock();
}
- 重入时降级支持:即持有写锁的情况下去获取读锁
class CachedData {
Object data;
// 是否有效,如果失效,需要重新计算 data
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// 获取写锁前必须释放读锁
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock();
}
}
// 自己用完数据, 释放读锁
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
缓存
缓存更新策略
更新时,是先清缓存还是先更新数据库?
先清缓存
假设有两个线程,线程 B 清空缓存后,其 CPU 时间片到,线程 A 拿到 CPU 控制权,此时线程 A 查询缓存未命中,便去查询数据库,并将查询到的结果放入缓存,此时线程 A 的 CPU 时间片到,线程 B 继续执行,更新数据库。这样就导致线程 A 后续查询到的结果都是旧值。
先更新数据库
假设有两个线程,线程 B 更新数据库,此时 CPU 时间片到,切换至线程 A,线程 A 查询缓存,为旧数据。线程 A 时间片到,切换至线程 B 继续执行,线程 B 在更新完数据库后,接着清空缓存。线程 A 在重新拿到 CPU 控制权后,便会去查询数据库中的数据,并将结果更新至缓存中。这样后续查询到的结果都是新值。
补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询
这种情况出现几率非常小。
读写锁实现一致性缓存
使用读写锁实现一个简单的按需加载缓存
public class GenericDao {
static String URL = "jdbc:mysql://localhost:3306/test";
static String USERNAME = "root";
static String PASSWORD = "root";
public <T> List<T> queryList(Class<T> beanClass, String sql, Object... args) {
System.out.println("sql: [" + sql + "] params:" + Arrays.toString(args));
BeanRowMapper<T> mapper = new BeanRowMapper<>(beanClass);
return queryList(sql, mapper, args);
}
public <T> T queryOne(Class<T> beanClass, String sql, Object... args) {
System.out.println("sql: [" + sql + "] params:" + Arrays.toString(args));
BeanRowMapper<T> mapper = new BeanRowMapper<>(beanClass);
return queryOne(sql, mapper, args);
}
private <T> List<T> queryList(String sql, RowMapper<T> mapper, Object... args) {
try (Connection conn = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
try (PreparedStatement psmt = conn.prepareStatement(sql)) {
if (args != null) {
for (int i = 0; i < args.length; i++) {
psmt.setObject(i + 1, args[i]);
}
}
List<T> list = new ArrayList<>();
try (ResultSet rs = psmt.executeQuery()) {
while (rs.next()) {
T obj = mapper.map(rs);
list.add(obj);
}
}
return list;
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
private <T> T queryOne(String sql, RowMapper<T> mapper, Object... args) {
List<T> list = queryList(sql, mapper, args);
return list.size() == 0 ? null : list.get(0);
}
public int update(String sql, Object... args) {
System.out.println("sql: [" + sql + "] params:" + Arrays.toString(args));
try (Connection conn = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
try (PreparedStatement psmt = conn.prepareStatement(sql)) {
if (args != null) {
for (int i = 0; i < args.length; i++) {
psmt.setObject(i + 1, args[i]);
}
}
return psmt.executeUpdate();
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
interface RowMapper<T> {
T map(ResultSet rs);
}
static class BeanRowMapper<T> implements RowMapper<T> {
private Class<T> beanClass;
private Map<String, PropertyDescriptor> propertyMap = new HashMap<>();
public BeanRowMapper(Class<T> beanClass) {
this.beanClass = beanClass;
try {
BeanInfo beanInfo = Introspector.getBeanInfo(beanClass);
PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
for (PropertyDescriptor pd : propertyDescriptors) {
propertyMap.put(pd.getName().toLowerCase(), pd);
}
} catch (IntrospectionException e) {
throw new RuntimeException(e);
}
}
@Override
public T map(ResultSet rs) {
try {
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
T t = beanClass.newInstance();
for (int i = 1; i <= columnCount; i++) {
String columnLabel = metaData.getColumnLabel(i);
PropertyDescriptor pd = propertyMap.get(columnLabel.toLowerCase());
if (pd != null) {
pd.getWriteMethod().invoke(t, rs.getObject(i));
}
}
return t;
} catch (SQLException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
}
}
public class GenericCachedDao extends GenericDao{
private GenericDao dao = new GenericDao();
private Map<SqlPair, Object> map = new HashMap<>();
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
@Override
public <T> T queryOne(Class<T> beanClass, String sql, Object... args) {
SqlPair key = new SqlPair(sql, args);
rw.readLock().lock();
try {
// 先从缓存中找,找到直接返回
T value = (T) map.get(key);
if(value != null) {
return value;
}
} finally {
rw.readLock().unlock();
}
rw.writeLock().lock();
try {
// 可能有多个线程进入查询
T value = (T) map.get(key);
if(value == null) {
// 缓存中没有,查询数据库
value = dao.queryOne(beanClass, sql, args);
map.put(key, value);
}
return value;
} finally {
rw.writeLock().unlock();
}
}
@Override
public int update(String sql, Object... args) {
rw.writeLock().lock();
try {
int update = dao.update(sql, args);
// 清空缓存
map.clear();
return update;
} finally {
rw.writeLock().unlock();
}
}
}
:::info
注意:
- 以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑:
- 适合读多写少,如果写操作比较频繁,以上实现性能低
- 没有考虑缓存容量
- 没有考虑缓存过期
- 只适合单机
- 并发性还是低,目前只会用一把锁
- 更新方法太过简单粗暴,清空了所有 key(考虑按类型分区或重新设计 key)
- 乐观锁实现:用 CAS 去更新
:::
StampedLock
该类自 JDK8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
加/解读锁
long stamp = lock.readLock();
lock.unlockRead(stamp);
加/解写锁
long stamp = lock.writeLock();
lock.unlockWrite(stamp);
乐观读,StampedLock 支持tryOptimisticRead()
方法(乐观读),读取完毕后需要做一次戳校验
。如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。
long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)) {
// 锁升级
}
提供一个数据容器类,内部分别使用读锁保护数据的read()
方法,写锁保护数据的write()
方法
@Slf4j(topic = "c.DataContainerStamped")
class DataContainerStamped {
private int data;
private final StampedLock lock = new StampedLock();
public DataContainerStamped(int data) {
this.data = data;
}
public int read(int readTime) {
long stamp = lock.tryOptimisticRead();
log.debug("optimistic read locking...{}", stamp);
try {
Thread.sleep(readTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (lock.validate(stamp)) {
log.debug("read finish...{}, data: {}", stamp, data);
return data;
}
// 锁升级 - 读锁
log.debug("updating to read lock...{}", stamp);
try {
stamp = lock.readLock();
log.debug("read lock {}", stamp);
Thread.sleep(readTime);
log.debug("read finish...{}, data: {}", stamp, data);
return data;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
log.debug("read unlock {}", stamp);
lock.unlockRead(stamp);
}
}
public void write(int newData) {
long stamp = lock.writeLock();
log.debug("write lock {}", stamp);
try {
Thread.sleep(2000);
this.data = newData;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
log.debug("write unlock {}", stamp);
lock.unlockWrite(stamp);
}
}
}
测试读 - 读
,可以优化
public class TestStampedLock {
public static void main(String[] args) throws InterruptedException {
DataContainerStamped dataContainerStamped = new DataContainerStamped(1);
new Thread(() -> {
dataContainerStamped.read(1000);
}, "t1").start();
Thread.sleep(500);
new Thread(() -> {
dataContainerStamped.read(0);
}, "t2").start();
}
}
输出结果,可以看到实际没有加读锁
19:51:41.647 [t1] - optimistic read locking...256
19:51:42.152 [t2] - optimistic read locking...256
19:51:42.152 [t2] - read finish...256, data: 1
19:51:42.654 [t1] - read finish...256, data: 1
测试读 - 写
时,优化读补加读锁
public class TestStampedLock {
public static void main(String[] args) throws InterruptedException {
DataContainerStamped dataContainerStamped = new DataContainerStamped(1);
new Thread(() -> {
dataContainerStamped.read(1000);
}, "t1").start();
Thread.sleep(500);
new Thread(() -> {
dataContainerStamped.write(100);
}, "t2").start();
}
}
输出结果
19:54:36.255 [t1] - optimistic read locking...256
19:54:36.759 [t2] - write lock 384
19:54:37.261 [t1] - updating to read lock...256 // 锁升级
19:54:38.765 [t2] - write unlock 384
19:54:38.765 [t1] - read lock 513 // 戳由 256 变为 513
19:54:39.770 [t1] - read finish...513, data: 100
19:54:39.771 [t1] - read unlock 513
:::info
注意:
- StampedLock 不支持条件变量
- StampedLock 不支持可重入
:::
原文地址:https://blog.csdn.net/dingd1234/article/details/140402901
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!