Artfairy's blog

Hello,world.


  • 首页

  • 简历

  • 标签

  • 分类

  • 归档

  • 日程表

公告

发表于 2022-04-20

新增加了友链,就在右侧导航栏,如果想要加友链的话,请到Github@我


多线程与并发编程学习笔记02

发表于 2022-04-13

tags:多线程

多线程与并发编程学习笔记02

13. 阻塞队列 (多线程并发和线程池的时候经常使用)

BlockQueue接口:指定队列大小,若队列满则阻塞等待;取元素时若为空则阻塞等待

Queue接口的类图

重要的实现类:

-> ArrayBlockingQueue

-> LinkedBlockingQueue

-> SynchronousQueue【重点】

4组API:

方法/API 抛出异常 返回值 阻塞等待 超时等待
添加元素 add() offer() put() offer(”a”,2,秒)
移除元素 remove() poll() take() poll(2,秒)
查看队列首 element() peek() - -
  • SynchronousQueue 同步队列:【进一个取一个】

    SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。

    拥有公平(FIFO)(TransferQueue)和非公平(LIFO)(TransferStack)策略,非公平策略会导致一些数据永远无法被消费的情况.

14.线程池【重要】【3大方法/7大参数/4种拒绝】

14.1 池化技术:线程池、数据库连接池、内存池、常量池…

线程池: 线程池就是首先创建一些线程,它们的集合称为线程池。使用线程池可以很好地提高性能,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。

  • 工作机制:
    • 在线程池的编程模式下,任务提交给整个线程池,而不是直接提交给某个线程,线程池在拿到任务后,就在内部寻找是否有空闲的线程,如果有,则将任务交给某个空闲的线程。
    • 一个线程同时只能执行一个任务,但可以同时向一个线程池提交多个任务。
  • 优点:
    • 避免线程的频繁创建、销毁;提高效率和访问速度。
    • 实现线程复用、线程管理、控制最大并发数。

14.2 创建线程池的方法:

  1. 使用Executors工具类的3种构造方法:
1
ExecutorService threadPool = Executors.newsingleThreadExecutor();  //单个线程的线程池
1
2
ExecutorService threadPoo1 = Executors.newFixedThreadpool(5); 
//创建一个固定的线程池的大小
1
2
ExecutorService threadPool = Executors.newCachedThreadPool(); 
//可伸缩的,遇强则强,遇弱则弱
  1. 使用ThreadPoolExecutor构造方法:
1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize, //核心线程池基本大小
int maximumPoolSzie,//最大线程池大小
long keepAliveTime,//非核心线程的超时时间
TimeUnit unit,//超时的单位
BlockingQueue<Runnable> workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工厂:用于创建线程
RejectedExecutionHandler handler//拒绝策略
)

两种方法本质:都是调用了ThreadPoolExecutor (7个参数)【因此:线程池要使用此方法去创建,而不是用Executors下的方法】

14.3 线程池调用流程:

1
2
使用构造方法后,返回一个线程池threadPool;
threadPool.execute();//使用线程池提交任务:

  1. 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
  2. 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将该任务添加到任务缓存队列中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
  3. 如果当前线程池中的线程数达到maximumPoolSize,则会采取任务拒绝策略进行处理;
  4. 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

14.4 为什么使用阻塞队列?

阻塞队列主要是用于生产者-消费者模型的情况: 比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒)。

​ 这样提供了极大的方便性。 如果使用非阻塞队列,它不会对当前线程产生阻塞,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦。

拒绝策略:4种:

ThreadPoolExecutor.AbortPolicy 丢弃任务,并抛出RejectedExecutionException异常【默认】
ThreadPoolExecutor.AbortPolicy 丢弃任务,但是不抛出异常
ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最老的任务,然后重新提交被拒绝的任务
ThreadPoolExecutor.CallerRunsPolicy 由调用线程(提交任务的线程)处理该任务

14.5 最大线程数如何选择?

CPU密集型任务:maximumPoolSize = CPU核心数量;保持CPU效率最高

IO 密集型任务 :maximumPoolSize = CPU核心数量 * 2;

15. 函数式接口【重要】:

新时代的程序员要掌握的编程技术: lambda表达式、链式编程、 函数式接口、Stream流式计算

函数式接口:只有一个方法的接口 @FunctionalInterface

15.1 4种函数式接口:

  • Function【函数型】:一个输入,输出一个返回值;

    1
    Function<String, String> function = str -> {return str;}; //lambda表达式简化
  • Predicate【判断型】: 一个输入,输出一个boolean的返回值:

    1
    2
    3
    4
    public static void main(String[] args) {
    Predicate<String> predicate = String::isEmpty;//判断字符串是否为空的函数
    System.out.println(predicate.test("111"));//输出false;
    }
  • Consumer【消费型】:一个输入,没返回值:

    1
    2
    3
    4
    public static void main(String[] args) {
    Consumer<String> consumer = System.out::println;//只是打印,没有返回值
    consumer.accept("12345");//打印了”12345“
    }
  • Supplier 【供给型】:没有输入,只有返回值:

    1
    2
    3
    4
    public static void main(String[] args) {
    Supplier<Integer> supplier = () -> 1024;//没有参数
    System.out.println(supplier.get());//提供1024
    }

15.2 Stream 流式计算:

16. ForkJoin

Since : JDK1.7+

原理:

  1. 分治
  1. 工作窃取:工作窃取算法是指某个线程从其他队列里窃取任务来执行。

​ - 工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争。

- 工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如**创建多个线程和多个双端队列**。

ForkJoin类似于分治思想,只是分成小任务后使用的是多个线程并行执行小任务,从而加速计算。

但是还是Stream并行流式计算,速度最快:

1
long sum = LongStream.rangeClosed(0L, 10_ 0000_ _00L).parallel().reduce( identity:0,Long::sum);

只需1行代码,运算速度快100倍!

17. 异步回调

  • 是什么?在发起一个异步任务的同时指定一个函数,在异步任务完成时会自动的调用这个函数。

  • 为什么?需要获取异步任务的执行结果,但是又不应该让其阻塞(降低效率),即想要高效的获取任务的执行结果。

  • 怎么实现?使用Future接口下的CompletableFuture类;它的方法有:runAsync();(无返回值) supplyAsync();有返回值;

18. JMM

  • JMM带来的问题:

    1) 可见性问题:

    ​ CPU中运行的线程从主存中拷贝共享对象obj到它的CPU缓存,把对象obj的count变量改为2。但这个变更对运行在右边CPU中的线程不可见,因为这个更改还没有flush到主存中。

    ​ 要解决共享对象可见性这个问题,我们可以使用 volatile关键字或者是加锁:

​ 2)竞争问题:

