Nemo

Nemo 关注TA

路漫漫其修远兮,吾将上下而求索。

Nemo

Nemo

关注TA

路漫漫其修远兮,吾将上下而求索。

  •  普罗旺斯
  • 负责帅就完事了
  • 写了1,496,113字

该文章投稿至Nemo社区   Java  板块 复制链接


Spring 整合Redis集群配置

发布于 2016/09/27 16:02 4,986浏览 4回复 14,685

最近在做这边的redis集群这块的内容,这里稍稍记录下目前的一些配置。

这是在原来单redis服务的基础上改进过来的,也保留了原有的一些单服务的配置。


======================核心:spring-redis.xml==========================


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd"
       default-lazy-init="false">

    <context:property-placeholder
            ignore-unresolvable="true" location="classpath*:/application.properties" />

    <!-- 连接池配置. -->
    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <!-- 连接池中最大连接数。高版本:maxTotal,低版本:maxActive -->
        <property name="maxTotal" value="${redis.maxTotal}" />
        <!-- 连接池中最大空闲的连接数. -->
        <property name="maxIdle" value="${redis.maxIdle}" />
        <!-- 连接池中最少空闲的连接数. -->
        <property name="minIdle" value="${redis.minIdle}" />
        <!-- 当连接池资源耗尽时,调用者最大阻塞的时间,超时将跑出异常。单位,毫秒数;默认为-1.表示永不超时。高版本:maxWaitMillis,低版本:maxWait -->
        <property name="maxWaitMillis" value="${redis.maxWaitMillis}" />
        <!-- 连接空闲的最小时间,达到此值后空闲连接将可能会被移除。负值(-1)表示不移除. -->
        <property name="minEvictableIdleTimeMillis" value="${redis.minEvictableIdleTimeMillis}" />
        <!-- 对于“空闲链接”检测线程而言,每次检测的链接资源的个数。默认为3 -->
        <property name="numTestsPerEvictionRun" value="${redis.numTestsPerEvictionRun}" />
        <!-- “空闲链接”检测线程,检测的周期,毫秒数。如果为负值,表示不运行“检测线程”。默认为-1. -->
        <property name="timeBetweenEvictionRunsMillis" value="${redis.timeBetweenEvictionRunsMillis}" />
        <!-- testOnBorrow:向调用者输出“链接”资源时,是否检测是有有效,如果无效则从连接池中移除,并尝试获取继续获取。默认为false。建议保持默认值. -->
        <!-- testOnReturn:向连接池“归还”链接时,是否检测“链接”对象的有效性。默认为false。建议保持默认值.-->
        <!-- testWhileIdle:向调用者输出“链接”对象时,是否检测它的空闲超时;默认为false。如果“链接”空闲超时,将会被移除。建议保持默认值. -->
        <!-- whenExhaustedAction:当“连接池”中active数量达到阀值时,即“链接”资源耗尽时,连接池需要采取的手段, 默认为1(0:抛出异常。1:阻塞,直到有可用链接资源。2:强制创建新的链接资源) -->
    </bean>

    <!-- Spring提供的Redis连接工厂,这部分适合在单缓存服务器的情况下使用 -->
    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" destroy-method="destroy">
        <!-- 连接池配置. -->
        <property name="poolConfig" ref="jedisPoolConfig" />
        <!-- Redis服务主机. -->
        <property name="hostName" value="${redis.hostName}" />
        <!-- Redis服务端口号. -->
        <property name="port" value="${redis.port}" />
        <!-- Redis服务连接密码. -->
        <property name="password" value="${redis.password}" />
        <!-- 连超时设置. -->
        <property name="timeout" value="${redis.timeout}" />
        <!-- 是否使用连接池. -->
        <property name="usePool" value="${redis.usePool}" />
    </bean>

    <!--单机缓存服务器可以直接使用这个对象,集群缓存服务器则需要更换-->
    <!-- Spring提供的访问Redis类. -->
    <bean id="jedisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
        <property name="connectionFactory" ref="jedisConnectionFactory" />
        <property name="keySerializer">
            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
        </property>
        <property name="valueSerializer">
            <!-- <bean class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" /> -->
            <bean class="org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer" />
        </property>
    </bean>

    <!--缓存服务器集群可以使用的对象-->
    <!-- 使用构造方法注入:public JedisCluster(Set<HostAndPort> nodes, int timeout, final GenericObjectPoolConfig poolConfig)  -->
    <bean id="jedisCluster" class="redis.clients.jedis.JedisCluster">
        <constructor-arg index="0">
            <set>
                <!--如果有新添加的服务器,可以在此直接配置服务器地址和端口即可-->
                <bean class="redis.clients.jedis.HostAndPort">
                    <constructor-arg type="java.lang.String" value="172.20.98.250" />
                    <constructor-arg type="int" value="7000" />
                </bean>
                <bean class="redis.clients.jedis.HostAndPort">
                    <constructor-arg type="java.lang.String" value="172.20.98.250" />
                    <constructor-arg type="int" value="7001" />
                </bean>
                <bean class="redis.clients.jedis.HostAndPort">
                    <constructor-arg type="java.lang.String" value="172.20.98.250" />
                    <constructor-arg type="int" value="7002" />
                </bean>
                <bean class="redis.clients.jedis.HostAndPort">
                    <constructor-arg type="java.lang.String" value="172.20.98.250" />
                    <constructor-arg type="int" value="7003" />
                </bean>
                <bean class="redis.clients.jedis.HostAndPort">
                    <constructor-arg type="java.lang.String" value="172.20.98.250" />
                    <constructor-arg type="int" value="7004" />
                </bean>
                <bean class="redis.clients.jedis.HostAndPort">
                    <constructor-arg type="java.lang.String" value="172.20.98.250" />
                    <constructor-arg type="int" value="7005" />
                </bean>
            </set>
        </constructor-arg>
        <constructor-arg index="1" value="${redis.timeout}" type="int"></constructor-arg>
        <constructor-arg index="2" ref="jedisPoolConfig"></constructor-arg>
    </bean>

