亲宝软件园·资讯

展开

zookeeper实现分布式锁总结,看这一篇足矣(设计模式应用实战)

引路的风 人气:0

zk实现分布式锁纵观网络各种各样的帖子层出不穷,笔者查阅很多资料发现一个问题,有些文章只写原理并没有具体实现,有些文章虽然写了实现但是并不全面

借这个周末给大家做一个总结,代码拿来就可以用并且每一种实现都经过了测试没有bug。下面我们先从最简单的实现开始介绍:

  • 简单的实现
package com.srr.lock;

/**
 * @Description 分布式锁的接口
 */
abstract  public interface DistributedLock {
    /**
     * 获取锁
     */
    boolean lock();
    /**
     * 解锁
     */
    void unlock();

    abstract boolean readLock();
    abstract boolean writeLock();
}

package com.srr.lock;

/**
 * 简单的zk分布式做实现策略
 * 性能比较低会导致羊群效应
 */
public abstract class SimplerZKLockStrategy implements DistributedLock{
    /**
     * 模板方法,搭建的获取锁的框架,具体逻辑交于子类实现
     * @throws Exception
     */
    @Override
    public boolean lock() {
        //获取锁成功
        if (tryLock()){
            System.out.println(Thread.currentThread().getName()+"获取锁成功");
            return true;
        }else{  //获取锁失败
            //阻塞一直等待
            waitLock();
            //递归,再次获取锁
            return lock();
        }
    }

    /**
     * 尝试获取锁,子类实现
     */
    protected abstract boolean tryLock() ;
    /**
     * 等待获取锁,子类实现
     */
    protected abstract void waitLock();
    /**
     * 解锁:删除key
     */
    @Override
    public  abstract void unlock();
}

package com.srr.lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.concurrent.CountDownLatch;

/**
 * 分布式锁简单实现
 */
public class SimpleZKLock extends SimplerZKLockStrategy{
    private static final String PATH = "/lowPerformance_zklock";
    private CountDownLatch countDownLatch = null;
    //zk地址和端口
    public static final String ZK_ADDR = "192.168.32.129:2181";
    //创建zk
    protected ZkClient zkClient = new ZkClient(ZK_ADDR);

    @Override
    protected boolean tryLock() {
        //如果不存在这个节点,则创建持久节点
        try{
            zkClient.createEphemeral(PATH, "lock");
            return true;
        }catch (Exception e){
            return false;
        }
    }

    @Override
    protected void waitLock() {
        IZkDataListener lIZkDataListener = new IZkDataListener() {

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                if (null != countDownLatch){
                    countDownLatch.countDown();
                }
                System.out.println("listen lock unlock");
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };
        //监听前一个节点的变化
        zkClient.subscribeDataChanges(PATH, lIZkDataListener);
        if (zkClient.exists(PATH)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        zkClient.unsubscribeDataChanges(PATH, lIZkDataListener);

    }

    @Override
    public void unlock() {
        if (null != zkClient) {
            System.out.println("lock unclock");
            zkClient.delete(PATH);
        }
    }

    @Override
    public boolean readLock() {
        return true;
    }

    @Override
    public boolean writeLock() {
        return true;
    }
}

package com.srr.lock;

import redis.clients.jedis.Jedis;

import java.util.concurrent.CountDownLatch;

/**
 *  测试场景
 *  count从1加到4
 *  使用简单的分布式锁在分布式环境下保证结果正确
 */
public class T {

    volatile int  count = 1;

    public void inc(){
        for(int i = 0;i<3;i++){
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            count++;
            System.out.println("count == "+count);
        }
    }

    public int getCount(){
       return count;
    }

    public static void main(String[] args) throws InterruptedException {
        final T t = new T();
        final Lock lock = new Lock();
        final CountDownLatch countDownLatch = new CountDownLatch(5);
        for(int i = 0;i<5;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    DistributedLock distributedLock = new SimpleZKLock();
                    if(lock.lock(distributedLock)){
                        t.inc();
                        lock.unlock(distributedLock);
                        countDownLatch.countDown();
                    }
                    System.out.println("count == "+t.getCount());
                }
            }).start();
        }
        countDownLatch.await();
    }
}

运行结果:

 

 这种方式实现虽然简单,但是会引发羊群效应,因为每个等待锁的客户端都需要注册监听lock节点的删除事件,如果客户端并发请求很多,那么这将会非常消耗zookeeper集群

的资源,严重的化则会导致zookeeper集群宕机也不是没有可能。

  • 高性能实现,解决羊群效应问题
