Đó là điều bắt buộc!

Observer design pattern có một vấn đề mình cần chia sẻ gấp đó là việc xử lý exception trong môi trường đa luồng. Đã có 2 lần mình mắc lỗi này, và lỗi này cực kì nghiêm trọng trên môi trường thật, nó làm treo cả một luồng nghiệp vụ luôn và phải mất rất nhiều công sức để tìm ra. Lần đầu tiên là vào năm 2015, mình đã quên bắt Exception trong lớp tính toán và khi exception xảy ra, hệ thống cứ đơ đơ như kiểu bị deadlock. Lần gần đây nhất là 2020 mình cũng hơi bất ngờ, do deploy thiếu file mà cái luồng xử lý nghiệp vụ nó bắn ra NoClassDefFoundError mà cái này thì nó không thừa kế từ Exception mà thừa kế từ Throwable (mình lại chỉ catch Exception), và thế là lại treo 1 lần nữa. 2 lần đã dạy cho mình rất nhiều bài học, để hôm nay mình có thể viết được bài này. Thật tiếc là 2 lần lỗi đều là những nghiệp vụ phức tạp, nên mình sẽ dùng bài toán đơn giản để minh họa lỗi và cách xử lý nhé.

Bài toán thực tế

Giả sử chúng ta có một chương trình chuyên thực hiện phép chia và phép chia này rất nặng và chúng ta dùng 1 thread riêng để xử lý. Code sẽ thế này:

public class ObserverDivider {
    private Queue<DivideRequest> queue = new LinkedBlockingQueue<>();

    private ObserverDivider() {
        Thread newThread = new Thread(() -> {
            while (true) {
                DivideRequest request = queue.poll();
                if(request != null) {
                request.callback.accept(request.a / request.b);
            }
        }
        });
        newThread.start();
    }

    public void divide(Integer a, Integer b, Consumer<Integer> callback) {
            queue.offer(new DivideRequest(a, b, callback));
    }

    public static void main(String[] args) throws Exception {
        ObserverDivider divider = new ObserverDivider();
        divider.divide(10, 5, result -> { // 1. Phép tính này được thự thi
            System.out.println("10 / 5 = " + result);
        });

        Thread.sleep(50);

        divider.divide(10, 0, result -> {
            System.out.println("10 / 0 = " + result); // 2. chương trình sẽ bị treo từ đây
        });
        divider.divide(20, 5, result -> {
        System.out.println("20 / 5 = " + result); // 3. Phép tính này sẽ không được thực thi
    });

}

@AllArgsConstructor
public class DivideRequest {
    private Integer a;
    private Integer b;
    private Consumer<Integer> callback;
}

Nhìn vào đây chúng ta sẽ mong muốn cả phép tính sẽ được thực thi đúng không? Nhưng không, vì 10/0 sẽ ném ra ArithmeticException, vậy nên luồng xử lý sẽ bị dừng lại và sẽ không có bất cứ phép tính nào được thực hiện nữa. Output:

10 / 5 = 2
Exception in thread "Thread-0" java.lang.ArithmeticException: / by zero
at com.example.stackoverflow.ObserverDivider.lambda$new$0(ObserverDivider.java:17)
at java.lang.Thread.run(Thread.java:748)

Những cách xử lý

Vậy có cách nào để xử lý Exception? Sẽ có 2 cách để chúng ta xử lý.

  1. Nếu nghiệp vụ của chúng ta yêu cầu nếu có lỗi xảy ra hãy bỏ qua, chúng ta có thể in ra log hoặc trả về kết quả mặc định, code sẽ thế này:
private ObserverDivider() {
    Thread newThread = new Thread(() -> {
        while (true) {
            DivideRequest request = queue.poll();
            if (request != null) {
                try {
                    request.callback.accept(request.a / request.b);
                }
                catch (Throwable e) {
                    e.printStackTrace();
                }
            }
        }
    });
    newThread.start();
}
  1. Nếu nghiệp vụ yêu cầu chúng ta phải xử lý lỗi xảy ra, chúng ta sẽ cần thay đổi lớp Callback và sử dụng thế này:
public class ObserverDivider {
    private Queue<DivideRequest> queue = new LinkedBlockingQueue<>();

    private ObserverDivider() {
        Thread newThread = new Thread(() -> {
        while (true) {
            DivideRequest request = queue.poll();
            if (request != null) {
                try {
                    request.callback.onResult(request.a / request.b);
                }
                catch (Throwable e) {
                    request.callback.onError(e);
                }
                }
            }
        });
        newThread.start();
    }

    public void divide(Integer a, Integer b, Callback callback) {
        queue.offer(new DivideRequest(a, b, callback));
    }

    public static void main(String[] args) throws Exception {
        ObserverDivider divider = new ObserverDivider();
        divider.divide(10, 5, new Callback() {
            @Override
            public void onResult(Integer result) {
                System.out.println("10 / 5 = " + result);
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("10 / 5: " + e);
            }
        });

        Thread.sleep(50);

        divider.divide(10, 0, new Callback() {
            @Override
            public void onResult(Integer result) {
                System.out.println("10 / 0 = " + result);
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("10 / 0: " + e);
            }
        });
        divider.divide(20, 5, new Callback() {
            @Override
            public void onResult(Integer result) {
                System.out.println("20 / 5 = " + result);
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("20 / 5: " + e);
            }
        });
    }

    public static interface Callback {
        void onResult(Integer result);
        void onError(Throwable e);
    }

}

public class DivideRequest {
    private Integer a;
    private Integer b;
    private Callback callback;
}

Output:

10 / 5 = 2
10 / 0: java.lang.ArithmeticException: / by zero
20 / 5 = 4

Tổng kết

Chúng ta có thể thấy lập trình Observer với async cũng không phải là đơn giản, nó làm cho source code của chúng ta dài ra rất nhiều, và nếu quên bắt Exception, rất có thể chúng ta sẽ gặp lỗi rất nghiệm trọng, vậy hãy luôn chú ý việc này nhé