</beans>


=========================基础配置:application.proterties======================


##########REDIS CONFIG##########
###Add by Nemo on 2016/09/18####
redis.timeout=5000
redis.maxTotal=8
redis.maxIdle=4
redis.minIdle=1
redis.maxWaitMillis=5000
redis.minEvictableIdleTimeMillis=300000
redis.numTestsPerEvictionRun=3
redis.timeBetweenEvictionRunsMillis=60000
#####以下部分配置在仅使用单个缓存服务器的时候使用
redis.hostName=172.20.98.250
redis.port=7000

redis.password=12345a*
redis.usePool=true



=======================AOP缓存切点自定义annotation:RedisCache======================

package com.ict.web.commons.redis.anotation;

import java.lang.annotation.*;

/**
* 判断是否有缓存的自定义注解接口
* 该注解标志拦截dao层,如果存在缓存,则直接返回该缓存数据,否则则执行原查询
* 示例:
*      @RedisCache(type = User.class)
*      type的值相当于这个缓存的标签,用于方便对同一类缓存进行统一的管理
*      尤其是当数据修改后需要更新的缓存,这些缓存是通过这个标签来决定是否需要更新的
* Created by nemo on 16-9-14.
*/
@Target({ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisCache {
Class type();
}



=====================AOP清缓存自定义切点annotation:RedisEvict=======================

package com.ict.web.commons.redis.anotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* 清除缓存的自定义注解接口
* 当执行一些数据更新操作的方法前需要执行该方法,如:update,delete,insert一类的方法前
* 示例:
*      @RedisEvict(type = User.class)
*      type的值是需要清除的缓存类别的标签,这个标签下的所有缓存都会被清除
* Created by nemo on 16-9-14.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedisEvict {
Class type();
}

========================AOP切面:RedisCacheAspect=======================

package com.ict.web.commons.redis.aspect;

import com.ict.web.commons.redis.anotation.RedisCache;
import com.ict.web.commons.redis.anotation.RedisEvict;
import com.ict.web.commons.redis.util.SerializeUtil;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import redis.clients.jedis.JedisCluster;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.Queue;

/**
* 缓存的切面
* 用来定义一些针对Service/DAO层的缓存操作策略
* Created by nemo on 16-9-14.
*/
@Aspect
@Component
public class RedisCacheAspect {

public static final Logger infoLog = LoggerFactory.getLogger(RedisCacheAspect.class);

/**
* 自动注入jedisTemplate,这里是单缓存服务器时可以使用的对象
*/
@Autowired @Qualifier("jedisTemplate")
private RedisTemplate redisTemplate;

/**
* 自动注入,这里是集群缓存服务器时可以使用的对象
*/
@Autowired @Qualifier("jedisCluster")
private JedisCluster jedisCluster;

/**
*  cache切入点
*/
@Pointcut("@annotation(com.ict.web.commons.redis.anotation.RedisCache)")
public void mRedisCache() {
}

/**
* clear cache 切入点
*/
@Pointcut("@annotation(com.ict.web.commons.redis.anotation.RedisEvict)")
public void clearCache(){
}

/**
* 方法调用前,先查询缓存。如果存在缓存,则返回缓存数据,阻止方法调用;
* 如果没有缓存,则调用业务方法,然后将结果放到缓存中
* @param jp
* @return
* @throws Throwable
*/
@Around("mRedisCache()")
public Object around(ProceedingJoinPoint jp) throws Throwable {

// 得到类名、方法名和参数
String clazzName = jp.getTarget().getClass().getName();
String methodName = jp.getSignature().getName();
Object[] args = jp.getArgs();

// 根据类名,方法名和参数生成key
String key = genKey(clazzName, methodName, args);
if (infoLog.isDebugEnabled()) {
infoLog.debug("生成key:{}", key);
}

// 得到被代理的方法
Method me = ((MethodSignature) jp.getSignature()).getMethod();

// 得到被代理的方法上的注解
Class modelType = me.getAnnotation(RedisCache.class).type();

// 检查redis中是否有缓存
Object value = null;
try {

//如果清除缓存线程没有在执行,则重新打开一个线程,确保缓存栈中没有剩余缓存
if (mQueueThread == null || !mQueueThread.isAlive()) {
mThread();
}

//清除队列中存在该项,需要先清除
if(queue.contains(modelType.getName())) {

//集群时的删除
jedisCluster.del(modelType.getName());

//单机时则先清除
//redisTemplate.delete(modelType.getName());

queue.remove(modelType.getName());
}

//确保需要清除的队列中没有该key再往下执行
if (!queue.contains(modelType.getName())) {

//单机时的取缓存方法
//value = redisTemplate.opsForHash().get(modelType.getName(), key);

//集群时的取缓存的方法
value = jedisCluster.hget(modelType.getTypeName(),key);
//取得的时候是字符串,需要反序列化
value = SerializeUtil.unSerialize_jdk((String) value);

}

}catch (Exception e){
e.printStackTrace();
if (infoLog.isDebugEnabled()) {
infoLog.debug("缓存服务器连接发生错误,需要检查网络...");
}
return jp.proceed();
}

// result是方法的最终返回结果
Object result;
if (null == value) {
// 缓存未命中
if (infoLog.isDebugEnabled()) {
infoLog.debug("缓存未命中");
}

//调用接下去需要执行的方法,取得返回数据
result = jp.proceed(args);

// 序列化查询结果为JSON
//String json = serialize(result);

/**
*  序列化结果放入缓存,需要加一个标签modelType,方便统一管理同一个类别的缓存
*  这里采用的是JDK自带的序列化对象保存,因为JSON反序列复杂对象的时候速度很不理想
*/
new Thread(new Runnable() {
public void run() {
try {
//缓存
//redisTemplate.opsForHash().put(modelType.getName(), key, result);
jedisCluster.hset(modelType.getName(), key, SerializeUtil.toSerialize_jdk(result));
}catch (Exception e){
e.printStackTrace();
}
}
}).start();

} else {
// 缓存命中
if (infoLog.isDebugEnabled()) {
infoLog.debug("缓存命中, value = {}", value);
}

return value;

//因为从JSON反序列化的过程时间消耗很不理想,所以此处还是使用JDK原生提供的序列化
//如果稍候能够解决JSON反序列化的问题,这里可以考虑使用,毕竟JSON的内存占用比JDK序列化对象少得多
// 得到被代理方法的返回值类型
//            Class returnType = ((MethodSignature) jp.getSignature()).getReturnType();

// 反序列化从缓存中拿到的json
//            result = deserialize(value, returnType, modelType);

//            if (infoLog.isDebugEnabled()) {
//                infoLog.debug("反序列化结果 = {}", result);
//            }


}

return result;
}

/**
* 在方法调用前清除缓存,然后调用业务方法
* @param jp
* @return
* @throws Throwable
*/
@Around("clearCache()")
public Object evictCache(ProceedingJoinPoint jp) throws Throwable {

// 得到被代理的方法
Method me = ((MethodSignature) jp.getSignature()).getMethod();

// 得到被代理的方法上的注解,这里是定义为同一个标签的数据,全部需要清除
Class modelType = me.getAnnotation(RedisEvict.class).type();

if (infoLog.isDebugEnabled()) {
infoLog.debug("清空缓存:{}", modelType.getName());
}

// 清除对应缓存
try {
jedisCluster.del(modelType.getName());
//redisTemplate.delete(modelType.getName());
}catch (Exception e){
infoLog.debug("缓存服务器连接发生错误,需要检查网络...加入清缓存队列...");
queue.add(modelType.getName()); //发生异常的时候把它放到队列中
}

// 调用业务方法
return jp.proceed(jp.getArgs());
}


private static final String DELIMITER = "-";

/**
* 根据类名、方法名和参数生成key
* @param clazzName
* @param methodName
* @param args 方法参数
* @return
*/
protected String genKey(String clazzName, String methodName, Object[] args) {
StringBuilder sb = new StringBuilder(clazzName);
sb.append(DELIMITER);
sb.append(methodName);
sb.append(DELIMITER);

for (Object obj : args) {
if(obj!=null) {
sb.append(obj.toString());
sb.append(DELIMITER);
}
}

return sb.toString();
}

/**
*
*/
private Thread mQueueThread;

/**
* 需要清除的缓存队列
*/
private Queuequeue=new LinkedList<>();

/**
* 需要清除缓存的队列线程
* 一些原因下,运用服务器与缓存服务器联络不通畅,导致一些该清除的缓存没有被及时清除
* 所以当每一次读取缓存数据的时候需要先检查清除缓存的队列,确保不会读取到垃圾数据
*/
private void mThread(){
mQueueThread = new Thread(new Runnable() {
@Override
public void run() {
while(queue.size()>0){ //如果队列中有东西
String key = queue.peek(); //先拿
try {
jedisCluster.del(key);
//redisTemplate.delete(key); //删除缓存
queue.remove(key); //再队列中删除
}catch (Exception e){
break; //发生异常则证明此时网络还不通,直接退出,等待下次重连即可
}
}
}
});
mQueueThread.start();
}

}

=================用到的序列化和反序列化工具:SerializeUtil=================

package com.ict.web.commons.redis.util;

import com.alibaba.fastjson.JSON;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;

/**
 * 序列化和反序列化工具
 * 这里提供JDK自带的方式和Json的方式
 * Created by nemo on 16-9-22.
 */
public class SerializeUtil {

    /**
     * 序列化为JSON字符串,null
本文标签
 {{tag}}
点了个评