package com.srr.lock;

/**
 * @Description 分布式锁的接口
 */
abstract  public interface DistributedLock {
    /**
     * 获取锁
     */
    boolean lock();
    /**
     * 解锁
     */
    void unlock();

    abstract boolean readLock();
    abstract boolean writeLock();
}

package com.srr.lock;

public abstract class BlockingZKLockStrategy implements DistributedLock{
    /**
     * 模板方法,搭建的获取锁的框架,具体逻辑交于子类实现
     * @throws Exception
     */
    @Override
    public final boolean lock() {
        //获取锁成功
        if (tryLock()){
            System.out.println(Thread.currentThread().getName()+"获取锁成功");
            return true;
        }else{  //获取锁失败
            //阻塞一直等待
            waitLock();
            //递归,再次获取锁
            return true;
        }
    }

    /**
     * 尝试获取锁,子类实现
     */
    protected abstract boolean tryLock() ;
    /**
     * 等待获取锁,子类实现
     */
    protected abstract void waitLock();
    /**
     * 解锁:删除key
     */
    @Override
    public  abstract void unlock();
}

package com.srr.lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class BlockingZKLock extends BlockingZKLockStrategy{
    private static final String PATH = "/highPerformance_zklock";
    //当前节点路径
    private String currentPath;
    //前一个节点的路径
    private String beforePath;
    private CountDownLatch countDownLatch = null;
    //zk地址和端口
    public static final String ZK_ADDR = "192.168.32.129:2181";
    //超时时间
    public static final int SESSION_TIMEOUT = 30000;
    //创建zk
    protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT);

    public BlockingZKLock() {
        //如果不存在这个节点,则创建持久节点
        if (!zkClient.exists(PATH)) {
            zkClient.createPersistent(PATH);
        }
    }

    @Override
    protected boolean tryLock() {
        //如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
        //if (null == currentPath || "".equals(currentPath)) {
            //在path下创建一个临时的顺序节点
        currentPath = zkClient.createEphemeralSequential(PATH+"/", "lock");
        //}
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //获取所有的临时节点,并排序
        List<String> childrens = zkClient.getChildren(PATH);

        Collections.sort(childrens);

        if (currentPath.equals(PATH+"/"+childrens.get(0))) {
            return true;
        }else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath
            int pathLength = PATH.length();
            int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1));
            beforePath = PATH+"/"+childrens.get(wz-1);
        }
        return false;

    }

    @Override
    protected void waitLock() {
        IZkDataListener lIZkDataListener = new IZkDataListener() {

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                if (null != countDownLatch){
                    countDownLatch.countDown();
                }
                System.out.println("listen lock unlock");
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };
        //监听前一个节点的变化
        zkClient.subscribeDataChanges(beforePath, lIZkDataListener);
        if (zkClient.exists(beforePath)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener);

    }

    @Override
    public void unlock() {
        if (null != zkClient) {
            System.out.println("lock unclock");
            zkClient.delete(currentPath);
        }
    }

    @Override
    public boolean readLock() {
        return true;
    }

    @Override
    public boolean writeLock() {
        return true;
    }
}

package com.srr.lock;


import java.util.concurrent.CountDownLatch;

/**
 *  测试场景
 *  count从1加到4
 *  使用高性能的分布式锁在分布式环境下保证结果正确
 */
public class T {

    volatile int  count = 1;

    public void inc(){
        for(int i = 0;i<3;i++){
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            count++;
            System.out.println("count == "+count);
        }
    }

    public int getCount(){
       return count;
    }

    public static void main(String[] args) throws InterruptedException {
        final T t = new T();
        final Lock lock = new Lock();
        final CountDownLatch countDownLatch = new CountDownLatch(5);
        for(int i = 0;i<5;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    DistributedLock distributedLock = new BlockingZKLock();
                    if(lock.lock(distributedLock)){
                        t.inc();
                        lock.unlock(distributedLock);
                        countDownLatch.countDown();
                    }
                    System.out.println("count == "+t.getCount());
                }
            }).start();
        }
        countDownLatch.await();
    }
}

这种实现客户端只需监听它前一个节点的变化,不需要监听所有的节点,从而提高了zookeeper锁的性能。

  • 共享锁(S锁)
  • 写到这个,看了网络上很多错误的文章实现把排它锁当做共享锁

共享锁正确是实现姿势如下:

 

package com.srr.lock;

/**
 * @Description 分布式锁的接口
 */
abstract  public interface DistributedLock {
    /**
     * 获取锁
     */
    boolean lock();
    /**
     * 解锁
     */
    void unlock();