​ 线程A和线程B共享一个对象obj。假设线程A从主存读取Obj.count变量到自己的CPU缓存,同时,线程B也读取了Obj.count变量到它的CPU缓存,并且这两个线程都对Obj.count做了加1操作。此时,Obj.count加1操作被执行了两次,不过都在不同的CPU缓存中。如果这两个加1操作是串行执行的,那么Obj.count变量便会在原始值上加2,最终主存中的Obj.count的值会是3。然而下图中两个加1操作是并行的,不管是线程A还是线程B先flush计算结果到主存,最终主存中的Obj.count只会增加1次变成2,尽管一共有两次加1操作。 要解决上面的问题我们可以使用 synchronized代码块。

  • JMM中的指令重排:

    • 在执行程序时,为了提高性能,编译器和处理器常常会对指令做重排序,指令重排包括:

      • 编译器优化
      • CPU的指令级并行
      • 内存系统重排序
    • as-if-serial关系 : 不管怎么重排序(编译器和处理器为了提高并行度,程序的执行结果不能被改变。编译器、runtime和处理器都必须遵守as-if-serial语义。为了遵守as-if-serial语义,编译器和处理器不会对存在数据依赖关系的操作做重排序,因为这种重排序会改变执行结果。【仅限于单线程】

      多线程下JMM的指令重排带来的问题:在多线程中:对存在控制依赖的操作重排序,可能会改变程序的执行结果.

    • happens-before关系:保证正确同步的多线程程序的执行结果不被改变,满足可见性的程序都满足此关系。

  • volitale关键字可解决:

    • 可见性:【原理是MESI】

      • 当写一个volatile变量时:JMM会把该线程对应的本地内存中的共享变量值立即刷新到主内存。
      • 当读一个volatile变量时:JMM会把该线程对应的本地内存变量置为无效。线程接下来将从主内存中读取共享变量。
    • 有序性:【原理是内存屏障】:

      • volatile写操作的前面插入一个StoreStore屏障,后面插入一个StoreLoad屏障,禁止前面指令和后面指令交换顺序。
      • volatile读操作的后面插入一个LoadLoad屏障, 后面插入一个LoadStore屏障,禁止前面指令和后面指令交换顺序。
    • 不保证原子性:

      对任意单个volatile变量的读/写具有原子性,但类似于i++这种复合操作不具有原子性。

      那么如何保证原子性呢? -> lock、synchronized 【效率低】

      ​ -> 使用原子类,例如AtomicInteger等;【底层是CAS】

      1
      2
      private volatile static AtomicInteger num = new AtomicInteger();
      num.getAndIncrement(); // AtomicInteger + 1 ; 此方法原理是使用了 CAS

      原子类的底层都直接和操作系统挂钩! 在内存中修改值! 其使用到的Unsafe类是一个很特殊的存在 !

19. 单例模式

1. 饿汉式:【永远提前加载,耗费系统资源】

1
2
3
4
5
6
7
8
9
10
public class Singleton {
private Singleton(){
}

private static final Singleton SINGLETON = new Singleton();

public static Singleton getSingleton() {
return SINGLETON;
}
}
  1. 饿汉式:【双重检测+锁+Volatile】重要
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class SingletonLazy {
//懒汉式
private SingletonLazy(){
}

private static volatile SingletonLazy lazy;

public static SingletonLazy getLazy() {
if(lazy == null){
synchronized (SingletonLazy.class){
if(lazy == null){
lazy = new SingletonLazy(); //这不是一个原子性操作!
}
}
}
return lazy;
}
}
  • 为何要双重检测?因为除第一次调用外,大部分情况是单例已经创建,因此在外层加锁会导致每次调用此方法都会加锁,影响性能。所以在内层加锁,并且进入内层之后需要再次进行判断单例是否已经存在,若仍不存在,此时才可放心创建。

  • 为何声明单例时要加volatile?因为lazy = new SingletonLazy();不是一个原子性操作,其步骤是:

    1. 分配内存空间
    2. 执行构造方法 初始化对象
    3. 把对象指向这个内存空间

    由于指令重排的存在,可能使上述指令变为1->3->2的顺序,这样就会导致其他线程获取一个null的单例对象;

  1. 静态内部类实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class Holder {
    private Holder(){
    }

    private static class InnerClass{
    private static final Holder HOLDER = new Holder();
    }

    public static Holder getInstance(){
    return InnerClass.HOLDER;
    }
    }
  1. 安全问题:无论是使用以上哪种方式,只要是以private修饰构造器用来限制外部调用的,都可以用反射机制破解!

    • 首先用getDeclaredConstructor()通过反射获取构造器

    • 然后用setAccessible(true)即可无视构造器的private修饰符

1
2
3
4
5
6
7
public static void main(String[] args) throws Exception{
SingletonLazy instance1 = SingletonLazy.getLazy();
Constructor<SingletonLazy> constructor = SingletonLazy.class.getDeclaredConstructor(null);//通过反射获取构造器
constructor.setAccessible(true);//通过反射 无视构造器的private修饰符!!
SingletonLazy instance2 = constructor.newInstance();
System.out.println(instance1 == instance2);
}

那么,单例类如何防止反射?

在其构造器内加入flag秘钥校验,使其在第二次被调用时抛出异常:

1
2
3
4
5
6
7
8
9
10
private static boolean flag = false;  
private Singleton(){
synchronized(Singleton.class){
if(flag == false){
flag = !flag;
} else {
throw new RuntimeException("单例模式被侵犯!");
}
}
}

但是,若flag秘钥被泄露,同样可以通过反射将flag设置为相应的值,从而破解单例模式。

  1. 使用 枚举类enum 【现如今最推荐的单例模式方法】
1
2
3
4
5
6
7
//enum默认就是单例的,又简单又牛逼,不存在安全问题!
public enum EnumSingle {
INSTANCE;
public EnumSingle getInstance(){
return INSTANCE;
}
}
  • 修复安全问题:

    • 当我们尝试像往常一样使反射破解枚举类时:

      出现异常:Cannot reflectively create enum objects【不能通过反射创建枚举对象】

      这是因为反射包的newInstance()方法中添加了以上的异常,禁止了通过反射创建枚举对象。

1
2
3
4
5
6
7
8
public static void main(String[] args) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
EnumSingle instance1 = EnumSingle.INSTANCE;
Constructor<EnumSingle> constructor = EnumSingle.class.getDeclaredConstructor(String.class,int.class);//注意:枚举类只有有参构造器!
//尝试使反射破解枚举类时,出现异常:Cannot reflectively create enum objects!
constructor.setAccessible(true);
EnumSingle instance2 = constructor.newInstance();
System.out.println(instance1 == instance2);
}
  • 序列化问题

    当使用反序列化时,也会破坏单例模式,导致反序列化时生成的实例和之前的实例并不是同一个!

    而由于枚举类enum优先于反序列化方法readResolve(),因此不会被反序列化破坏单例。

20. CAS 「Compare And Swap」乐观锁

​ Atomic原子类下的方法都是compareAnd..(expect,newvalue);例如:

1
2
3
AtomicInteger atomicInteger = new AtomicInteger(2022);
//如果当前值是我的期望值,那么就更新,否则就不更新
atomicInteger.compareAndSet(2022,2021);
  • CAS是一条CPU并发原语:它的功能是判断内存某个位置的值是否为预期值,如果是则更新为新的值,这个过程是原子的。CAS并发原语提现在Java语言中就是sun.misc.UnSafe类中的各个方法。调用UnSafe类中的CAS方法,JVM会帮我实现CAS汇编指令.这种操作直接操作内存,通过它实现了原子操作。

  • CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B):

    判断过程:“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。”

  • 自旋锁:CAS的方法使用的是自旋锁,因为Unsafe类下的方法底层代码使用的是do..while()循环;

  • 缺点:

    • 自旋锁循环比较耗时。解决办法 -> 【pause指令】
    • 一次只能保证一个共享变量的原子性。解决办法 -> AtomicReference【原子引用】
    • ABA问题:当你获得对象当前数据后,在准备修改为新值前,对象的值被其他线程连续修改了两次,而经过两次修改后,对象的值又恢复为旧值,这样当前线程无法正确判断这个对象是否修改过。解决办法 ->【时间戳原子引用】

21. 原子引用:【可以加时间戳】

  • AtomicReference 类:可将多个变量打包成一个引用类型的对象实现CAS;

  • AtomicStampedReference 类的CAS方法比原子引用类的CAS方法多了两个变量(期望时间戳,新的时间戳)

1
2
3
AtomicStampedReference<Integer> a = new AtomicStampedReference<>(2022,0);
int stamp = a.getStamp(); //获得当前时间戳
a.compareAndSet(2022,2021,stamp,stamp+1);//「带时间戳的原子操作」

​ 当AtomicStampedReference对应的数值被修改时,除了更新数据本身外,还必须要更新时间戳,对象值和时间戳都必须满足期望值,写入才会成功!【解决了ABA问题】

21. 可重入锁:(递归锁)

什么是可重入锁:

广义上的可重入锁指的是可递归调用的锁。获取了外层的锁之后,在内层仍然可以获得该锁,并且不发生死锁(前提得是同一个对象或者class),这样的锁就叫做可重入锁。

synchronized锁的Demo:

1
2
3
4
5
6
7
8
9
10
11
//当外层方法sms()获取到对象的锁后,内层方法text()依然可以使用该锁;
class Phone {
public synchronized void sms() {
text();
System.out.println("sms");
}

public synchronized void text() {
System.out.println("打字");
}
}

Lock()锁的Demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Phone2 {
Lock lock = new ReentrantLock();
public void sms() {
lock.lock();//加锁
try {
text();//「这里面也有个加锁解锁的过程」;
System.out.println("sms");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();//解锁
}
}

public void text() {
lock.lock();//加锁
try {
System.out.println("打字");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();//解锁
}
}

Synchronized、Reentrantlock 都是可重入锁;

注:当递归调用加锁方法时:Synchronized锁了一次,而Reentrantlock锁了两次;

22. 自旋锁

通过原子引用的CAS方法实现自旋:可以自己写一个自旋锁的Demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class MySpinLock {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
// 加锁
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"get myLock()");
// 通过while实现自旋锁
while (!atomicReference.compareAndSet(null,thread)){
}
}
// 解锁
public void myUnlock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"get myUnlock()");
atomicReference.compareAndSet(thread,null));
}
}

23. 死锁

​ 如下图所示,线程A持有资源2,线程B持有资源1,他们同时都想申请对方的资源,所 以这两个线程就会互相等待而进入 死锁状态。

  • 死锁4条件:

    • 互斥条件 **: 该资源任意一个时刻只由一个线程占用**。
    • 不可剥夺条件 **: 线程已获得的资源在末使用完之前不能被其他线程强行剥夺**,只有自己使用完毕后才释放资源。
    • 请求与保持条件 **: 一个进程因请求资源而阻塞时,对已获得的资源保持不放**。
    • 循环等待条件 : 若干进程之间形成一种头尾相接的循环等待资源关系。
  • 如何排查死锁?

    1. 使用 jsp-l 列出java中的进程:

    2. 使用 jstack 进程号 查看进程信息:

    可以看到找到一个deadlock(死锁);并且可以看到每个线程已获取的锁以及他们想要等待获取的锁;

JVM调优

发表于 2022-04-13

tags:JVM

JVM调优

  1. 如何避免频繁fullGC?

  • 出现fullgc的原因:

    1.年轻代空间不足

    2.per Gen(永久代)空间满

    3.CMS GC时出现promotion failed和concurrent mode failure

    4.统计得到的Minor GC晋升到旧生代的平均大小大于旧生代的剩余空间等

  • 解决:

    • 增大survive区大小
    • 降低大对象的生命周期
    • 避免产生过多可以避免的大对象
    • 增大永久代空间

多线程与并发编程学习笔记

发表于 2022-04-13

tags:多线程

多线程与并发编程学习笔记01

1. 什么是JUC:java.util.concurrent 并发工具包;

2. 线程与进程:

进程:一段代码程序执行的过程;一个进程可包含多个线程;

​ 在 Java 本身启动时会创建若干个线程,除了 main 线程之外,还有Monitor Ctrl-Break,Signal Dispatcher,Finalizer,Reference Handler。

线程:

​ Java真的能开启多个线程吗?否。是使用的native方法start0()开启的线程。因为JAVA运行在虚拟机上,无法直接操作硬件。

线程6大状态:(可在Thread类的源码中找到)

