Hướng dẫn python thread join hangs - tham gia chuỗi python bị treo

Vấn đề của tôi là như sau: Tôi có một lớp kế thừa từ

class Queueable():
    def __init__(self):
        self._queue = Queue()

    def append_to_job_queue(self, job):
        self._queue.put(job)
5 mà tôi muốn có thể dừng lại một cách duyên dáng. Lớp học này cũng có một hàng đợi nó nhận được công việc của nó.

Vì có khá nhiều lớp trong dự án của tôi nên có hành vi này, tôi đã tạo ra một số siêu lớp để giảm mã trùng lặp như thế này:

Hành vi liên quan đến chủ đề:

class StoppableThread(Thread):
def __init__(self):
    Thread.__init__(self)
    self._stop = Event()

def stop(self):
    self._stop.set()

def stopped(self):
    return self._stop.isSet()

Hành vi liên quan đến hàng đợi:

class Queueable():
    def __init__(self):
        self._queue = Queue()

    def append_to_job_queue(self, job):
        self._queue.put(job)

Kết hợp hai ở trên và thêm hàng đợi.join () vào cuộc gọi stop ()

class StoppableQueueThread(StoppableThread, Queueable):
    def __init__(self):
        StoppableThread.__init__(self)
        Queueable.__init__(self)

    def stop(self):
        super(StoppableQueueThread, self).stop()
        self._queue.join()

Một lớp cơ sở cho một bộ dữ liệu:

class DataSource(StoppableThread, ABC):

    def __init__(self, data_parser):
        StoppableThread.__init__(self)
        self.setName("DataSource")
        ABC.__init__(self)
        self._data_parser = data_parser

    def run(self):
        while not self.stopped():
            record = self._fetch_data()
            self._data_parser.append_to_job_queue(record)

    @abstractmethod
    def _fetch_data(self):
        """implement logic here for obtaining a data piece
            should return the fetched data"""

Một triển khai cho một bộ dữ liệu:

class CSVDataSource(DataSource):
    def __init__(self, data_parser, file_path):
        DataSource.__init__(self, data_parser)
        self.file_path = file_path
        self.csv_data = Queue()
        print('loading csv')
        self.load_csv()
        print('done loading csv')

    def load_csv(self):
        """Loops through csv and adds data to a queue"""
        with open(self.file_path, 'r') as f:

            self.reader = reader(f)
            next(self.reader, None)  # skip header
            for row in self.reader:
                self.csv_data.put(row)

    def _fetch_data(self):
        """Returns next item of the queue"""
        item = self.csv_data.get()
        self.csv_data.task_done()
        print(self.csv_data.qsize())
        return item

Giả sử có một trường hợp

class Queueable():
    def __init__(self):
        self._queue = Queue()

    def append_to_job_queue(self, job):
        self._queue.put(job)
6 được gọi là
class Queueable():
    def __init__(self):
        self._queue = Queue()

    def append_to_job_queue(self, job):
        self._queue.put(job)
7, nếu tôi muốn dừng chuỗi tôi gọi:

ds.stop()
ds.join()

Cuộc gọi

class Queueable():
    def __init__(self):
        self._queue = Queue()

    def append_to_job_queue(self, job):
        self._queue.put(job)
8 Tuy nhiên, không bao giờ trả lại. Tôi không chắc tại sao điều này là, bởi vì phương thức
class Queueable():
    def __init__(self):
        self._queue = Queue()

    def append_to_job_queue(self, job):
        self._queue.put(job)
9 không kiểm tra xem sự kiện dừng có được đặt không.

Bất kỳ ý tưởng?

Cập nhật

Rõ ràng hơn một chút theo yêu cầu: Các ứng dụng được xây dựng từ một số luồng. Chủ đề RealStrargety (bên dưới) là chủ sở hữu của tất cả các chủ đề khác và chịu trách nhiệm bắt đầu và chấm dứt chúng. Tôi đã không đặt cờ daemon cho bất kỳ luồng nào, vì vậy chúng nên không theo mặc định.

Chủ đề chính trông như thế này:

if __name__ == '__main__':
    def exit_handler(signal, frame):
        rs.stop_engine()
        rs.join()
        sys.exit(0)

    signal.signal(signal.SIGINT, exit_handler)



    rs = RealStrategy()
    rs.run_engine()