    abstract boolean readLock();
    abstract boolean writeLock();
}

package com.srr.lock;

/**
 * 共享锁策略
 */
abstract public class ZKSharedLockStrategy implements DistributedLock{
    @Override
    public boolean readLock() {
        //获取锁成功
        if (tryReadLock()){
            System.out.println(Thread.currentThread().getName()+"获取读锁成功");
            return true;
        }else{  //获取锁失败
            //阻塞一直等待
            waitLock();
            //递归,再次获取锁
            return true;
        }
    }

    @Override
    public boolean writeLock() {
        //获取锁成功
        if (tryWriteLock()){
            System.out.println(Thread.currentThread().getName()+"获取写锁成功");
            return true;
        }else{  //获取锁失败
            //阻塞一直等待
            waitLock();
            //递归,再次获取锁
            return true;
        }
    }

    /**
     * 尝试获取锁,子类实现
     */
    protected abstract boolean tryWriteLock() ;

    /**
     * 尝试获取锁,子类实现
     */
    protected abstract boolean tryReadLock() ;

    /**
     * 等待获取锁,子类实现
     */
    protected abstract void waitLock();

    /**
     * 解锁:删除key
     */
    @Override
    public  abstract void unlock();
}

package com.srr.lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * 共享锁
 */
public class ZKSharedLock extends ZKSharedLockStrategy{

    private static final String PATH = "/zk-root-readwrite-lock";
    //当前节点路径
    private String currentPath;
    //前一个节点的路径
    private String beforePath;
    private CountDownLatch countDownLatch = null;
    //zk地址和端口
    public static final String ZK_ADDR = "192.168.32.129:2181";
    //超时时间
    public static final int SESSION_TIMEOUT = 30000;
    //创建zk
    protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT);
    public ZKSharedLock() {
        //如果不存在这个节点,则创建持久节点
        if (!zkClient.exists(PATH)) {
            zkClient.createPersistent(PATH);
        }
    }

    @Override
    protected boolean tryWriteLock() {
        //如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
        if (null == currentPath || "".equals(currentPath)) {
        //在path下创建一个临时的顺序节点
        currentPath = zkClient.createEphemeralSequential(PATH+"/w", "writelock");
        }
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //获取所有的临时节点,并排序
        List<String> childrens = zkClient.getChildren(PATH);
        Collections.sort(childrens);

        if (currentPath.equals(PATH+"/"+childrens.get(0))) {
            return true;
        }else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath
            int pathLength = PATH.length();
            int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1));
            beforePath = PATH+"/"+childrens.get(wz-1);
        }
        return false;
    }

    @Override
    protected boolean tryReadLock() {
        //如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
        if (null == currentPath || "".equals(currentPath)) {
            //在path下创建一个临时的顺序节点
            currentPath = zkClient.createEphemeralSequential(PATH+"/r", "readklock");
        }
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //获取所有的临时节点,并排序
        List<String> childrens = zkClient.getChildren(PATH);
        Collections.sort(childrens);

        if (currentPath.equals(PATH+"/"+childrens.get(0))) {
            return true;
        }else if(isAllReadNodes(childrens)){
            return true;
        }else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath
            int pathLength = PATH.length();
            int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1));

            for (int i = wz - 1; i > 0; i--) {
                // 找到了离得最近的一个写节点,那么它的后一个节点要么是一个读节点,要么就是待加锁的节点本身
                if (childrens.get(i).indexOf("w") >= 0) {
                    beforePath = PATH + "/" + childrens.get(i);
                    break;
                }
            }
        }
        return false;
    }

    // 判断比自已小的节点是否都是读节点
    private boolean isAllReadNodes(List<String> sortNodes) {
        int pathLength = PATH.length();
        int currentIndex =  Collections.binarySearch(sortNodes, currentPath.substring(pathLength+1));
        for (int i = 0; i < currentIndex - 1; i++) {
            // 只要有一个写锁,则不能直接获取读锁
            if (sortNodes.get(i).indexOf("w") >= 0) {
                return false;
            }
        }

        return true;
    }

    @Override
    protected void waitLock() {
        IZkDataListener lIZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                if (null != countDownLatch){
                    countDownLatch.countDown();
                }
                System.out.println("listen lock unlock");
            }
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };
        //监听前一个节点的变化
        zkClient.subscribeDataChanges(beforePath, lIZkDataListener);
        if (zkClient.exists(beforePath)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener);
    }

    @Override
    public boolean lock() {
        return false;
    }

    @Override
    public void unlock() {
        if (null != zkClient) {
            System.out.println("lock unclock");
            zkClient.delete(currentPath);
            zkClient.close();
        }
    }
}

