ZooKeeper实现分布式锁
1、基于ZooKeeper基本API实现
pom.xml
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DistributeLock {
private final String connectString = "127.0.0.1:2181";
private final int sessionTimeout = 2000;
private final ZooKeeper zk;
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
private String waitPath;
private String currentNode;
public DistributeLock() throws IOException, InterruptedException, KeeperException {
//获取连接
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
//connectLatch 如果连接上zk,释放
if(event.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// waitLatch 前一个节点释放锁删除后,释放
if(event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
});
//等待zk正常连接后,再往下执行程序
connectLatch.await();
// 判断根节点(/locks)是否存在
Stat stat = zk.exists("/locks", false);
if (null == stat) {
//创建根节点
zk.create("/locks", "locking".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
//加锁
public void zkLock() {
//创建临时节点
try {
currentNode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//判断创建的节点是否是最小的序号节点,如果是则获取到锁,如果不是则监听它序号前一个节点
List<String> children = zk.getChildren("/locks", false);
//如果children只有一个值,那就直接获取锁;如果有多个节点,需要判断,谁最小
if(children.size() == 1) {
return;
} else {
Collections.sort(children);
//获得节点名称 seq-00000000
String thisNode = currentNode.substring("/locks/".length());
int index = children.indexOf(thisNode);
if(index == -1) {
System.out.println("数据异常");
} else if (index == 0) {
//就一个节点,直接获取锁
return;
} else {
//需要监听它前一个节点变化
waitPath = "/locks/" + children.get(index - 1);
zk.getData(waitPath, true, null);
//等待监听
waitLatch.await();
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//解锁
public void unZkLock() {
try {
zk.delete(currentNode, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
测试:
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class DistributeLockTest {
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
final DistributeLock lock1 = new DistributeLock();
final DistributeLock lock2 = new DistributeLock();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.zkLock();
System.out.println(Thread.currentThread().getName()+"==>获得锁");
TimeUnit.SECONDS.sleep(10);
lock1.unZkLock();
System.out.println(Thread.currentThread().getName()+"==>释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}, "线程1").start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.zkLock();
System.out.println(Thread.currentThread().getName()+"==>获得锁");
TimeUnit.SECONDS.sleep(10);
lock2.unZkLock();
System.out.println(Thread.currentThread().getName()+"==>释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}, "线程2").start();
}
}
2、基于框架curator实现
pom.xml
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
public class CuratorLock {
public static void main(String[] args) {
//创建分布式锁1
final InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
//创建分布式锁2
final InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.acquire();
System.out.println(Thread.currentThread().getName()+"==>获得锁");
lock1.acquire();
System.out.println(Thread.currentThread().getName()+"==>再次获得锁");
TimeUnit.SECONDS.sleep(10);
lock1.release();
System.out.println(Thread.currentThread().getName()+"==>释放锁");
lock1.release();
System.out.println(Thread.currentThread().getName()+"==>再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}, "线程1").start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.acquire();
System.out.println(Thread.currentThread().getName()+"==>获得锁");
lock2.acquire();
System.out.println(Thread.currentThread().getName()+"==>再次获得锁");
TimeUnit.SECONDS.sleep(8);
lock2.release();
System.out.println(Thread.currentThread().getName()+"==>释放锁");
lock2.release();
System.out.println(Thread.currentThread().getName()+"==>再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}, "线程2").start();
}
private static CuratorFramework getCuratorFramework() {
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.connectionTimeoutMs(2000)
.sessionTimeoutMs(2000)
.retryPolicy(retryPolicy).build();
//启动客户端
client.start();
System.out.println("zookeeper启动成功");
return client;
}
}
原文地址:https://blog.csdn.net/java_key_code/article/details/140422623
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!