Và đây là các phương pháp

class StoppableQueueThread(StoppableThread, Queueable):
    def __init__(self):
        StoppableThread.__init__(self)
        Queueable.__init__(self)

    def stop(self):
        super(StoppableQueueThread, self).stop()
        self._queue.join()
0 và
class StoppableQueueThread(StoppableThread, Queueable):
    def __init__(self):
        StoppableThread.__init__(self)
        Queueable.__init__(self)

    def stop(self):
        super(StoppableQueueThread, self).stop()
        self._queue.join()
1 được gọi trong chính:main:

class RealStrategy(Thread):
.....
.....
    def run_engine(self):
        self.on_start()
        self._order_handler.start()
        self._data_parser.start()
        self._data_source.start()
        self.start()

    def stop_engine(self):
        self._data_source.stop()
        self._data_parser.stop()
        self._order_handler.stop()

        self._data_source.join()
        self._data_parser.join()
        self._order_handler.join()

        self.stop()

Bạn dùng máy tính hàng ngày, mở hàng chục trang web khác nhau, cùng một cơ số đếm không xuể các ứng dụng nghe nhạc, xem phim, game ở ngoài, bạn có tự hỏi vì sao máy tính có thể cân hết chừng đấy việc một lúc không? Dường như các chương trình đều phản ứng chỉ trong tích tắc, và đang chạy đồng thời cùng nhau. Nhưng thực tế ảo diệu hơn thế nhiều, hóa ra, trong một đơn vị thời gian (nanosecond), chỉ có một chương trình (process) được chạy. Và trong chương trình đó, lại chia ra thành nhiều luồn (thread) con, thực thi cùng một lúc (multithread, ít nhất là trong thời điểm hiện tại), tạo cho người dùng cảm giác chương trình đang chạy nhanh hơn. Nhờ khả năng xử lí các task có thể coi như đồng thời (concurrency), chương trình có thể đáp ứng tốt với người dùng trong khi đang bận làm việc khác. Và đó là chính ý tưởng cơ bản của multithread.

Nội dung chính

  • 1.Luồng (thread) là gì ? Sự khác nhau giữa thread và process
  • 2. Đa luồng (Multithreading) là gì:
  • 3. Đồng bộ hóa các Thread trong Python
  • 4. Ứng dụng đa luồng

1.Luồng (thread) là gì ? Sự khác nhau giữa thread và process

  • 2. Đa luồng (Multithreading) là gì:
  • 3. Đồng bộ hóa các Thread trong Python
  • 4. Ứng dụng đa luồng
  • Nói về cấu trúc máy tính : Thread là một đơn vị cơ bản trong CPU. Một luồng sẽ chia sẻ với các luồng khác trong cùng process về thông tin data, các dữ liệu của mình. Việc tạo ra thread giúp cho các chương trình có thể chạy được nhiều công việc cùng một lúc
  • Process là quá trình hoạt động của một ứng dụng. Tiến trình (process)chứa đựng thông tin tài nguyên, trạng thái thực hiện của chương trình

Thread là một bước điều hành bên trong một process. Luồng (thread) là một khối các câu lệnh (instructions) độc lập trong một tiến trình và có thể được lập lịch bởi hệ điều hành. Hay nói một cách đơn giản, Thread là các hàm hay thủ tục chạy độc lập đối với chương trình chính. Một process dĩ nhiên có thể chứa nhiều thread bên trong nó. Điểm quan trọng nhất cần chú ý là một thread có thể làm bất cứ nhiệm vụ gì một process có thể làm.

2. Đa luồng (Multithreading) là gì:

3. Đồng bộ hóa các Thread trong Python

4. Ứng dụng đa luồng

Nói về cấu trúc máy tính : Thread là một đơn vị cơ bản trong CPU. Một luồng sẽ chia sẻ với các luồng khác trong cùng process về thông tin data, các dữ liệu của mình. Việc tạo ra thread giúp cho các chương trình có thể chạy được nhiều công việc cùng một lúc

import time

def cal_square(numbers):
	print("calculate square number")
	for n in numbers:
		time.sleep(0.2)
		print ('square:', n*n)

def cal_cube(numbers):
	print("calculate cube number")
	for n in numbers:
		time.sleep(0.2)
		print ('square:', n*n*n)

