自学内容网

ZooKeeper实现分布式锁

1、基于ZooKeeper基本API实现

pom.xml
<!-- ZooKeeper Java client, pinned to 3.5.7; the DistributeLock example imports its org.apache.zookeeper classes. -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributeLock {
    private final String connectString = "127.0.0.1:2181";
    private final int sessionTimeout = 2000;
    private final ZooKeeper zk;

    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);

    private String waitPath;
    private String currentNode;

    public DistributeLock() throws IOException, InterruptedException, KeeperException {
        //获取连接
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                //connectLatch 如果连接上zk,释放
                if(event.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }

                // waitLatch 前一个节点释放锁删除后,释放
                if(event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });

        //等待zk正常连接后,再往下执行程序
        connectLatch.await();

        // 判断根节点(/locks)是否存在
        Stat stat = zk.exists("/locks", false);
        if (null == stat) {
            //创建根节点
            zk.create("/locks", "locking".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    //加锁
    public void zkLock() {
        //创建临时节点
        try {
            currentNode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            //判断创建的节点是否是最小的序号节点,如果是则获取到锁,如果不是则监听它序号前一个节点
            List<String> children = zk.getChildren("/locks", false);
            //如果children只有一个值,那就直接获取锁;如果有多个节点,需要判断,谁最小
            if(children.size() == 1) {
                return;
            } else {
                Collections.sort(children);

                //获得节点名称 seq-00000000
                String thisNode = currentNode.substring("/locks/".length());
                int index = children.indexOf(thisNode);

                if(index == -1) {
                    System.out.println("数据异常");
                } else if (index == 0) {
                    //就一个节点,直接获取锁
                    return;
                } else {
                    //需要监听它前一个节点变化
                    waitPath = "/locks/" + children.get(index - 1);
                    zk.getData(waitPath, true, null);
                    //等待监听
                    waitLatch.await();
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //解锁
    public void unZkLock() {
        try {
            zk.delete(currentNode, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
}
测试:
import org.apache.zookeeper.KeeperException;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * Manual demo: two threads, each with its own {@link DistributeLock} (and therefore its own
 * ZooKeeper session), compete for the same lock and hold it for ten seconds.
 */
public class DistributeLockTest {
    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
        final DistributeLock lock1 = new DistributeLock();
        final DistributeLock lock2 = new DistributeLock();

        new Thread(holdLockTask(lock1), "线程1").start();
        new Thread(holdLockTask(lock2), "线程2").start();
    }

    /**
     * Builds a task that acquires {@code lock}, holds it for ten seconds and releases it.
     *
     * @param lock the lock instance used exclusively by the returned task
     * @return a runnable suitable for a dedicated thread
     */
    private static Runnable holdLockTask(final DistributeLock lock) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    lock.zkLock();
                    try {
                        System.out.println(Thread.currentThread().getName()+"==>获得锁");

                        TimeUnit.SECONDS.sleep(10);
                    } finally {
                        // Release even if the sleep is interrupted, so the other thread is
                        // not left waiting on a lock nobody will give back.
                        lock.unZkLock();
                        System.out.println(Thread.currentThread().getName()+"==>释放锁");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
    }
}

2、基于框架curator实现

pom.xml
<!-- Apache Curator artifacts (framework, recipes, client), all pinned to 4.3.0, for the CuratorLock example. -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

/**
 * Manual demo of Curator's {@link InterProcessMutex}: two threads, each with its own client,
 * compete for the mutex at {@code /locks} and each acquires it twice to show reentrancy.
 */
public class CuratorLock {
    public static void main(String[] args) {
        final CuratorFramework client1 = getCuratorFramework();
        final CuratorFramework client2 = getCuratorFramework();

        try {
            // Distributed lock 1 and 2 point at the same path through different clients.
            final InterProcessMutex lock1 = new InterProcessMutex(client1, "/locks");
            final InterProcessMutex lock2 = new InterProcessMutex(client2, "/locks");

            Thread first = new Thread(reentrantTask(lock1, 10), "线程1");
            Thread second = new Thread(reentrantTask(lock2, 8), "线程2");
            first.start();
            second.start();

            // Wait for both workers so the clients are not closed underneath them.
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            client1.close();
            client2.close();
        }
    }

    /**
     * Builds a task that acquires {@code lock} twice, sleeps, then releases it twice.
     *
     * @param lock the mutex to exercise
     * @param holdSeconds how long to hold the lock before releasing
     * @return a runnable suitable for a dedicated thread
     */
    private static Runnable reentrantTask(final InterProcessMutex lock, final long holdSeconds) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    lock.acquire();
                    try {
                        System.out.println(Thread.currentThread().getName()+"==>获得锁");

                        lock.acquire();
                        try {
                            System.out.println(Thread.currentThread().getName()+"==>再次获得锁");

                            TimeUnit.SECONDS.sleep(holdSeconds);
                        } finally {
                            // Every successful acquire is paired with a release in finally,
                            // so a failure while holding the lock cannot leave it held.
                            lock.release();
                            System.out.println(Thread.currentThread().getName()+"==>释放锁");
                        }
                    } finally {
                        lock.release();
                        System.out.println(Thread.currentThread().getName()+"==>再次释放锁");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
    }

    /**
     * Creates and starts a Curator client for the local ZooKeeper at {@code 127.0.0.1:2181}.
     *
     * @return a started client; the caller is responsible for closing it
     */
    private static CuratorFramework getCuratorFramework() {
        // Constructor arguments are (3000, 3): base sleep time in ms and max retries.
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 3);

        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000)
                .retryPolicy(retryPolicy).build();

        // Start the client.
        client.start();

        System.out.println("zookeeper启动成功");

        return client;

    }
}


原文地址:https://blog.csdn.net/java_key_code/article/details/140422623

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!