博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Redis整合Spring实现分布式锁
阅读量:6850 次
发布时间:2019-06-26

本文共 14071 字,大约阅读时间需要 46 分钟。

spring把专门的数据操作独立封装在spring-data系列中,spring-data-redis是对Redis的封装

org.springframework.data
spring-data-redis
1.4.2.RELEASE
redis.clients
jedis
2.6.2
org.apache.commons
commons-pool2
2.4.2
javax.persistence
persistence-api
1.0.2
junit
junit
4.12
org.springframework
spring-test
4.3.2.RELEASE
commons-logging
commons-logging
1.2

 

Spring 配置文件applicationContext.xml

注意新版的maxTotal,MaxWaitMillis这两个字段与旧版的不同。

redis连接池配置文件redis.properties

redis.host=192.168.2.129redis.port=6379 redis.pass=redis129  redis.maxIdle=300 redis.maxTotal=600 redis.MaxWaitMillis=1000 redis.testOnBorrow=true

好了,配置完成,下面写上代码

测试代码

import javax.persistence.Entity;import javax.persistence.Table;/** * @author  * @date 创建时间:2017年2月9日 上午8:51:02 * @parameter */@Entity@Table(name= "t_user")public class User {    //主键    private String id;    //用户名    private String userName;    public String getId() {        return id;    }    public void setId(String id) {        this.id = id;    }    public String getUserName() {        return userName;    }    public void setUserName(String userName) {        this.userName = userName;    }    }

BaseRedisDao

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;/** * @author  * @date 创建时间:2017年2月9日 上午9:02:16 * @parameter */public class BaseRedisDao
{ @Autowired(required=true) protected RedisTemplate
redisTemplate;}

IUserDao 

public interface IUserDao {         public boolean save(User user);         public boolean update(User user);     public boolean delete(String userIds);         public User find(String userId);     }

UserDao 

package com.shanheyongmu.dao;import org.springframework.dao.DataAccessException;import org.springframework.data.redis.connection.RedisConnection;import org.springframework.data.redis.core.RedisCallback;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.serializer.RedisSerializer;import com.shanheyongmu.entity.User;/** * @author  * @date 创建时间:2017年2月9日 上午9:08:28 * @parameter */public class UserDao extends BaseRedisDao
implements IUserDao { public boolean save(final User user) { boolean res = redisTemplate.execute(new RedisCallback
() { public Boolean doInRedis(RedisConnection connection) throws DataAccessException { RedisSerializer
serializer = redisTemplate.getStringSerializer(); byte[] key = serializer.serialize(user.getId()); byte[] value = serializer.serialize(user.getUserName()); //set not exits return connection.setNX(key, value); } }); return res; } public boolean update(final User user) { boolean result = redisTemplate.execute(new RedisCallback
() { public Boolean doInRedis(RedisConnection connection) throws DataAccessException { RedisSerializer
serializer = redisTemplate.getStringSerializer(); byte[] key = serializer.serialize(user.getId()); byte[] name = serializer.serialize(user.getUserName()); //set connection.set(key, name); return true; } }); return result; } public boolean delete(final String userId) { boolean result = redisTemplate.execute(new RedisCallback
() { public Boolean doInRedis(RedisConnection connection) throws DataAccessException { RedisSerializer
serializer = redisTemplate.getStringSerializer(); byte[] key = serializer.serialize(userId); //delete connection.del(key); return true; } }); return result; } public User find(final String userId) { User result = redisTemplate.execute(new RedisCallback
() { public User doInRedis(RedisConnection connection) throws DataAccessException { RedisSerializer
serializer = redisTemplate.getStringSerializer(); byte[] key = serializer.serialize(userId); //get byte[] value = connection.get(key); if(value == null) { return null; } String name = serializer.deserialize(value); User resUser = new User(); resUser.setId(userId); resUser.setUserName(name); return resUser; } }); return result; }}

Test

写这个类的时候需要引入junit包和spring-test.jar

package com.shanheyongmu.test;import org.junit.Assert;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import com.shanheyongmu.dao.IUserDao;import com.shanheyongmu.entity.User;/** * @author  * @date 创建时间:2017年2月9日 上午10:42:55 * @parameter */@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations = {"classpath*:applicationContext.xml"})public class RedisTest extends AbstractJUnit4SpringContextTests {        @Autowired    private IUserDao userDao;    @Test    public void testSaveUser() {        User user = new User();        user.setId("402891815170e8de015170f6520b0000");        user.setUserName("zhangsan");    }        @Test    public void testGetUser() {        User user = new User();        user = userDao.find("402891815170e8de015170f6520b0000");        System.out.println(user.getId() + "-" +user.getUserName());    }        @Test    public void testUpdateUser() {        User user = new User();        user.setId("402891815170e8de015170f6520b0000");        user.setUserName("lisi");        boolean res = userDao.update(user);        Assert.assertTrue(res);            }    @Test    public void testDeleteUser() {        boolean res = userDao.delete("402891815170e8de015170f6520b0000");        Assert.assertTrue(res);    }        }

String类型的增删该查已完成,Hash,List,Set数据类型的操作就不举例了,和使用命令的方式差不多。如下

connection.hSetNX(key, field, value);connection.hDel(key, fields);connection.hGet(key, field); connection.lPop(key);connection.lPush(key, value);connection.rPop(key);connection.rPush(key, values); connection.sAdd(key, values);connection.sMembers(key);connection.sDiff(keys);connection.sPop(key);

整合可能遇到的问题

1.NoSuchMethodError

java.lang.NoSuchMethodError: org.springframework.core.serializer.support.DeserializingConverter.
(Ljava/lang/ClassLoader;)V Caused by: java.lang.NoSuchMethodError: redis.clients.jedis.JedisShardInfo.setTimeout(I)V

类似找不到类,找不到方法的问题,当确定依赖的jar已经引入之后,此类问题多事spring-data-redis以及jedis版本问题,多换个版本试试,本文上面提到的版本可以使用。

1.No qualifying bean

No qualifying bean of type [org.springframework.data.redis.core.RedisTemplate] found for dependency

找不到bean,考虑applicationContext.xml中配置redisTemplate bean时实现类是否写错。例如,BaseRedisDao注入的是RedisTemplate类型的对象,applicationContext.xml中配置的实现类却是RedisTemplate的子类StringRedisTemplate,那肯定报错。整合好后,下面我们着重学习基于redis的分布式锁的实现。

基于redis实现的分布式锁

我们知道,在多线程环境中,锁是实现共享资源互斥访问的重要机制,以保证任何时刻只有一个线程在访问共享资源。锁的基本原理是:用一个状态值表示锁,对锁的占用和释放通过状态值来标识,因此基于redis实现的分布式锁主要依赖redis的SETNX命令和DEL命令,SETNX相当于上锁,DEL相当于释放锁,当然,在下面的具体实现中会更复杂些。之所以称为分布式锁,是因为客户端可以在redis集群环境中向集群中任一个可用Master节点请求上锁(即SETNX命令存储key到redis缓存中是随机的)。

 

现在相信你已经对在基于redis实现的分布式锁的基本概念有了解,需要注意的是,这个和前面文章提到的使用WATCH 命令对key值进行锁操作没有直接的关系。java中synchronized和Lock对象都能对共享资源进行加锁,下面我们将学习用java实现的redis分布式锁。

java中的锁技术

在分析java实现的redis分布式锁之前,我们先来回顾下java中的锁技术,为了直观的展示,我们采用“多个线程共享输出设备”来举例。

不加锁共享输出设备

 

package com.shanheyongmu.lock;

 

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

 

import com.shanheyongmu.redislock.RedisLock;

public class LockTest {    //不加锁    static class Outputer {        public void output(String name) {            for(int i=0; i

 

上面例子中,三个线程同时共享输出设备output,线程1需要打印zhangsan,线程2需要打印lingsi,线程3需要打印wangwu。在不加锁的情况,这三个线程会不会因为得不到输出设备output打架呢,我们来看看运行结果:

huangwuzhangslingsianhuangwuzlingsihangsanhuangwulzhangsaningsihuangwulingsi

从运行结果可以看出,三个线程打架了,线程1没打印完zhangsan,线程2就来抢输出设备......可见,这不是我们想要的,我们想要的是线程之间能有序的工作,各个线程之间互斥的使用输出设备output。

使用java5中的Lock对输出设备加锁

现在我们对Outputer进行改进,给它加上锁,加锁之后每次只有一个线程能访问它。

 

//使用java5中的锁static class Outputer{    Lock lock = new ReentrantLock();    public void output(String name) {        //传统java加锁        //synchronized (Outputer.class){        lock.lock();        try {            for(int i=0; i

看看加锁后的输出结果:

zhangsanlingsihuangwuzhangsanlingsihuangwuzhangsanlingsihuangwuzhangsanlingsihuangwuzhangsanlingsihuangwu......

从运行结果中可以看出,三个线程之间不打架了,线程之间的打印变得有序。有个这个基础,下面我们来学习基于Redis实现的分布式锁就更容易了。

Redis分布式锁

实现分析

从上面java锁的使用中可以看出,锁对象主要有lock与unlock方法,在lock与unlock方法之间的代码(临界区)能保证线程互斥访问。基于redis实现的Java分布式锁主要依赖redis的SETNX命令和DEL命令,SETNX相当于上锁(lock),DEL相当于释放锁(unlock)。我们只要实现Lock接口重写lock()和unlock()即可。但是这还不够,安全可靠的分布式锁应该满足满足下面三个条件:

l 互斥,不管任何时候,只有一个客户端能持有同一个锁。

l 不会死锁,最终一定会得到锁,即使持有锁的客户端对应的master节点宕掉。

l 容错,只要大多数Redis节点正常工作,客户端应该都能获取和释放锁。

那么什么情况下回不满足上面三个条件呢。多个线程(客户端)同时竞争锁可能会导致多个客户端同时拥有锁。比如,

(1)线程1在master节点拿到了锁(存入key)

(2)master节点在把线程1创建的key写入slave之前宕机了,此时集群中的节点已经没有锁(key)了,包括master节点的slaver节点

(3)slaver节点升级为master节点

(4)线程2向新的master节点发起锁(存入key)请求,很明显,能请求成功。

可见,线程1和线程2同时获得了锁。如果在更高并发的情况,可能会有更多线程(客户端)获取锁,这种情况就会导致上文所说的线程“打架”问题,线程之间的执行杂乱无章。

 

那什么情况下又会发生死锁的情况呢。如果拥有锁的线程(客户端)长时间的执行或者因为某种原因造成阻塞,就会导致锁无法释放(unlock没有调用),其它线程就不能获取锁而而产生无限期死锁的情况。其它线程在执行lock失败后即使粗暴的执行unlock删除key之后也不能正常释放锁,因为锁就只能由获得锁的线程释放,锁不能正常释放其它线程仍然获取不到锁。解决死锁的最好方式是设置锁的有效时间(redis的expire命令),不管是什么原因导致的死锁,有效时间过后,锁将会被自动释放。

 

为了保障容错功能,即只要有Redis节点正常工作,客户端应该都能获取和释放锁,我们必须用相同的key不断循环向Master节点请求锁,当请求时间超过设定的超时时间则放弃请求锁,这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间,如果一个master节点不可用了,应该尽快尝试下一个master节点。释放锁比较简单,因为只需要在所有节点都释放锁就行,不管之前有没有在该节点获取锁成功。

Redlock算法

根据上面的分析,官方提出了一种用Redis实现分布式锁的算法,这个算法称为RedLock。RedLock算法的主要流程如下:

 

RedLock算法主要流程

 

 

Java实现

 

结合上面的流程图,加上下面的代码解释,相信你一定能理解redis分布式锁的实现原理

public class RedisLock implements Lock{     protected StringRedisTemplate redisStringTemplate;     // 存储到redis中的锁标志    private static final String LOCKED = "LOCKED";     // 请求锁的超时时间(ms)    private static final long TIME_OUT = 30000;     // 锁的有效时间(s)    public static final int EXPIRE = 60;     // 锁标志对应的key;    private String key;     // state flag    private volatile boolean isLocked = false;     public RedisLock(String key) {        this.key = key;        @SuppressWarnings("resource")        ApplicationContext  ctx =  new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");        redisStringTemplate = (StringRedisTemplate)ctx.getBean("redisStringTemplate");    }     @Override    public void lock() {        //系统当前时间,毫秒        long nowTime = System.nanoTime();        //请求锁超时时间,毫秒        long timeout = TIME_OUT*1000000;        final Random r = new Random();        try {            //不断循环向Master节点请求锁,当请求时间(System.nanoTime() - nano)超过设定的超时时间则放弃请求锁            //这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间            //如果一个master节点不可用了,应该尽快尝试下一个master节点            while ((System.nanoTime() - nowTime) < timeout) {                //将锁作为key存储到redis缓存中,存储成功则获得锁                if (redisStringTemplate.getConnectionFactory().getConnection().setNX(key.getBytes(),                        LOCKED.getBytes())) {                    //设置锁的有效期,也是锁的自动释放时间,也是一个客户端在其他客户端能抢占锁之前可以执行任务的时间                    //可以防止因异常情况无法释放锁而造成死锁情况的发生                    redisStringTemplate.expire(key, EXPIRE, TimeUnit.SECONDS);                    isLocked = true;                    //上锁成功结束请求                    break;                }                //获取锁失败时,应该在随机延时后进行重试,避免不同客户端同时重试导致谁都无法拿到锁的情况出现                //睡眠3毫秒后继续请求锁                Thread.sleep(3, r.nextInt(500));            }        } catch (Exception e) {            e.printStackTrace();        }    }     @Override    public void unlock() {        //释放锁        //不管请求锁是否成功,只要已经上锁,客户端都会进行释放锁的操作        if (isLocked) {            redisStringTemplate.delete(key);        }    }     @Override    public void lockInterruptibly() throws InterruptedException {        // TODO Auto-generated method stub             }     @Override    public boolean tryLock() {        // TODO Auto-generated method stub        return false;    }     @Override    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {        // TODO Auto-generated method stub        return false;    }     @Override    public Condition newCondition() {        // TODO Auto-generated method stub        return null;    }}

好了,RedisLock已经实现,我们对Outputer使用RedisLock进行修改

/使用RedisLockstatic class Outputer {    //创建一个名为redisLock的RedisLock类型的锁    RedisLock redisLock = new RedisLock("redisLock");    public void output(String name) {        //上锁        redisLock.lock();        try {            for(int i=0; i

看看使用RedisLock加锁后的的运行结果

lingsizhangsanhuangwulingsizhangsanhuangwulingsizhangsanhuangwulingsizhangsanhuangwulingsizhangsanhuangwu......

可见,使用RedisLock加锁后线程之间不再“打架”,三个线程互斥的访问output。

问题

现在我无法论证RedLock算法在分布式、高并发环境下的可靠性,但从本例三个线程的运行结果看,RedLock算法确实保证了三个线程互斥的访问output(redis.maxIdle=300 redis.maxTotal=600,运行到Timeout waiting for idle object都没有出现线程“打架”的问题)。我认为RedLock算法仍有些问题没说清楚,比如,如何防止宕机时多个线程同时获得锁;RedLock算法在释放锁的处理上,不管线程是否获取锁成功,只要上了锁,就会到每个master节点上释放锁,这就会导致一个线程上的锁可能会被其他线程释放掉,这就和每个锁只能被获得锁的线程释放相互矛盾。这些有待后续进一步交流学习研究。

 

参考文档

全部系列到此告一段落了 ,可能很多人觉得还不够 在此我在搜索问题的过程中发现了比较好的 博客spring data+redis

 

http://www.cnblogs.com/xiohao/p/9063135.html

你可能感兴趣的文章
Carthage使用
查看>>
dede栏目添加自定义字段方法
查看>>
跟着实例学习设计模式(9)-桥接模式bridge(结构型)
查看>>
Visual studio 2005 cannot break at breakpoint
查看>>
linux高级技巧:heartbeat+lvs(三)
查看>>
Swift新手教程系列5-函数+selector在swift中的使用方法
查看>>
留言版
查看>>
11.Vue安装Axios及使用
查看>>
sql语法:inner join on, left join on, right join on详细使用方法
查看>>
.c 文件 和 .h 文件分别做了什么事? c
查看>>
LeetCode-153-Find Minimum in Rotated Sorted Array
查看>>
sql server 判断是否存在数据库,表,列,视图
查看>>
HTML 介绍
查看>>
研读设计模式小结
查看>>
02.Python网络爬虫第二弹(http和https协议)
查看>>
bootstrap之HTML模板
查看>>
[转]Ubuntu Server下如何安装图形界面?
查看>>
[linux] tcpdump抓包案例
查看>>
WCF与WebService的区别(转)
查看>>
一个简单得不能再简单的“ORM”了
查看>>