arr = [2,3,7,9]
t = time.time()
cal_square(arr)
cal_cube(arr)
print ("done in ", time.time()- t)

Process là quá trình hoạt động của một ứng dụng. Tiến trình (process)chứa đựng thông tin tài nguyên, trạng thái thực hiện của chương trình

from threading import Thread
import threading
import time


def cal_square(numbers):
	print("calculate square number")
	for n in numbers:
		time.sleep(0.2)
		print ('square:', n*n)


def cal_cube(numbers):
	print("calculate cube number \n")
	for n in numbers:
		time.sleep(0.2)
		print ('cube:', n*n*n)

arr = [2,3,7,9]

try:
	t = time.time()
	t1 = threading.Thread(target=cal_square, args=(arr,))
	t2 = threading.Thread(target=cal_cube, args=(arr,))
	t1.start()
	t2.start()
	t1.join()
	t2.join()
	print ("done in ", time.time()- t)
except:
	print ("error")

Thread là một bước điều hành bên trong một process. Luồng (thread) là một khối các câu lệnh (instructions) độc lập trong một tiến trình và có thể được lập lịch bởi hệ điều hành. Hay nói một cách đơn giản, Thread là các hàm hay thủ tục chạy độc lập đối với chương trình chính. Một process dĩ nhiên có thể chứa nhiều thread bên trong nó. Điểm quan trọng nhất cần chú ý là một thread có thể làm bất cứ nhiệm vụ gì một process có thể làm.

class Queueable():
    def __init__(self):
        self._queue = Queue()

    def append_to_job_queue(self, job):
        self._queue.put(job)
0

Một điểm khác biệt nữa đó là nhiều thread nằm trong cùng một process dùng một không gian bộ nhớ giống nhau, trong khi process thì không. Điều này cho phép các thread đọc và viết cùng một kiểu cấu trúc và dữ liệu, giao tiếp dễ dàng giữa các thread với nhau. Giao thức giữa các process, hay còn gọi là IPC (inter-process communication) thì tương đối phức tạp bởi các dữ liệu có tính tập trung sâu hơn. Ngoài các tài nguyên riêng của mình (các biến cục bộ trong hàm), các luồng chia sẻ tài nguyên chung của tiến trình. Việc thay đổi tài nguyên chung (ví dụ, đóng file, gán giá trị mới cho biến) từ một thread sẽ được nhìn thấy bởi tất cả các thread khác. Vì vậy, lập trình viên cần phải thực hiện đồng bộ việc truy cập tài nguyên chung giữa các luồng.

Hình bên dưới minh họa sự khác nhau giữa luồng và tiến trình. threading Module thì nó có nhiều điểm hạn chế. Phần tiếp theo giới thiệu về threading Module.

Đây là nhưng kiến thức trung xuất phát từ máy tính nói chung, đến các ngôn ngữ lập trình nói riêng thì những khái niệm này cũng tương tự như vậy

  • Một chương trình đa luồng chứa hai hoặc nhiều phần mà có thể chạy đồng thời và mỗi phần có thể xử lý tác vụ khác nhau tại cùng một thời điểm, để sử dụng tốt nhất các nguồn có sẵn, đặc biệt khi máy tính của bạn có nhiều CPU. Trả về số đối tượng thread mà là active.
  • Python cung cấp thread Module và threading Module để bạn có thể bắt đầu một thread mới cũng như một số tác vụ khác trong khi lập trình đa luồng. Mỗi một Thread đều có vòng đời chung là bắt đầu, chạy và kết thúc. Một Thread có thể bị ngắt (interrupt), hoặc tạm thời bị dừng (sleeping) trong khi các Thread khác đang chạy – được gọi là yielding. Trả về số đối tượng thread trong Thread control của Caller.
  • Một ví dụ đơn giản chỉ sử dụng một thread, có truyền tham số. Để chạy thread, ta dùng method start(). Target sẽ là function myThread, là nhiệm vụ mà thread phải hoàn thàDư. Đây là một chương trình process chạy bình thường: Trả về một danh sách tất cả đối tượng thread mà hiện tại là active.

