博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
redis缓存和mysql数据库同步
阅读量:5818 次
发布时间:2019-06-18

本文共 21833 字,大约阅读时间需要 72 分钟。

转载自:https://www.cnblogs.com/lanbo203/p/7494587.html

解决方案

一、对强一致要求比较高的,应采用实时同步方案,即查询缓存查询不到再从DB查询,保存到缓存;更新缓存时,先更新数据库,再将缓存的设置过期(建议不要去更新缓存内容,直接设置缓存过期)。

二、对于并发程度较高的,可采用异步队列的方式同步,可采用kafka等消息中间件处理消息生产和消费。

三、使用阿里的同步工具canal,canal实现方式是模拟mysql slave和master的同步机制,监控DB bitlog的日志更新来触发缓存的更新,此种方法可以解放程序员双手,减少工作量,但在使用时有些局限性。

四、采用UDF自定义函数的方式,面对mysql的API进行编程,利用触发器进行缓存同步,但UDF主要是c/c++语言实现,学习成本高。

实时同步

spring3+提供了注解的方式进行缓存编程

@Cacheable(key = "caches[0].name + T(String).valueOf(#userId)",unless = "#result eq null")
@CachePut(key = "caches[0].name + T(String).valueOf(#user.userId)")
@CacheEvict(key = "caches[0].name + T(String).valueOf(#userId)" )
@Caching(evict = {@CacheEvict(key = "caches[0].name + T(String).valueOf(#userId)" ),                   @CacheEvict(key = "caches[0].name + #result.name" )})
@Cacheable:查询时使用,注意Long类型需转换为Sting类型,否则会抛异常
@CachePut:更新时使用,使用此注解,一定会从DB上查询数据
@CacheEvict:删除时使用;
@Caching:组合用法      具体注解的使用可参考官网 注意:注解方式虽然能使我们的代码简洁,但是注解方式有局限性:对key的获取,以及嵌套使用时注解无效,如下所示
public class User {    private Long userId;    private String name;    private Integer age;    private String sex;    private String addr;   //get set .....}

service接口

1
2
3
4
5
6
7
public 
interface 
UserService {
    
User getUser(Long userId);
    
User updateUser(User user);
    
User getUserByName(String name);
    
int 
insertUser(User user);
    
User  delete (Long userId);
}<br>
//实现类<br>//假设有需求是由name查询user的,一般我们是先由name->id,再由id->user,这样会减少redis缓存的冗余信息
@Service(value = "userSerivceImpl") @CacheConfig(cacheNames = "user") public class UserServiceImpl implements UserService {
private static Logger log = LoggerFactory.getLogger(UserServiceImpl.class); @Autowired UserMapper userMapper; @Cacheable(key = "caches[0].name + T(String).valueOf(#userId)",unless = "#result eq null") public User getUser(Long userId) {
User user = userMapper.selectByPrimaryKey(userId); return user; } @Cacheable(key = "caches[0].name + #name") public String getIdByName(String name){
Long userId = userMapper.getIdByName(name); return String.valueOf(userId); } //使用getUserByName方式调用getIdByName 和getUser方法来实现查询,但是如果用此方式在controller中直接调用
//getUserByName方法,缓存效果是不起作用的,必须是直接调用getIdByName和getUser方法才能起作用
public User getUserByName(String name) {
//通过name 查询到主键 再由主键查询实体 return getUser(Long.valueOf(getIdByName(name))); }

非注解方式实现

1.先定义一个RedisCacheConfig类用于生成RedisTemplate和对CacheManager的管理

@Configurationpublic class RedisCacheConfig  extends CachingConfigurerSupport {    /*定义缓存数据 key 生成策略的bean     *包名+类名+方法名+所有参数    */    @Bean    public KeyGenerator keyGenerator() {        return new KeyGenerator() {            @Override            public Object generate(Object target, Method method, Object... params) {                StringBuilder sb = new StringBuilder();                sb.append(target.getClass().getName());                sb.append(method.getName());                for (Object obj : params) {                    sb.append(obj.toString());                }                return sb.toString();            }        };    }     //@Bean     public CacheManager cacheManager(             @SuppressWarnings("rawtypes") RedisTemplate redisTemplate) {         //RedisCacheManager cacheManager = new RedisCacheManager(redisTemplate);           //cacheManager.setDefaultExpiration(60);//设置缓存保留时间(seconds)         return cacheManager;     }    //1.项目启动时此方法先被注册成bean被spring管理    @Bean    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {        StringRedisTemplate template = new StringRedisTemplate(factory);        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);        ObjectMapper om = new ObjectMapper();        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);        jackson2JsonRedisSerializer.setObjectMapper(om);        template.setValueSerializer(jackson2JsonRedisSerializer);        template.afterPropertiesSet();        return template;    }    @Bean    public RedisTemplate
redisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate
template = new RedisTemplate<>(); template.setConnectionFactory(connectionFactory); //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值 Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class); System.out.println("==============obj:"+Object.class.getName()); ObjectMapper mapper = new ObjectMapper(); mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); serializer.setObjectMapper(mapper); template.setValueSerializer(serializer); //使用StringRedisSerializer来序列化和反序列化redis的key值 template.setKeySerializer(new StringRedisSerializer()); template.afterPropertiesSet(); return template; }}

2.定义一个redisUtil类用于存取缓存值

@Componentpublic class RedisCacheUtil {    @Autowired    private StringRedisTemplate stringRedisTemplate;    @Autowired    private RedisTemplate
redisTemplate; /** * 存储字符串 * @param key string类型的key * @param value String类型的value */ public void set(String key, String value) { stringRedisTemplate.opsForValue().set(key, value); } /** * 存储对象 * @param key String类型的key * @param value Object类型的value */ public void set(String key, Object value) { redisTemplate.opsForValue().set(key, value); } /** * 存储对象 * @param key String类型的key * @param value Object类型的value */ public void set(String key, Object value,Long timeOut) { redisTemplate.opsForValue().set(key, value,timeOut, TimeUnit.SECONDS); } /** * 根据key获取字符串数据 * @param key * @return */ public String getValue(String key) { return stringRedisTemplate.opsForValue().get(key); }// public Object getValue(String key) {// return redisTemplate.opsForValue().get(key);// } /** * 根据key获取对象 * @param key * @return */ public Object getValueOfObject(String key) { return redisTemplate.opsForValue().get(key); } /** * 根据key删除缓存信息 * @param key */ public void delete(String key) { redisTemplate.delete(key); } /** * 查询key是否存在 * @param key * @return */ @SuppressWarnings("unchecked") public boolean exists(String key) { return redisTemplate.hasKey(key); }}

3.实现类

/** * Created by yexin on 2017/9/8. * * 在Impl基础上+ 防止缓存雪崩和缓存穿透功能 */@Service(value = "userServiceImpl4")public class UserServiceImpl4 implements UserService {    @Autowired    UserMapper userMapper;    @Autowired    RedisCacheUtil redisCacheUtil;    @Value("${timeOut}")    private long timeOut;    @Override    public User getUser(Long userId) {        String key = "user" + userId;        User user = (User) redisCacheUtil.getValueOfObject(key);        String keySign = key + "_sign";        String valueSign = redisCacheUtil.getValue(keySign);        if(user == null){//防止第一次查询时返回时空结果            //防止缓存穿透            if(redisCacheUtil.exists(key)){                return  null;            }            user = userMapper.selectByPrimaryKey(userId);            redisCacheUtil.set(key,user);            redisCacheUtil.set(keySign,"1",timeOut *(new Random().nextInt(10) + 1));//            redisCacheUtil.set(keySign,"1",0L);  //过期时间不能设置为0,必须比0大的数            return user;        }        if(valueSign != null){            return user;        }else {            //设置标记的实效时间            Long tt = timeOut * (new Random().nextInt(10) + 1);            System.out.println("tt:"+tt);            redisCacheUtil.set(keySign,"1",tt);            //异步处理缓存更新  应对与高并发的情况,会产生脏读的情况            ThreadPoolUtil.getExecutorService().execute(new Runnable(){                public void run() { //                    System.out.println("-----执行异步操作-----");                    User user1 = userMapper.selectByPrimaryKey(userId);                    redisCacheUtil.set(key,user1);                }            });//            new Thread(){//                public void run() { //应对与高并发的情况,会产生脏读的情况//                    System.out.println("-----执行异步操作-----");//                    User user1 = userMapper.selectByPrimaryKey(userId);//                    redisCacheUtil.set(key,user1);//                }//            }.start();        }        return user;    }}

异步实现

异步实现通过kafka作为消息队列实现,异步只针对更新操作,查询无需异步,实现类如下

1.pom文件需依赖

org.springframework.cloud
spring-cloud-starter-stream-kafka

2.生产着代码

@EnableBinding(Source.class)public class SendService {    @Autowired    private Source source;    public void sendMessage(String msg) {        try{            source.output().send(MessageBuilder.withPayload(msg).build());        } catch (Exception e) {            e.printStackTrace();        }    }//接受的是一个实体类,具体配置在application.yml    public void sendMessage(TransMsg msg) {        try {            //MessageBuilder.withPayload(msg).setHeader(KafkaHeaders.TOPIC,"111111").build();            source.output().send(MessageBuilder.withPayload(msg).build());        } catch (Exception e) {            e.printStackTrace();        }    }}

3.消费者代码

@EnableBinding(Sink.class)public class MsgSink {    @Resource(name = "userSerivceImpl3")    UserService userService;    @StreamListener(Sink.INPUT)    public void process(TransMsg
msg) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ClassNotFoundException { System.out.println("sink......"+msg); System.out.println("opt db strat ----"); userService.updateUser((User) msg.getParams()); System.out.println("执行db结束------"); }}

4.application.yml配置

spring:  application:    name: demo-provider  redis:    database: 0    host: 192.168.252.128    #host: localhost    port: 6379    password:    pool:      max-active: 50      max-wait: -1      max-idle: 50    timeout: 0#kafka  cloud:      stream:        kafka:          binder:            brokers: 192.168.252.128:9092            zk-nodes: 192.168.252.128:2181            minPartitionCount: 1            autoCreateTopics: true            autoAddPartitions: true        bindings:          input:            destination: topic-02#            content-type: application/json            content-type: application/x-java-object   #此种类型配置在消费端接受到的为一个实体类            group: t1            consumer:              concurrency: 1              partitioned: false          output:            destination: topic-02            content-type: application/x-java-object                         producer:              partitionCount: 1        instance-count: 1        instance-index: 0

5.实现类

@Service(value = "userServiceImpl2")public class UserServiceImpl2  implements UserService{    @Autowired    UserMapper userMapper;    @Autowired    RedisCacheUtil redisCacheUtil;    private static Logger log = LoggerFactory.getLogger(UserServiceImpl.class);    @Autowired    SendService sendService;    public User updateUser(User user) {        System.out.println("   impl2   active   ");        String key = "user"+ user.getUserId();        System.out.println("key:"+key);        //是否存在key        if(!redisCacheUtil.exists(key)){         return userMapper.updateByPrimaryKeySelective(user) == 1 ? user : null;        }        /*  更新key对应的value            更新队列         */        User user1 = (User)redisCacheUtil.getValueOfObject(key);        try {            redisCacheUtil.set(key,user);            TransMsg
msg = new TransMsg
(key,user,this.getClass().getName(),"updateUser",user); sendService.sendMessage(msg); }catch (Exception e){ redisCacheUtil.set(key,user1); } return user; }}

注意:kafka与zookeeper的配置在此不介绍

canal实现方式

先要安装canal,配置canal的example文件等,配置暂不介绍

package org.example.canal;import com.alibaba.fastjson.JSONObject;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.common.utils.AddressUtils;import com.alibaba.otter.canal.protocol.Message;import com.alibaba.otter.canal.protocol.CanalEntry.Column;import com.alibaba.otter.canal.protocol.CanalEntry.Entry;import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;import com.alibaba.otter.canal.protocol.CanalEntry.EventType;import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;import com.alibaba.otter.canal.protocol.CanalEntry.RowData;import org.example.canal.util.RedisUtil;import java.net.InetSocketAddress;import java.util.List;public class CanalClient {    public static void main(String[] args) {        // 创建链接        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),                11111), "example", "", "");        int batchSize = 1000;        try {            connector.connect();            connector.subscribe(".*\\..*");            connector.rollback();            while (true) {                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据                long batchId = message.getId();                int size = message.getEntries().size();                if (batchId == -1 || size == 0) {                    try {                        Thread.sleep(1000);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                } else {                    printEntry(message.getEntries());                }                connector.ack(batchId); // 提交确认                // connector.rollback(batchId); // 处理失败, 回滚数据            }        } finally {            connector.disconnect();        }    }    private static void printEntry( List
entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { System.out.println("tablename:"+entry.getHeaderOrBuilder().getTableName()); rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { redisDelete(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { redisInsert(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); redisUpdate(rowData.getAfterColumnsList()); } } } } private static void printColumn( List
columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } private static void redisInsert( List
columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString()); } } private static void redisUpdate( List
columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString()); } } private static void redisDelete( List
columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.delKey("user:"+ columns.get(0).getValue()); } }}
package org.example.canal.util;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.JedisPoolConfig;public class RedisUtil {    // Redis服务器IP    private static String ADDR = "192.168.252.128";    // Redis的端口号    private static int PORT = 6379;    // 访问密码    //private static String AUTH = "admin";    // 可用连接实例的最大数目,默认值为8;    // 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。    private static int MAX_ACTIVE = 1024;    // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。    private static int MAX_IDLE = 200;    // 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;    private static int MAX_WAIT = 10000;    // 过期时间    protected static int  expireTime = 60 * 60 *24;    // 连接池    protected static JedisPool pool;    static {        JedisPoolConfig config = new JedisPoolConfig();        //最大连接数        config.setMaxTotal(MAX_ACTIVE);        //最多空闲实例        config.setMaxIdle(MAX_IDLE);        //超时时间        config.setMaxWaitMillis(MAX_WAIT);        //        config.setTestOnBorrow(false);        pool = new JedisPool(config, ADDR, PORT, 1000);    }    /**     * 获取jedis实例     */    protected static synchronized Jedis getJedis() {        Jedis jedis = null;        try {            jedis = pool.getResource();        } catch (Exception e) {            e.printStackTrace();            if (jedis != null) {                pool.returnBrokenResource(jedis);            }        }        return jedis;    }    /**     * 释放jedis资源     * @param jedis     * @param isBroken     */    protected static void closeResource(Jedis jedis, boolean isBroken) {        try {            if (isBroken) {                pool.returnBrokenResource(jedis);            } else {                pool.returnResource(jedis);            }        } catch (Exception e) {        }    }    /**     * 是否存在key     * @param key     */    public static boolean existKey(String key) {        Jedis jedis = null;        boolean isBroken = false;        try {            jedis = getJedis();            jedis.select(0);            return jedis.exists(key);        } catch (Exception e) {            isBroken = true;        } finally {            closeResource(jedis, isBroken);        }        return false;    }    /**     * 删除key     * @param key     */    public static void delKey(String key) {        Jedis jedis = null;        boolean isBroken = false;        try {            jedis = getJedis();            jedis.select(0);            jedis.del(key);        } catch (Exception e) {            isBroken = true;        } finally {            closeResource(jedis, isBroken);        }    }    /**     * 取得key的值     * @param key     */    public static String stringGet(String key) {        Jedis jedis = null;        boolean isBroken = false;        String lastVal = null;        try {            jedis = getJedis();            jedis.select(0);            lastVal = jedis.get(key);            jedis.expire(key, expireTime);        } catch (Exception e) {            isBroken = true;        } finally {            closeResource(jedis, isBroken);        }        return lastVal;    }    /**     * 添加string数据     * @param key     * @param value     */    public static String stringSet(String key, String value) {        Jedis jedis = null;        boolean isBroken = false;        String lastVal = null;        try {            jedis = getJedis();            jedis.select(0);            lastVal = jedis.set(key, value);            jedis.expire(key, expireTime);        } catch (Exception e) {            e.printStackTrace();            isBroken = true;        } finally {            closeResource(jedis, isBroken);        }        return lastVal;    }    /**     *  添加hash数据     * @param key     * @param field     * @param value     */    public static void hashSet(String key, String field, String value) {        boolean isBroken = false;        Jedis jedis = null;        try {            jedis = getJedis();            if (jedis != null) {                jedis.select(0);                jedis.hset(key, field, value);                jedis.expire(key, expireTime);            }        } catch (Exception e) {            isBroken = true;        } finally {            closeResource(jedis, isBroken);        }    }}

附redis关于缓存雪崩和缓存穿透,热点key

穿透

穿透:频繁查询一个不存在的数据,由于缓存不命中,每次都要查询持久层。从而失去缓存的意义。

解决办法: 持久层查询不到就缓存空结果,查询时先判断缓存中是否exists(key) ,如果有直接返回空,没有则查询后返回,

                  注意insert时需清除查询的key,否则即便DB中有值也查询不到(当然也可以设置空缓存的过期时间)

雪崩

雪崩:缓存大量失效的时候,引发大量查询数据库。

解决办法:①用锁/分布式锁或者队列串行访问

                  ②缓存失效时间均匀分布

热点key

热点key:某个key访问非常频繁,当key失效的时候有打量线程来构建缓存,导致负载增加,系统崩溃。

解决办法:

①使用锁,单机用synchronized,lock等,分布式用分布式锁。

②缓存过期时间不设置,而是设置在key对应的value里。如果检测到存的时间超过过期时间则异步更新缓存。

③在value设置一个比过期时间t0小的过期时间值t1,当t1过期的时候,延长t1并做更新缓存操作。

4设置标签缓存,标签缓存设置过期时间,标签缓存过期后,需异步地更新实际缓存  具体参照userServiceImpl4的处理方式

 

总结

一、查询redis缓存时,一般查询如果以非id方式查询,建议先由条件查询到id,再由id查询pojo

二、异步kafka在消费端接受信息后,该怎么识别处理那张表,调用哪个方法,此问题暂时还没解决

三、比较简单的redis缓存,推荐使用canal

参考文档

http://blog.csdn.net/fly_time2012/article/details/50751316

http://blog.csdn.net/kkgbn/article/details/60576477

http://www.cnblogs.com/fidelQuan/p/4543387.html

你可能感兴趣的文章
Frank Klemm's Dither and Noise Shaping Page: Dither and Noise Shaping In MPC/MP+
查看>>
网络抓包的部署和工具Wireshark【图书节选】
查看>>
Redis在Windows+linux平台下的安装配置
查看>>
Maven入门实战笔记-11节[6]
查看>>
几篇JavaEye的博客
查看>>
Local declaration of 'content' hides instance variable
查看>>
Android学习之路十四:TabHost
查看>>
[zz] C++智能指针循环引用解决
查看>>
ASP.NET中 HTML标签总结及使用
查看>>
同方国芯释疑问询 购台企不以定增为前提
查看>>
WCF的WindowsService开发参考【附源码】
查看>>
Spring 项目中把 SQL 语句写在 .sql 文件中
查看>>
Linux下日志系统的设计
查看>>
我的RabbitMQ的学习成果
查看>>
小白都能看懂的Linux系统下安装配置Zabbix
查看>>
Async/Await是这样简化JavaScript代码的
查看>>
【高速接口-RapidIO】6、RapidIO核仿真与包时序分析
查看>>
微软开放6万项专利技术,叫停Linux专利战
查看>>
写给正在入坑linux系统的伙伴
查看>>
MySQL主从同步报错排错结果及修复过程之:Slave_SQL_Running: No
查看>>