NEW -> RUNNABLE -> BLOCKED ->WAITING -> TIME_WAITING -> TERMINATED

3. 并发与并行:

并发:多线程操作同一个资源;多线程是模拟出来的,是CPU时间片轮转快速交替执行而已。并发编程是为了:充分利用CPU性能资源.

并行:CPU多个物理核心,多个线程可以同时执行;可使用线程池:

线程池的数量:

IO密集型=2Ncpu(可以测试后自己控制大小,2Ncpu一般没问题)(常出现于线程中:数据库数据交互、文件上传下载、网络数据传输等等)

计算密集型=Ncpu(常出现于线程中:复杂算法)

4. wait/sleep的区别:

方法 wait sleep
所属的类 Object Thread
是否释放锁 释放了锁 不会释放锁
使用范围 必须在同步代码块中使用 可以在任何地方进入sleep
是否捕获异常 不需要捕获 必须捕获异常

5. 什么是锁,锁的是谁:

8锁问题:

  • 场景1:两个 synchronized 实例方法:按拿锁顺序执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    /**
    * 标准情况下 是先sendEmail()还是先callPhone()?
    * 答案:sendEmail
    * 解释:被 synchronized 修饰的方式,锁的对象是方法的调用者(phone1)
    * 所以说这里两个方法想要获取的是同一个锁,谁先拿到锁谁先执行!这里是顺序拿锁
    */
    public class LockDemo1 {
    public static void main(String[] args) throws InterruptedException {
    Phone1 phone1 = new Phone1();
    new Thread(()->{
    phone1.sendEmail();
    },"A").start();
    TimeUnit.SECONDS.sleep(3);
    new Thread(()->{
    phone1.callPhone();
    },"B").start();
    }
    }
    class Phone1{
    public synchronized void sendEmail(){
    System.out.println("senEmail");
    }
    public synchronized void callPhone(){
    System.out.println("callPhone");
    }
    }
  • 场景2:两个 synchronized 实例方法,并在其中一种方法中添加sleep休眠:依然按拿锁顺序执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    /**
    * sendEmail()方法执行时会休眠三秒。 是先执行sendEmail() 还是 callPhone()?
    * 答案: sendEmail
    * 解释:被 synchronized 修饰的方式,锁的对象是方法的调用者(phone2)
    * 所以说这里两个方法想要获取的是同一个锁,谁先拿到锁谁先执行!(也就是说,和方法本身执行的耗时无关)顺序拿锁
    */
    public class LockDemo2 {
    public static void main(String[] args) throws InterruptedException {
    Phone2 phone2 = new Phone2();
    new Thread(()->{
    try {
    phone2.sendEmail();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    },"A").start();
    TimeUnit.SECONDS.sleep(2);
    new Thread(()->{
    phone2.callPhone();
    },"B").start();
    }
    }
    class Phone2{
    public synchronized void sendEmail() throws InterruptedException {
    TimeUnit.SECONDS.sleep(3);//该方法会睡3秒
    System.out.println("sendEmail");
    }
    public synchronized void callPhone(){
    System.out.println("callPhone");
    }
    }
  • 场景3:一个synchronized实例方法,一个普通实例方法:按时间顺序执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    /**
    * 被synchronized 修饰的方法和普通方法 先执行加锁方法sendEmail() 还是 普通方法callPhone()?
    * 答案: callPhone
    * 解释:新增加的这个方法没有 synchronized 修饰,不是同步方法,不受锁的影响,所以按时间顺序执行;
    */
    public class LockDemo3 {
    public static void main(String[] args) throws InterruptedException {
    Phone3 phone3 = new Phone3();
    new Thread(()->{
    try {
    phone3.sendEmail();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    },"A").start();

    TimeUnit.SECONDS.sleep(2);
    new Thread(()->{
    phone3.callPhone();
    },"B").start();
    }
    }
    class Phone3{
    public synchronized void sendEmail() throws InterruptedException {
    TimeUnit.SECONDS.sleep(4);
    System.out.println("sendEmail");
    }

    // 没有synchronized修饰的普通方法:先于加锁方法执行
    public void callPhone(){
    System.out.println("callPhone");
    }
    }
  • 场景4:两个 synchronized 实例方法对应两个调用者实例:按时间顺序执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    /**
    * 被synchronized 修饰的不同方法 先执行sendEmail() 还是callPhone()?
    * 答案:callPhone
    * 解释:被synchronized 修饰的不同方法 锁的对象是调用者
    * 这里锁的是两个不同的调用者,所有互不影响,按时间顺序执行;
    */
    public class LockDemo4 {
    public static void main(String[] args) throws InterruptedException {
    Phone4 phoneA = new Phone4();
    Phone4 phoneB = new Phone4();

    new Thread(()->{
    try {
    phoneA.sendEmail();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    },"A").start();

    TimeUnit.SECONDS.sleep(1);
    new Thread(()->{
    phoneB.callPhone();
    },"B").start();
    }
    }
    class Phone4{
    public synchronized void sendEmail() throws InterruptedException {
    TimeUnit.SECONDS.sleep(3);//睡3秒
    System.out.println("sendEmail");
    }
    public synchronized void callPhone(){
    System.out.println("callPhone");
    }
    }
  • 场景5:两个 static synchronized 静态方法:按拿锁顺序执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    /**
    * 两个静态同步方法 都被synchronized 修饰 是先sendEmail() 还是callPhone()?
    * 答案:sendEmial
    * 解释:只要方法被 static 修饰,锁的对象就是 Class模板对象,这个则全局唯一!
    * 这里是同一个锁,先获取到锁的先执行;
    */
    public class LockDemo5 {
    public static void main(String[] args) throws InterruptedException {
    Phone5 phone5 = new Phone5();
    new Thread(()->{
    try {
    phone5.sendEmail();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    },"A").start();

    TimeUnit.SECONDS.sleep(1);
    new Thread(()->{
    phone5.callPhone();
    },"B").start();
    }
    }
    class Phone5{
    public static synchronized void sendEmail() throws InterruptedException {
    TimeUnit.SECONDS.sleep(3);
    System.out.println("sendEmail");
    }

    public static synchronized void callPhone(){
    System.out.println("callPhone");
    }
    }
  • 场景6:两个 static synchronized 静态方法,使用两个实例调用:按拿锁顺序执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    /**
    * 同被static+synchronized 修饰的两个方法,是先sendEmail()还是callPhone()?
    * 答案:sendEmail
    * 解释:虽然有两个对象,但锁的对象是 Class模板,这个全局唯一,所以按方法获取到Class模板锁的顺序执行
    */
    public class test3 {
    public static void main(String[] args) throws InterruptedException {
    Phone5 phone5 = new Phone5();
    Phone5 phone5s = new Phone5();
    new Thread(() -> {
    phone5.sendEmail();
    }, "A").start();

    TimeUnit.SECONDS.sleep(1);

    new Thread(() -> {
    phone5s.callPhone();
    }, "B").start();
    }
    }

    class Phone5 {
    public static synchronized void sendEmail() {
    try {
    TimeUnit.SECONDS.sleep(3);//sleep三秒
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("sendEmail");
    }

    public static synchronized void callPhone() {
    System.out.println("callPhone");
    }
    }
  • 场景7:一个 synchronized 实例方法和一个 static synchronized 静态方法:按时间顺序执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    /**
    * 被synchronized 修饰的普通方法和静态方法 是先sendEmail() 还是 callPhone()?
    * 答案:callPhone
    * 解释:只要被static synchronized 修饰锁的是class模板, 而 synchronized 锁的是调用的对象
    * 这里是两个锁互不影响,按时间先后执行
    */
    public class LockDemo6 {
    public static void main(String[] args) throws InterruptedException {
    Phone6 phone6 = new Phone6();
    new Thread(()->{
    try {
    phone6.sendEmail();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    },"A").start();

    TimeUnit.SECONDS.sleep(1);
    new Thread(()->{
    phone6.callPhone();
    },"B").start();
    }
    }
    class Phone6{
    public static synchronized void sendEmail() throws InterruptedException {
    TimeUnit.SECONDS.sleep(3);//睡了3秒
    System.out.println("sendEmail");
    }

    public synchronized void callPhone(){
    System.out.println("callPhone");
    }
    }
  • 场景8:一个 synchronized 实例方法和一个 static synchronized 静态方法,使用两个实例调用:按时间顺序执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    /**
    * 一个被static+synchronized 修饰的方法和普通的synchronized方法,先执行sendEmail()还是callPhone()?
    * 答案:callPhone()
    * 解释: 只要被 static+synchronized 修饰的锁的就是整个class模板
    * 这里一个锁的是class模板 一个锁的是调用者(对象) ,所以互不影响,按时间顺序执行
    */
    public class LockDemo8 {
    public static void main(String[] args) throws InterruptedException {
    Phone8 phoneA = new Phone8();
    Phone8 phoneB = new Phone8();

    new Thread(()->{
    try {
    phoneA.sendEmail();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    },"A").start();

    TimeUnit.SECONDS.sleep(1);
    new Thread(()->{
    phoneB.callPhone();
    },"B").start();
    }
    }
    class Phone8{
    public static synchronized void sendEmail() throws InterruptedException {
    TimeUnit.SECONDS.sleep(3);
    System.out.println("sendEmail");
    }

    public synchronized void callPhone(){
    System.out.println("callPhone");
    }
    }

总结:

1、对普通方法(实例方法)加 synchronized 锁时,锁的是调用该方法的实例
2、对 static 修饰的方法(静态方法)加 synchronized 锁时,锁的是唯一的一个Class模板!
在我们编写多线程程序的时候,只需要搞明白这个到底锁的是什么就不会出错了!

6. Lock接口:方法有lock(),unlock(); lock.trylock(); 实现类有:

  • RenntrantLock(常用):可重入锁;

  • ReentrantReadWriteLock . ReadLock 读锁;

  • ReentrantReadWriteLock . WriteLock 写锁;

6.1 RenntrantLock可实现公平锁与非公平锁:默认非公平;

​ 使用方法:(手动加锁解锁)

1
2
3
4
1. Lock mylock = new ReentrantLock();
2. mylock.lock();
1. try..catch..finally;//业务内容
2. mylock.unlock();

7. Synchronized 和 RenntrantLock锁的区别:

Synchronized RenntrantLock等
类型 内置java关键字 类
是否手动 自动释放 手动释放,否则死锁
锁的状态 无法判断获取锁的状态 可判断是否获取到锁
等待状态 被动等待 可尝试获取锁:trylock()
功能特征 可重入锁,不可中断,非公平 可重入锁,可中断锁,可公平
适用范围 锁少量同步代码 锁大量同步代码

8. 生产者和消费者问题:

线程间的通信过程:判断是否等待 -> 执行业务 -> 通知唤醒其他线程

使用传统的synchronized锁实现:

1
2
3
4
5
6
7
8
9
class Data{
private int number = 0; //+1
public synchronized void increment() throws InterruptedException{
while (number!=0){
this.wait();//等待
}
number++; //执行业务
this .notifyA1l(); //通知其他线程,我+1完毕了
}

注意:wait() 应该总是出现在while循环中,若使用if判断,则可能导致虚假唤醒!Example:比如说买货,如果商品本来没有货物,突然进了一件商品,这是所有的线程都被唤醒了 ,但是只能一个人买,所以其他人都是假唤醒,因为获取不到对象的锁。

注:notify() 和 notifyAll()的区别:

​ notify()方法只唤醒一个等待(对象的)线程并使该线程开始执行。所以如果有多个线程等待一个对象,这个方法只会唤醒其中一个线程,选择哪个线程取决于操作系统对多线程管理的实现。notifyAll() 会唤醒所有等待(对象的)线程,尽管哪一个线程将会第一个处理取决于操作系统的实现。

使用JUC实现生产者消费者问题:

  1. lock.newCondition();
  2. lock.lock();
  3. 使用 condition 的await() ,signal()/signalAll()方法;//根据业务唤醒
  4. lock.unlock();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Data2{
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void increment() throws InterruptedException {
lock.lock();
try {
while (number!=0){
condition.await(); // 根据条件进入等待
}
number++; // 业务代码
condition.signalAll();// 通知其他线程,我+1完毕了
} catch (Exception e){
e.printStackTrace();
} finally { lock.unlock();
}
}

与传统锁方法的对比:

使用condition监视器的优势:精准地通知和唤醒线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class Data3 { // 资源类 Lock
private Lock lock = new ReentrantLock();
private Condition conditionA = lock.newCondition();
private Condition conditionB = lock.newCondition();
private Condition conditionC = lock.newCondition();
private int number = 1; // 1A 2B 3C

public void printA() {
lock.lock();
try {
while (number != 1) {
conditionA.await();// 等待
}
number = 2;
conditionB.signal();//指定唤醒conditonB监听的线程:B
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void printB() {
lock.lock();
try {
while (number != 2) {
conditionB.await();
}
number = 3;
conditionC.signal();//指定唤醒conditonC监听的线程:C
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void printC() {
lock.lock();
try {
while (number != 3) {
conditionC.await();
}
number = 1;
conditionA.signal();//指定唤醒conditonA监听的线程:A
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

9. ReadWriteLock接口 (读写锁):

​ 方法:readLock(); writeLock(); //可以实现:可单个线程同时写;多个线程同时读(没有线程在写的时候),

​ 实现类:ReentrantReadWriteLock :读写锁。

用这种锁可以实现更加细粒度的控制,可选择加读锁还是加写锁:

readLock()可防止在读的时候被写入;【共享锁】:多个读线程可同时持有;

writeLock()可防止在写的时候被读取;【排他锁】:每次只能一个线程持有;

10. 集合的线程安全问题:

  1. ArrayList:多线程下,对ArrayList做出修改,会出现java. util. ConcurrentModificationException 并发修改异常!

解决方案:

  • 使用Collection.SynchronizedArrayList<>(); 直接加同步锁,效率低
  • 使用CopyOnWriteArrayList【写入时复制】
1
new List<String> list = new CopyOnWriteArrayList<>();

COW技术:开始时内核并不会复制整个地址空间,而是让父子进程共享地址空间,只有在写时才复制地址空间,使得父子进程都拥有独立的地址空间,即资源的复制是在只有需要写入时才会发生。

​ 只有在写入/删除的时候,才会从原来的数据复制一个副本出来,然后修改这个副本,最后把原数据替换成当前的副本。CopyOnWriteArrayList 比 Vector牛逼在哪? 只在add/delete方法里用了lock锁;(只有在修改时才加锁);

  1. HashSet:多线程下,对HashSet做出修改,会出现java. util. ConcurrentModificationException 并发修改异常!

解决方案:

  • 使用Collection.SynchronizedSet<>(); 直接加同步锁,效率低
  • 使用CopyOnWriteArraySet【写入时复制】
1
Set<String> set = new CopyOnWriteArraySet<>();

注:HashSet底层:是HashMap实现的:set内的元素就是map中的“key”;value是一个PRESENT常量

  1. HashMap:

回顾:

1
2
Map<String, String> map = new HashMap<>(capacity ,factor);
//构造函数:初始容量,加载因子:默认:16,0.75;

多线程下,对HashSet做出修改,会出现java. util. ConcurrentModificationException 并发修改异常!

解决方案:

  • 使用Collection.SynchronizedMap<>(); 直接加同步锁,效率低
  • 使用ConcurrentHashMap:细化加锁粒度,非整个Map加锁,JDK1.8也摒弃了Segment加锁,直接对Node加锁
1
Map<String, String> map = new ConcurrentHashMap<>();

11. Callable接口:

与runnable接口的区别:

Runnable Callable
返回值 无 返回泛型类型的返回值
异常 不能抛出 可抛出已检查异常
方法 run() call()

使用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class CallableTest{
public static void , main(String[] args) {
MyThread thread = new MyThread();
FutureTask futureTask = new FutureTask(thread); // 把thread封装进futuretask里面;
new Thread(futureTask, name: "A"). start(); // 调用futuretask线程
Integer o = (Integer) futureTask. get(); // 获取Callable线程的返回结果
//这个get方法可能产生阻塞,因为他要等待结果的产生。要么通过异步通信来获取此结果。
}
}

class MyThread implements Callable<Integer> {
@Override public Integer call() {
System. out . println("call()");
return 1024; //返回值的类型为 Integer
}
}

注:

  1. 这个get方法获取结果可能产生阻塞,因为他要等待结果的产生;要么通过异步通信来获取此结果。

  2. 这个结果会缓存下来

12. 三大常用辅助工具类

  1. CountDownLatch:倒数计数器:

    每次有线程调用countDownLatch(), 数量-1 , 当计数器变为0 , countDownLatch.await()就会被唤醒,继续执行!

    1
    2
    3
    4
    核心方法:
    CountDownLatch countDownLatch = new CountDownLatch(6);//new一个计数器,初始值为6
    countDownLatch. countDown(); //计数-1;
    countDownLatch.await(); //等待计数器归零,然后再向下执行
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    //Demo:   
    public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
    //总数是6,必须要执行任务的时候,再使用!
    CountDownLatch countDownLatch = new CountDownLatch(6);
    for(int i=1;i<=6;i++){
    new Thread(()->{System.out.println(Thread.currentThread().getName()+" Go out");
    countDownLatch. countDown(); //数量-1
    },String.valueOf(i)).start();
    }
    countDownLatch. await(); //等待计数器归零,然后再向下执行!
    System.out.println("Close Door");
    }
    }
  2. CyclicBarrier: 循环栅栏

    从字面上的意思可以知道,这个类的中文意思是“循环栅栏”。大概的意思就是一个可循环利用的屏障。

    它的作用就是会让所有线程都等待完成后才会继续下一步行动。

    • 线程调用 await() 表示自己已经到达栅栏;
    • 当所有线程到达栅栏的数量等于设置的数量(这里是7)时,程序继续往下执行
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public static void main(String[] args) {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
    System.out.println("召唤神龙成功");
    });
    for (int i = 1; i <= 7; i++) {

    int finalI = i;
    new Thread(() -> {
    System.out.println(Thread.currentThread().getName()+finalI);
    try {
    cyclicBarrier.await();//等待7次
    } catch (InterruptedException | BrokenBarrierException e) {
    e.printStackTrace();
    }
    }).start();
    }
    }
  3. Semaphore:信号量

    用于限制可以访问某些资源(物理或逻辑的)的线程数目:

    比如:假如有3个窗口可以打饭,同一时刻也只能有3名同学打饭。第四个人来了之后就必须在外面等着,只要有打饭的同学好了,就可以去相应的窗口了。

    用法:acquire()方法、release()方法

    acquire():获取信号量,若获得不到则一直等待直到获得;

    release():释放信号量, 会将当前的可用信号量+ 1 ,然后噪醒等待的线程;

    应用场景:多个共享资源互斥的使用!并发限流,控制最大的线程数

动态代理机制

发表于 2022-04-13

tags:spring

动态代理

​ java动态代理机制中有两个重要的类和接口InvocationHandler(接口)和Proxy(类),这一个类Proxy和接口InvocationHandler是我们实现动态代理的核心;

1.InvocationHandler接口是proxy代理实例的调用处理程序实现的一个接口,每一个proxy代理实例都有一个关联的调用处理程序;在代理实例调用方法时,方法调用被编码分派到调用处理程序的invoke方法。看下官方文档对InvocationHandler接口的描述:
1
2
3
4
5
6
7
{@code InvocationHandler} is the interface implemented by
the <i>invocation handler</i> of a proxy instance.

<p>Each proxy instance has an associated invocation handler.
When a method is invoked on a proxy instance, the method
invocation is encoded and dispatched to the {@code invoke}
method of its invocation handler.

​ 每一个动态代理类的调用处理程序都必须实现InvocationHandler接口,并且每个代理类的实例都关联到了实现该接口的动态代理类调用处理程序中,当我们通过动态代理对象调用一个方法时候,这个方法的调用就会被转发到实现InvocationHandler接口类的invoke方法来调用,看如下invoke方法:

2.Proxy类就是用来创建一个代理对象的类,它提供了很多方法,但是我们最常用的是newProxyInstance方法。
1
2
3
4
5
6
public static Object newProxyInstance(ClassLoader loader, 
Class<?>[] interfaces,
InvocationHandler h)123
Returns an instance of a proxy class for the specified interfaces
that dispatches method invocations to the specified invocation
handler. This method is equivalent to:123

这个方法的作用就是创建一个代理类对象,它接收三个参数,我们来看下几个参数的含义:

  • loader:一个classloader对象,定义了由哪个classloader对象对生成的代理类进行加载
  • interfaces:一个interface对象数组,表示我们将要给我们的代理对象提供一组什么样的接口,如果我们提供了这样一个接口对象数组,那么也就是声明了代理类实现了这些接口,代理类就可以调用接口中声明的所有方法。
  • h:一个InvocationHandler对象,表示的是当动态代理对象调用方法的时候会关联到哪一个InvocationHandler对象上,并最终由其调用。
3动态代理中核心的两个接口和类上面已经介绍完了,接下来我们就用实例来讲解下具体的用法
  • 首先我们定义一个接口People
1
2
3
4
5
6
package reflect;

public interface People {

public String work();
}
  • 定义一个Teacher类,实现People接口,这个类是真实的对象
1
2
3
4
5
6
7
8
9
10
11
package reflect;

public class Teacher implements People{

@Override
public String work() {
System.out.println("老师教书育人...");
return "教书";
}

}
  • 现在我们要定义一个代理类的调用处理程序,每个代理类的调用处理程序都必须实现InvocationHandler接口,代理类如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package reflect;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class WorkHandler implements InvocationHandler{

//代理类中的真实对象
private Object obj;

public WorkHandler() {
// TODO Auto-generated constructor stub
}
//构造函数,给我们的真实对象赋值
public WorkHandler(Object obj) {
this.obj = obj;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//在真实的对象执行之前我们可以添加自己的操作
System.out.println("before invoke。。。");
Object invoke = method.invoke(obj, args);
//在真实的对象执行之后我们可以添加自己的操作
System.out.println("after invoke。。。");
return invoke;
}
}

上面的代理类的调用处理程序的invoke方法中的第一个参数proxy好像我们从来没有用过…

  • 接下来我们看下客户端类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package reflect;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class Test {

public static void main(String[] args) {
//要代理的真实对象
People people = new Teacher();
//代理对象的调用处理程序,我们将要代理的真实对象传入代理对象的调用处理的构造函数中,最终代理对象的调用处理程序会调用真实对象的方法
InvocationHandler handler = new WorkHandler(people);
/**
* 通过Proxy类的newProxyInstance方法创建代理对象,我们来看下方法中的参数
* 第一个参数:people.getClass().getClassLoader(),使用handler对象的classloader对象来加载我们的代理对象
* 第二个参数:people.getClass().getInterfaces(),这里为代理类提供的接口是真实对象实现的接口,这样代理对象就能像真实对象一样调用接口中的所有方法
* 第三个参数:handler,我们将代理对象关联到上面的InvocationHandler对象上
*/
People proxy = (People)Proxy.newProxyInstance(handler.getClass().getClassLoader(), people.getClass().getInterfaces(), handler);
//System.out.println(proxy.toString());
System.out.println(proxy.work());
}
}

看下输出结果:

1
2
3
4
before invoke。。。
老师教书育人...
after invoke。。。
教书1234

​ 通过上面的讲解和示例动态代理的原理及使用方法,在Spring中的两大核心IOC和AOP中的AOP(面向切面编程)的思想就是动态代理,在代理类的前面和后面加上不同的切面组成面向切面编程。

上面我们只讲解了Proxy中的newProxyInstance(生成代理类的方法),但是它还有其它的几个方法,我们下面就介绍一下:

  • getInvocationHandler:返回指定代理实例的调用处理程序
  • getProxyClass:给定类加载器和接口数组的代理类的java.lang.Class对象。
  • isProxyClass:当且仅当使用getProxyClass方法或newProxyInstance方法将指定的类动态生成为代理类时,才返回true。
  • newProxyInstance:返回指定接口的代理类的实例,该接口将方法调用分派给指定的调用处理程序。

Redis设置序列化方式

发表于 2022-04-13

tags:Redis

Redis设置序列化方式:

1
2
3
4
5
6
7
8
9
10
//修改默认的redisTemplate的持久化方式;
@Bean
public RedisTemplate<Object,Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){
RedisTemplate<Object,Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setKeySerializer(new StringRedisSerializer());
return redisTemplate;
}

秒杀开发笔记

发表于 2022-04-13

秒杀开发笔记

SecKill:基于 SpringBoot+Mybatis+Redis的高并发秒杀API

  • 业务需求:

    ​ 此项目模拟的是电商活动中一个商品的高并发秒杀场景。商品列表中有N种商品,每个商品都有自己的库存数目,每个商品的秒杀开始时间也不相同。每个用户只可以秒杀成功一次某种商品,不同的商品可以分别秒杀。

  • 业务逻辑:

    数据优化:

    • 热点数据缓存:启用SpringBoot的声明式缓存支持,将热门商品信息等热点数据放入Redis缓存,降低服务器压力;
    • 内容分发网络:使用CDN存储静态资源,解决Tomcat服务器带宽瓶颈;
    • 动静数据分离:前端采用Freemarker模板引擎将页面静态化,并使用ajax动态加载后端产生的数据。

    并发优化:

    • 避免超卖:商品库存信息放入Redis,在Redis缓存中预减库存,完成秒杀用户确权与避免商品超卖问题;
    • 削峰限流:使用RabbitMQ消息队列,用户秒杀成功确权与订单信息生成这两个过程异步执行,实现削峰限流和业务逻辑解耦。
    • 负载均衡:使用Nginx反向代理服务器,将用户请求转发到多台服务器,实现负载均衡及更高的并发量;同时使用分布式Session,将 Session转存到Redis中实现 Session 共享访问。

    安全优化:

    • URL加盐:后端动态生成商品秒杀的路径,等到商品秒杀的时间获取秒杀路径的接口才会返回秒杀路径;

    • 防刷与反爬:在Redis中保存当前客户端的访问次数,对于在设定时间内超过访问次数限制的IP,使用AOP拦截器对其拒绝服务。

    • 服务降级和服务熔断:当某个服务单元发生故障监控,向调用方法返回一个符合预期的、可处理的备选响应。(未实现)

技术栈:

  • SpringBoot 2.x
  • Mybatis
  • Redis
  • RabbitMQ
  • Nginx
  • MySQL

1. 配置依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.Artfairy's blog</groupId>
<artifactId>babytun-seckill</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>babytun-seckill</name>
<description>Demo project for Spring Boot</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.3</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
1
2
3
4
5
mapper头模板:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">

2. 分层编写后端和前端页面: view -> controller -> service -> dao

3. 压力测试:Jmeter

启动命令:sh jmeter

模拟100用户、每个用户进行100次访问;

未使用redis缓存时:

  • Throughput :吞吐量:每秒489次请求

  • Average/Min/Max 响应时间:192/s

高并发问题分析:

​ 在电商应用中,90%数据处理是用于读取数据,在海量数据的情况下,数据库最有可能成为高并发的瓶颈。因此提高数据库效率戓者降低数据库交互就是我们高并发首先要考虑的向题。

​ 电商应用中,很大ー部分数据是在一段时间内稳定不变的, 其中很大ー部分数据是一段时间內稳定不变的,例如”商 信息”、””会员信息“、“网站基本信息”等;对于稳定数据,常用两种方式进行高并发处理:

  • 利用缓存( Redis、 Ehcache、 Memcached..)
  • 利用静态化技术(staticize)转化为Html

4. 静态数据优化:Redis缓存

Redis在Springboot中的使用:

4.1 在主程序中开启声明式缓存注解支持

4.2 对service层中的方法,利用@Cacheable注解开启缓存

4.3 在yml中配置Redis信息:

4.4 使用redis后的JMeter吞吐量:

5. 页面静态化技术:[以空间换时间]

​ 页面静态化是指将动态页面(jsp/ freemarker,…)变成html静态页面。动态页面便于管理,但是访问网页时还需要程序先处理一遍,所以导致访问速度相对较慢。而静态页面访问速度快,却又不便于管理。静态化可以将两种页面的好处集中到一起。

动态生成模板对象:

注:可以使用大循环直接对所有商品页面进行静态化即可;

Nginx(反向代理服务器)

​ Nginx是一款轻量级的Web服务器/反向代理服务器, 其特点是占内存少,并发能力强,事实上 nginx的并发能力确实在同类型的网页服务器中表现较好。

安装:brew install nginx

启动:brew services start nginx

路径:/usr/local/etc/nginx/nginx.conf

配置:在39行设置需要映射的根目录

使用nginx+静态化页面的并发表现:

静态化后的额外处理:

5.1 自动计划任务静态化

使用Springboot的计划任务自动生成静态化页面:

找到数据库中上次修改不超过5分钟的数据:

制定间隔5分钟的计划任务:

5.2 动静态数据分离:

​ 页面静态化执行效率固然高,但往往在页面中也存在动态数据。例如“评论”的内容就一直在不断变化肯定不能对其静态化处理。遇到这种动态数据需要在静态页面中使用AJAX动态加载后端产生的数据。

​

  • 使用ajax 加载动态的评论内容,写在goods.ftlh中:

再重新生成全部的nginx静态化页面;

注意:此时,从nginx服务器访问网页,还不能找到对应的评论内容:

因此需要配置Nginx代理:

​ 通过此配置可以将 nginx的 /evaluate/页面代理到tomcat服务器的http://locathost:8080/evaluate; 注意要保持开启tomcat服务器,否则无法成功代理。

总结动静分离的效果:访问静态化的页面时,如果需要访问动态数据:可以通过配置Nginx代理,将某页面代理到tomcat服务器,实现获取动态数据。

6. 秒杀问题分析:

​ 秒杀我们日常开发中最常见的高并发场景。秒杀的特点:1)瞬超高访问量; 2)商品总量有限,先到先得; 3)有明确的开始、结束时间。

秒杀活动常见两个挑战:

  • 高并发: 基本主流电商的秒杀QPS峰值都在100万+。
  • 避免超卖: 如何避免购买商品人数不超过商品数量上限,这是要面临的难题。

商品库存count的改变未保证对其他线程的可见性,因此发生超卖问题。

解决方法:有多种方案来解决这个问题,我们主要看3种方案:

  • 悲观锁:影响性能;

  • 乐观锁:高并发下失败率高,可引入重入机制在失败后重复尝试;

  • Redis+ Lua

  • 使用Redis预减库存-> 解决超卖问题:

为什么选择 Redis:

  • 单线程模型
  • 内存存储,高达10WQPS
  • 天生分布式支持

实现过程:

6.1 编写mapper 数据库操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//查询出符合秒杀时间的商品:
<mapper namespace="com.Artfairy's blog.babytunseckill.dao.PromotionSecKillDAO">
<select id="findUnstartSecKill" resultType="com.Artfairy's blog.babytunseckill.entity.PromotionSecKill">
select * from t_promotion_seckill
where now() BETWEEN start_time AND end_time and status = 0
</select>
</mapper>
//更新秒杀状态:
<update id="update" parameterType="com.Artfairy's blog.babytunseckill.entity.PromotionSecKill">
update t_promotion_seckill
set goods_id = #{goodsId},ps_count = #{psCount},
start_time = #{startTime}, end_time = #{endTime},
status = #{status}, current_price = #{currentPrice}
where ps_id = #{psId}
</update>

6.2 秒杀调度任务:每隔5秒检查符合秒杀时间的商品,使其进入秒杀状态,并将其放入Redis的List:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component
public class SecKillTask {
@Resource
private PromotionSecKillDAO promotionSecKillDAO;
@Resource
private RedisTemplate redisTemplate;
//RedisTemplate是Spring封装的Redis操作类,提供了一系列操作redis的模板方法

@Scheduled(cron = "0/5 * * * * ?")
public void startSecKill(){
List<PromotionSecKill> list = promotionSecKillDAO.findUnstartSecKill();
for (PromotionSecKill ps : list) {
//删除以前重复的活动任务缓存
redisTemplate.delete("seckill:count:" + ps.getPsId());
System.out.println(ps.getPsId() + "秒杀活动已启动 !");
for (int i = 0; i < ps.getPsCount(); i++) {
//有几个库存商品,则初始化几个list对象;list中先存入商品ID;
redisTemplate.opsForList().rightPush("seckill:count:" + ps.getPsId(),ps.getGoodsId());
}
ps.setStatus(1);
promotionSecKillDAO.update(ps);
}
}
}

6.3 使用Redis实现库存预减:如果能从秒杀商品队列中获取有效的goodsId,就将psId和对应的userId放入Redis的set中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Service
public class PromotionSecKillService {
@Resource
private PromotionSecKillDAO promotionSecKillDAO;
@Resource
private RedisTemplate<Object,Object> redisTemplate;

public void processSecKill(Long psId, String userid, Integer num) throws SecKillException {
//如果能从秒杀商品队列中获取有效的goodsId,就将psId和对应的userId放入Redis的set中;
PromotionSecKill ps = promotionSecKillDAO.findById(psId);
if (ps == null) {
//秒杀活动不存在:
throw new SecKillException("该秒杀活动不存在!");
}
if (ps.getStatus() == 0) {
throw new SecKillException("该秒杀活动还未开始!");
}
if (ps.getStatus() == 2) {
throw new SecKillException("该秒杀活动已结束!");
}
Integer goodsId = (Integer) redisTemplate.opsForList().leftPop("seckill:count:" + ps.getPsId());
if (goodsId != null) {
//先判断用户id的set集合中是否已经存在此id,若已存在则不允许再次抢购
boolean isExisted = redisTemplate.opsForSet().isMember("seckill:users:" + ps.getPsId(), userid);
if (!isExisted) {
System.out.println("恭喜" + userid + "抢到商品了,快去下单吧!");
redisTemplate.opsForSet().add("seckill:users:" + ps.getPsId(), userid);
}else {
//若该用户已抢购过,抛出自定义异常,并再将此商品加回队列的尾部;
redisTemplate.opsForList().rightPush("seckill:count:" + ps.getPsId(),ps.getGoodsId());
throw new SecKillException("抱歉,您已经参加过此活动,请勿重复抢购!");
}
} else {
throw new SecKillException("抱歉,该商品已被抢光,下次再来吧!");
}
}
}

6.4 Controller层:控制前端页面访问相应页面时调用秒杀方法,并返回结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@RestController
public class SecKillController {
@Resource
PromotionSecKillService promotionSecKillService;

@RequestMapping("/seckill")
public Map<String, String> processSecKill(Long psid, String userid) {
Map<String, String> result = new HashMap<>();
try {
promotionSecKillService.processSecKill(psid, userid, 1);
result.put("code", "0");
result.put("message", "success");
} catch (SecKillException e) {
result.put("code", "500");
result.put("message", e.getMessage());
}
return result;
}
}

6.5 设置前端seckill.html的抢购按钮入口,告诉用户抢购结果:

6.6 秒杀活动结束后:

将此秒杀任务状态设为已过期,并更新此状态,并清除redis中已过秒杀时间的商品:

1
2
3
4
5
6
7
8
9
10
11
12
@Scheduled(cron = "0/5 * * * * ?")
public void endSecKill(){
List<PromotionSecKill> psList = promotionSecKillDAO.findExpireSecKill();
for (PromotionSecKill ps : psList) {
System.out.println(ps.getPsId()+"秒杀活动已结束!");
//秒杀结束后,将此秒杀任务状态设为已过期,并更新此状态
ps.setStatus(2);
promotionSecKillDAO.update(ps);
//删除redis中已过秒杀时间的商品;
redisTemplate.delete("seckill:count" + ps.getPsId());
}
}

7. 使用RabbitMQ:削峰、限流

7.0 配置好RabbitMQ的环境,创建一个Exchange和一个队列;

7.1 编写Service 向MQ队列发送订单号(速度快),并在Controller层调用此方法:

1
2
3
4
5
6
7
8
9
10
11
public String sendOrderToQueue(String userid){
System.out.println("准备向队列发送信息...");
//订单基本信息;
HashMap<String,String> data = new HashMap<>();
data.put("userid",userid);
String orderNo = UUID.randomUUID().toString();
data.put("orderNo",orderNo);
//可附加额外的订单信息,如电话 地址等;
rabbitTemplate.convertAndSend("exchange-order",null,data);
return orderNo;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RequestMapping("/seckill")
public Map<String,Object> processSecKill(Long psid, String userid) {
Map<String, Object> result = new HashMap<>();
try {
promotionSecKillService.processSecKill(psid, userid, 1);
String orderNo = promotionSecKillService.sendOrderToQueue(userid);
HashMap<String,String> data = new HashMap<>();
data.put("orderNo",orderNo); //生成订单编号
result.put("code", "0");
result.put("message", "success");
result.put("data",data);
} catch (SecKillException e) {
result.put("code", "500");
result.put("message", e.getMessage());
}
return result;
}

7.2 同时,将生成的订单号利用ajax回调给前端(速度快):

7.3 配置Rabbit消费者信息,并新建一个消费者类:

1
2
3
4
5
6
7
8
9
10
11
12
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
#定义消费者最多同时处理10个消息
prefetch: 10
#消息手动确认
acknowledge-mode: manual

7.4 @RabbitHandler注解:自动从RabbitMQ队列中获取订单号,并实例化一个订单写入数据库中(速度慢):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Component
public class OrderConsumer {
@Resource
private OrderDAO orderDAO;

@RabbitListener( //绑定创建好的rabbitmq交换机和队列
bindings = @QueueBinding(
value = @Queue(value = "queue-order"),
exchange = @Exchange(value = "exchange-order",type = "fanout")
)
)
@RabbitHandler //消费者获取订单数据,插入到数据库中;
public void handleMessage(@Payload Map<String,Object> data, Channel channel,
@Headers Map<String,Object> headers){
System.out.println("========获取到订单数据"+data+"========");
try {
try {
//sleep 500ms,模拟对接支付、物流系统、日志登记...
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
Order order = new Order();
order.setOrderNo(data.get("orderNo").toString());
order.setOrderStatus(0);
order.setUserid(data.get("userid").toString());
order.setRecvName("xxx");
order.setRecvAddress("xxx");
order.setRecvMobile("138********");
order.setAmount(19.8f);
order.setPostage(0f);
order.setCreateTime(new Date());
orderDAO.insert(order);
Long tag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(tag,false);//消息确认,false:只进行单个接收 不进行批量接收
System.out.println(data.get("orderNo"+"订单已创建"));
} catch (IOException e) {
e.printStackTrace();
}
}
}

7.5 由于生产者和消费者是异步的关系,因此创建一个”正在创建订单,请稍后…”的页面,作为过渡:

同时,检查订单号是否已经在数据库中成功生成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//检查订单号是否已经在数据库中成功生成
@GetMapping("/checkorder")
public ModelAndView checkOrder(String orderNo){
ModelAndView mav = new ModelAndView();
Order order = promotionSecKillService.checkOrder(orderNo);
if(order != null){
//代表订单已在数据库中创建好了
mav.addObject("order",order);
mav.setViewName("/order");
//跳转到order页面,显示订单信息;
}else{
mav.addObject("orderNo",orderNo);
mav.setViewName("/waiting");
//跳转到等待页面...在等待页面等待三秒后再次尝试检查订单号...
}
return mav;
}

8. Nginx 负载均衡

Nginx六种负载均衡策略:

  • Default - 轮询策略
  • Least connected - 最少连接策略
  • Weighted - 权重策略
  • IP Hash - IP绑定策略 :高并发下不推荐使用,因为会使负载不均衡
  • fair-按响应时间(第三方)
  • url hash-url分配策略(第三方)

8.1 使用Nginx代理后端服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#后端服务器池
upstream babytun {
#least_conn; #最少连接策略
#ip_hash; #ip_hash策略

server 192.168.1.3:8001 weight=5; #按照权重分配
server 192.168.1.3:8002 weight=2;
server 192.168.1.3:8003 weight=1;
server 192.168.1.3:8004 weight=2;
}

server {
#nginx通过80端口提供服务
listen 80;
#使用babytun服务器池进行后端处理
location /{
proxy_pass http://babytun;
proxy_set_header Host $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}

​ 配置完成后,只需访问192.168.1.3/goods?gid=1234 即可访问我们的页面,nginx默认按照轮询策略在4台server之间切换服务器。

8.2 Nginx分布式Session同步问题:

​ 例如使用Nginx负载均衡时,用户登录后,再刷新页面可能会代理到另一台server,而此台server没有之前的session,从而导致丢失登录状态。

解决办法:将 Session转存到 Redis中实现 Session 共享访问:

  • pom中引入Spring-Session依赖以及Redis依赖
  • 主程序启用@EnableRedisHttpSession即可;

Spring-Session将自动监听Session,并将其保存到Redis中!

​ 这样,用户登录上去之后,之后无论是再代理到哪台服务器都会一直拥有这个Session,从而保持登录状态。

9. Nginx缓存静态资源降低Tomcat压力:

9.1 图片、样式等静态资源不再经过Tomcat服务器,直接由Nginx服务器指向指定文件夹:

在/usr/local/etc/nginx/nginx.conf中修改配置即可:

1
2
3
4
5
6
7
8
#临时文件夹
proxy_temp_path /Users/tianjirong/Documents/babytun-lb/nginx-temp;
#设置缓存目录;
#levels代表采用1:2,即采用两级目录的形式保存静态缓存文件,同时文件名进行了MD5编码;
#keys_zone 定义缓存名称 以及 内存大小使用100M交换空间;
#如果某个缓存文件超过7天未使用,则删除之;
#文件夹最大不超过20g,超过后自动删除访问频率最低的缓存文件;
proxy_cache_path /Users/tianjirong/Documents/babytun-lb/nginx-cache levels=1:2 keys_zone=babytun-cache:100m inactive=7d max_size=20g;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
server {
#nginx通过80端口提供服务
listen 80;

#静态资源缓存,利用正则表达式匹配url,匹配成功的则执行内部逻辑,~*表示不区分大小写:
location ~* \.(gif|jpg|css|png|js|woff|html)(.*){
proxy_pass http://babytun;
proxy_set_header Host $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_cache babytun-cache;
#如果资源相应状态码为200->成功; 302->暂时性重定向时,资源缓存文件有效期1天;
proxy_cache_valid 200 302 24h;
proxy_cache_valid 301 24h;
}

注意:当需要缓存.html的页面时,需要保证该页面是不经常变动的,否则不应该缓存它。

9.2 Nginx使用Gzip资源压缩:节省带宽:

  • 利用浏览器支持的Gzip压缩, nginx打包压缩并传输 css、js等静态资源,可将带宽压力降低30%~70%

在/usr/local/etc/nginx/nginx.conf中开启Gzip即可:

1
2
3
4
5
6
7
8
9
10
11
12
#开启nginx Gzip压缩
gzip on;
#超过1K的文件才压缩
gzip_min_length 1k;
#压缩哪些类型:对文本类型压缩效果很好 对图片效果不好
gzip_types text/plain application/javascript text/css application/x-javascript;
#当使用低版本IE浏览器时禁用压缩
gzip_disable "MSIE [1-6]\.";
#压缩使用的缓存,每个内存也为4K,申请32倍;一般这样写就可以
gzip_buffers 32 4k;
#最重要: 设置压缩级别: 1-9 越大压缩比越高,但浪费CPU资源,建议1-4即可
gzip_comp_level 1;

9.3 使用CDN:解决带宽瓶颈、加速访问速度

​ 可将整个layui文件夹上传到CDN中,这样传输这些资源的时候,流量就不会走本地服务器,而是直接走CDN的服务器,实现解决带宽瓶颈和加速访问速度!

使用方法:

  • 开通阿里云等OSS服务,向OSS中上传我们的资源
  • 开通阿里云CDN服务,将OSS中的资源分发到各个CDN服务器
  • 将自己的域名绑定映射到CDN服务器
  • 在前端页面中将所用资源的本地路径改为自己域名中的远程路径即可

10. 流量防刷与反爬虫:

实现思路:

  • Redist提供了TTL有效期特性(设置超时时间)

  • 对于每一个用户,在 Redisi记录访向次数:

    例如​ key:188.38.12.33 value:39 超时时间:60s

  • 用户每访问1次,对应计数器+1,超过上限(30)则停止服务

  • 如计数器超过100则认为爬虫攻击,永久加入黑名单

  • 1分钟后key销毁,重新开始计数

实现过程:

  1. 编写SpringBoot的AOP拦截器,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//AOP拦截器功能:流量防刷
@Component
public class AntiRefreshInterceptor implements HandlerInterceptor {

@Resource //RedisTemplate,用于筒化 Redis操作,在IOC容器中自动被初始化
private RedisTemplate<Object,Object> redisTemplate;

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
response.setContentType("text/html;charset=utf-8"); //设置提示信息的字符集
String clientIp = request.getRemoteAddr();//获取客户端IP
String userAgent = request.getHeader("User-Agent");//获取客户端浏览器信息
String client = "anti-refresh:" + DigestUtils.md5Hex(clientIp + "_" + userAgent);//用MD5摘要来标识一个用户

//若此IP在黑名单中,则直接返回false;
if(redisTemplate.hasKey("anti-refresh:blackList")){
if(redisTemplate.opsForSet().isMember("anti-refresh:blackList",client)){
response.getWriter().println("检测到您的IP访问异常,您已被加入黑名单!");
return false;
}
}
Integer num = (Integer) redisTemplate.opsForValue().get(client);//记录1分钟内的访问次数
if(num == null){//第一次访问
redisTemplate.opsForValue().set(client,1,60, TimeUnit.SECONDS); //放入redis,有效期60S
}else{
if(num > 20 && num < 40){
response.getWriter().println("请求过于频繁,请1分钟后重试!");
redisTemplate.opsForValue().increment(client,1); //每访问一次redis的值+1;
return false;
}else if(num >= 40){
redisTemplate.opsForSet().add("anti-refresh:blackList",clientIp);
response.getWriter().println("检测到您的IP访问异常,您已被加入黑名单!");
System.out.println("IP"+clientIp+"访问异常,已被加入黑名单!");
return false;
}else {
redisTemplate.opsForValue().increment(client,1); //每访问一次redis的值+1;
}
}
return true;
}
}
  1. 注入拦截器:写一个@Configuration类即可.可选择需要拦截的页面,如:”/goods”
1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class WebConfig implements WebMvcConfigurer {
@Resource
private AntiRefreshInterceptor antiRefreshInterceptor;

@Override
//注入拦截器
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(antiRefreshInterceptor).addPathPatterns("/goods");//作用的URL;
}
}

秋招待投递

发表于 2022-04-13

秋招待投递:

广汽丰田: https://gac-toyota.zhiye.com/CAMPUS

长虹: 长虹研究院 – Java工程师(成都)http://campus.51job.com/ch2021/

小米:https://app.mokahr.com/campus_apply/xiaomi/24517#/job/109cb990-dd53-47b8-9846-90d3b2b1fcc1

Redis之事务

发表于 2022-04-13

1. Redis之事务

Redis事务的概念:

  Redis 事务的本质是一组命令的集合。事务支持一次执行多个命令,一个事务中所有命令都会被序列化。在事务执行过程,会按照顺序串行化执行队列中的命令,其他客户端提交的命令请求不会插入到事务执行命令序列中。

  总结:redis事务就是一次性、顺序性、排他性的执行一个队列中的一系列命令。 

Redis事务没有隔离级别的概念:

  批量操作在发送 EXEC 命令前被放入队列缓存,并不会被实际执行,也就不存在事务内的查询要看到事务里的更新,事务外查询不能看到。

Redis不保证原子性:

  Redis中,单条命令是原子性执行的,但事务不保证原子性,且没有回滚。事务中任意命令执行失败,其余的命令仍会被执行。

Redis事务的三个阶段:

  • 开始事务
  • 命令入队
  • 执行事务

Redis事务相关命令:

  watch key1 key2 … : 监视一或多个key,如果在事务执行之前,被监视的key被其他命令改动,则事务被打断 ( 类似乐观锁 )

  multi : 标记一个事务块的开始( queued )

  exec : 执行所有事务块的命令 ( 一旦执行exec后,之前加的监控锁都会被取消掉 ) 

  discard : 取消事务,放弃事务块中的所有命令

  unwatch : 取消watch对所有key的监控

事务的案例:

使用watch乐观锁的案例

总结:

​ watch指令类似于乐观锁,在事务提交时,如果watch监控的多个KEY中任何KEY的值已经被其他客户端更改,则使用EXEC执行事务时,事务队列将不会被执行,同时返回Nullmulti-bulk应答以通知调用者事务执行失败。

Redis之HyperLogLog与布隆过滤器

发表于 2022-04-13

redis的HyperLogLog与布隆过滤器

​ 首先,HyperLogLog与布隆过滤器都是针对大数据统计存储应用场景下的知名算法。HyperLogLog是在大数据的情况下关于数据基数的空间复杂度优化实现,布隆过滤器是在大数据情况下关于检索一个元素是否在一个集合中的空间复杂度优化后的实现。在传统的数据量比较低的应用服务中,我们要实现数据基数和数据是否存在分析的功能,通常是简单的把所有数据存储下来,直接count一下就是基数了,而直接检索一个元素是否在一个集合中也很简单。

​ 但随着数据量的急剧增大,传统的方式已经很难达到工程上的需求。过大的数据量无论是在存储还是在查询方面都存在巨大的挑战,无论我们是用位存储还是树结构存储等方式来优化,都没法达到大数据时代的要求或者是性价比太低。

​ 于是HyperLogLog与布隆过滤器这两个算法就很好的派上了用场。他们的使用可以极大的节约存储空间,作为代价,则是牺牲了一个小概率的准确性,这可以很好的达到工程上的需求,对于那些要求准确度没那么高,但数据量巨大的需求是非常合适的。

HyperLogLog原理

​ 最直白的解释是,给定一个集合 S,对集合中的每一个元素,我们做一个哈希,假设生成一个 16 位的比特串,从所有生成的比特串中挑选出前面连续 0 次数最多的比特串,假设为 0000000011010110,连续 0 的次数为 8,因此我们可以估计该集合 S 的基数为 2^9。当然单独用这样的单一估计偶然性较大,导致误差较大,因此在实际的 HyperLogLog 算法中,采取分桶平均原理了来消除误差。

特点:实现牺牲了一定的准确度(在一些场景下是可以忽略的),但却实现了空间复杂度上的极大的压缩,可以说是性价比很高的。虽然基数不完全准确,但是可以符合,随着数量的递增,基数也是递增的。

布隆过滤器原理

​ 布隆过滤器(Bloom Filter)的核心实现是一个超大的位数组和几个哈希函数。假设位数组的长度为m,哈希函数的个数为k,以上图为例,具体的操作流程:假设集合里面有3个元素{x, y, z},哈希函数的个数为3。首先将位数组进行初始化,将里面每个位都设置为0。对于集合里面的每一个元素,将元素依次通过3个哈希函数进行映射,每次映射都会产生一个哈希值,这个值对应位数组上面的一个点,然后将位数组对应的位置标记为1。查询W元素是否存在集合中的时候,同样的方法将W通过哈希映射到位数组上的3个点。如果3个点的其中有一个点不为1,则可以判断该元素一定不存在集合中。反之,如果3个点都为1,则该元素可能存在集合中。注意:此处不能判断该元素是否一定存在集合中,可能存在一定的误判率。可以从图中可以看到:假设某个元素通过映射对应下标为4,5,6这3个点。虽然这3个点都为1,但是很明显这3个点是不同元素经过哈希得到的位置,因此这种情况说明元素虽然不在集合中,也可能对应的都是1,这是误判率存在的原因。

特点:巧妙的使用hash算法和bitmap位存储的方式,极大的节约了空间。

​ 由于主要用的是hash算法的特点,所有满足和hash算法相同的规则:当过滤器返回 true时(表示很有可能该值是存在的),有一定概率是误判的,即可能不存在;当过滤器返回false时(表示确定不存在),是可以完全相信的。

​ 我们换个数据的角度来看规则:当数据添加到布隆过滤器中时,对该数据的查询一定会返回true;当数据没有插入过滤器时,对该数据的查询大部分情况返回false,但有小概率返回true,也就是误判。

  我们知道它最终满足的规则和hash的规则是一致的,只是组合了多个hash,使用了bitmap来存储,大大优化了存储的空间和判断的效率。

redis中的HyperLogLog

在redis中对HyperLogLog 的支持早在2.8.9的时候就有了。它的操作非常简单

  • PFADD 给HyperLogLog添加值
  • PFCOUNT 获取基数
  • PFMERGE 合并两个HyperLogLog数据(完美合并,分别添加和统一添加的结果是一致的)

redis中的布隆过滤器

​ 在redis中的布隆过滤器的支持是在redis4.0后支持插件的情况下,通过插件的方式实现的 ,redis的布隆过滤器插件地址:https://github.com/RedisLabsModules/rebloom

1
2
3
4
5
6
BF.RESERVE {key} {error_rate} {size}   
//创建一个布隆过滤器,key为redis存储键值,error_rate 为错误率
BF.ADD {key} {item}
//添加值到布隆过滤器中(当过滤器不存在的时候会,会以默认值自动创建一个,建议最好提前创建好)
BF.EXISTS {key} {item}
//判断值是否存在过滤器中: true(表示很可能存在) false (表示绝对不存在)

LeetCode 回溯法

发表于 2022-04-13 | 分类于 刷题

回溯法解题框架:

回溯法有3种题型:

  1. find a path to success;

  2. find all path to success :

    2.1 求解的个数;

    2.2 求所有的解;【重点】

  3. find the best path to success;

框架:

result = [];

def backTracc(path,选择列表):

​ if(满足剪枝条件): 剪枝,return;

​ if(满足结果条件):result.add(path); return;

for 选择 in 选择列表:

​ 做选择;

​ backTrace(路径,选择列表); //进入下一层决策树;

​ 撤销选择;

具体情况优化:

【1】当path为StringBulider时,可严格按照上述流程来写。当path为String时,由于String是final修饰的,因此可以省略撤销选择的过程,因为循环中的每次递归用的都是新的变量,与原变量无关,因此不需要对原变量进行撤销选择的操作。(缺点:每次递归都要建立一个新变量)

LeetCode17:电话号码的字母组合:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class 电话号码的字母组合17 {
List<String> res;
HashMap<Character, String> map; //用于映射数字和对应的字母

public List<String> letterCombinations(String digits) {
res = new ArrayList<>();
if (digits.length() == 0) return res; //特判

map = new HashMap<>();
map.put('2', "abc");
map.put('3', "def");
map.put('4', "ghi");
map.put('5', "jkl");
map.put('6', "mno");
map.put('7', "pqrs");
map.put('8', "tuv");
map.put('9', "wxyz");

backTrack(digits, 0, new StringBuilder());
return res;
}
//digits:一串数字,如:234
//index:第几位数字:如 2
//letters:组成的字母:如 adg
//index:需要递归的数字索引
private void backTrack(String digits, int index, StringBuilder letters) {
if (digits.length() == index) { //满足结束条件
res.add(letters + "");
return;
}
String alphas = map.get(digits.charAt(index));//获取"abc"
for (byte i = 0; i < alphas.length(); ++i) {
letters.append(alphas.charAt(i)); //做选择:将当前字母添加到路径
backTrack(digits, index + 1, letters); //进入下一层决策树
letters.deleteCharAt(letters.length() - 1);//撤销选择
}
}
}

LeetCode22:括号生成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Solution {
List<String> res;
public List<String> generateParenthesis(int n) {
res = new ArrayList<String>();
backTrace(n,"",0,0);
return res;
}
private void backTrace(int n,String path,int numLeft,int numRight){
if(numRight > numLeft || numLeft > n || numRight > n) return;//剪枝
if(numLeft == n && numRight == n){
res.add(path);
return;
}
backTrace(n,path+"(",numLeft+1,numRight);
backTrace(n,path+")",numLeft,numRight+1);
}
}
12

Artfairy

15 日志
1 分类
4 标签
GitHub E-Mail Wechat

|--------------------友链-------------------|


Yuameshi

标签

I'm a student from senior high school.
I'm just a rubbish.
You can call me Yuameshi or 由雨糸
I lives in a small city of Guangdong.
Makding Microsoft's Fan.

欧阳淇淇

标签

「大切な人と再会できる日を」

IcebearType

标签

这个家伙很懒,什么都没有说。。。
© 2022 Artfairy
由 Hexo 强力驱动
萌ICP备20220417号 |
主题 — NexT.Mist