Còn đây là chương trình chạy đa luồng

  • có thể thấy các luồng chạy đồng thời song song với nhau, không cần chạy lần lượt tuần tự như process nữa , với luồng 1 chạy in ra với độ delay là 2s, và với luồng 2 là 3s. Nếu chạy process thì tài nguyên có thể khác nhau,cấu trúc khác nhau, kết quả khác nhau và hoạt động tuần tự , còn đa luồng thi các thread có thể cấu trúc giống giau , tài nguyên dùng ít hơn. Là entry point cho một Thread.
  • kết quả Bắt đầu một thread bởi gọi phương thức run().
  • Mặc dù thread Module rất hiệu quả với đa luồng tầm thấp nhưng khi so sánh với threading Module thì nó có nhiều điểm hạn chế. Phần tiếp theo giới thiệu về threading Module. Đợi cho các thread kết thúc.
  • Module Threading cung cấp nhiều hỗ trợ mạnh mẽ và cấp độ cao hơn cho các Thread trong khi so sánh với thread Module ở trên. Ngoài các phương thức có trong thread Module, thì threading Module còn bổ sung thêm một số phương thức khác, đó là: Kiểm tra xem một thread có đang thực thi hay không.
  • threading.activeCount(): Trả về số đối tượng thread mà là active. Trả về tên của một thread.
  • threading.currentThread(): Trả về số đối tượng thread trong Thread control của Caller. Thiết lập tên của một thread.

threading.enumerate(): Trả về một danh sách tất cả đối tượng thread mà hiện tại là active.

class Queueable():
    def __init__(self):
        self._queue = Queue()

    def append_to_job_queue(self, job):
        self._queue.put(job)
1

3. Đồng bộ hóa các Thread trong Python

Trong lập trình đa luồng, các threads chia sẻ chung tài nguyên của tiến trình, vì vậy có những thời điểm nhiều luồng sẽ đồng thời thay đổi dữ liệu chung. Do đó, ta cần những cơ chể để đảm bảo rằng, tại một thời điểm chỉ có duy nhất một luồng được phép truy cập vào dữ liệu chung, nếu các luồng khác muốn truy cập vào đoạn dữ liệu này thì cần phải đợi cho thread trước đó hoàn thành công việc của mình.

Python cung cấp threading Module, mà bao gồm một kỹ thuật locking cho phép bạn đồng bộ hóa các Thread một cách dễ dàng. Một lock mới được tạo bởi gọi phương thức Lock().

Phương thức acquire(blocking) của đối tượng lock mới này được sử dụng để ép các Thread chạy một cách đồng bộ. Tham số blocking tùy ý cho bạn khả năng điều khiển để xem một Thread có cần đợi để đạt được lock hay không.

Nếu tham số blocking được thiết lập là 0, tức là Thread ngay lập tức trả về một giá trị 0 nếu không thu được lock và trả về giá trị 1 nếu thu được lock. Nếu blocking được thiết lập là 1, thì Thread cần đợi cho đến khi lock được giải phóng.

Phương thức release() của đối tượng lock được sử dụng để giải phóng lock khi nó không cần nữa.

4. Ứng dụng đa luồng

Đa luồng có rất công dụng vô cùng hữu ích thích hợp cho những tác vụ chạy ngầm không cần quan tâm chính xác thời gian hoàn thành, nghe có vẻ giống cronjob nhỉ =)) nhưng đặc điểm lớn nhất của nó vẫn là chạy song song nhiều luồng cùng 1 lúc , có thể kể đến tác dụng hữu hiệu nhất trong các ứng dụng web ,và dự án của bản thân là ghi log. Ví dụ dự án của mình có tác vụ gồm nhiều ưu đãi , người dùng muốn lưu ưu đãi đó cho dùng lần sau thì cần ghi log 2 sự kiện gồm view log và save log

class Queueable():
    def __init__(self):
        self._queue = Queue()

    def append_to_job_queue(self, job):
        self._queue.put(job)
2

với

class Queueable():
    def __init__(self):
        self._queue = Queue()

    def append_to_job_queue(self, job):
        self._queue.put(job)
3

LogUserActionThearding chính là hàm khởi tạo thread. Nào cùng chạy => run()

class Queueable():
    def __init__(self):
        self._queue = Queue()

    def append_to_job_queue(self, job):
        self._queue.put(job)
4