package com.srr.lock;

/**
 * 锁工具类
 */
public class Lock {
    /**
     * 获取锁
     */
    boolean lock(DistributedLock lock) {
        return lock.lock();
    };

    /**
     * 获取读锁
     */
    boolean readlock(DistributedLock lock) {
        return lock.readLock();
    };

    /**
     * 获取读锁
     */
    boolean writeLock(DistributedLock lock) {
        return lock.writeLock();
    };

    /**
     * 释放锁
     */
    void unlock(DistributedLock lock) {
        lock.unlock();
    };
}

package com.srr.lock;

import java.util.concurrent.CountDownLatch;

/**
 * 测试共享锁
 */
public class SharedLockTest {
    private static volatile int count = 0;
    public static void main(String[] args) throws Exception {
        final Lock lock = new Lock();
        final CountDownLatch countDownLatch = new CountDownLatch(10);

        new Thread(new Runnable() {
            @Override
            public void run() {
                testWriteLock(8);
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                testReadLock(10);
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                testReadLock(20);
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                testWriteLock(11);
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                testWriteLock(30);
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                testReadLock(9);
            }
        }).start();

        countDownLatch.await();
    }

    // 读锁
    private static void testReadLock(long sleepTime) {
        try {
            Lock lock = new Lock();
            DistributedLock dlock = new ZKSharedLock();
            lock.readlock(dlock);
            System.out.println("i get readlock ->" + sleepTime);
            System.out.println("count = "+ count);
            Thread.sleep(sleepTime);
            lock.unlock(dlock);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 写锁
    private static void testWriteLock(long sleepTime) {
        try {
            Lock lock = new Lock();
            DistributedLock dlock = new ZKSharedLock();
            lock.writeLock(dlock);
            System.out.println("i get writelock ->" + sleepTime);
            count++;
            Thread.sleep(sleepTime);
            lock.unlock(dlock);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

运行结果:

 

 

 从结果可以看出读锁和读锁可以共享锁,而写锁必须等待读锁或者写锁释放之后才能获取锁。

最后,zk分布式锁完美解决方案:

  • Apache Curator
  • Apache Curator is a Java/JVM client library for Apache ZooKeeper, a distributed coordination service. It includes a highlevel API framework and utilities to make using Apache ZooKeeper much easier and more reliable. It also includes recipes for common use cases and extensions such as service discovery and a Java 8 asynchronous DSL.
  • Curator n ˈkyoor͝ˌātər: a keeper or custodian of a museum or other collection - A ZooKeeper Keeper.

网上很多文章竟然标题用Curator实现分布式锁,大哥Curator框架本身已经实现了分布式锁而且提供了各种各样的锁api供大家使用,我们不用再基于Curator实现分布式锁,这不是多此一举吗?这里给出一个简单的使用案例,旨在说明意图:

package com.srr.lock;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 *  测试场景
 *  count从1加到101
 *  使用redis分布式锁在分布式环境下保证结果正确
 */
public class CuratorDistributedLockTest {
    private static final String lockPath = "/curator_lock";
    //zk地址和端口
    public static final String zookeeperConnectionString = "192.168.32.129:2181";

    volatile int  count = 1;

    public void inc(){
        for(int i = 0;i<10;i++){
            count++;
            System.out.println("count == "+count);
        }
    }

    public int getCount(){
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        final T t = new T();
        final Lock lock = new Lock();
        final CountDownLatch countDownLatch = new CountDownLatch(4);
        for(int i = 0;i<4;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(10, 5000);
                    CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
                    client.start();
                    InterProcessMutex lock = new InterProcessMutex(client, lockPath);
                    try {
                        if (lock.acquire(10 * 1000, TimeUnit.SECONDS))
                        {
                            try
                            {
                                System.out.println("get the lock");
                                t.inc();
                            }
                            finally
                            {
                                lock.release();
                                System.out.println("unlock the lock");
                            }
                        }
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                    countDownLatch.countDown();
                }
            }).start();
        }

        countDownLatch.await();
        System.out.println("total count == "+t.getCount());
    }
}

运行结果:

 

 

 如果想更多了解Curator框架,请移步http://curator.apache.org/,官网给出了详细的使用案例及介绍。至此zk实现分布式锁总结完毕!

 原创不易,请多多关注!

 

加载全部内容

相关教程
猜你喜欢
用户评论