Hàng đợi đa xử lý Python

Một thẻ đã tồn tại với tên chi nhánh được cung cấp. Nhiều lệnh Git chấp nhận cả tên thẻ và tên nhánh, vì vậy việc tạo nhánh này có thể gây ra hành vi không mong muốn. Bạn có chắc chắn muốn tạo nhánh này không?

nhập đa xử lý

thời gian nhập khẩu

# Nhà sản xuất / Nhà văn

def procFunction0[messageQueue]

cho tôi trong phạm vi [1,10]

Hàng đợi tin nhắn. put["Con1. Tin nhắn %d"%i]

thời gian. ngủ[1]

# Người tiêu dùng/Bạn đọc

def procFunction1[messageQueue]

trong khi messageQueue. trống[] là Sai

print["Từ người đọc. %s"%messageQueue. được[]]

thời gian. ngủ[1]

# Nhà sản xuất / Nhà văn

def procFunction2[messageQueue]

cho tôi trong phạm vi [1,10]

Hàng đợi tin nhắn. đặt ["Con3. Tin nhắn %d"%i]

thời gian. ngủ[1]

nếu __name__ == "__main__"

đa xử lý. set_start_method["ngã ba"]

messageQueue  = đa xử lý. Xếp hàng[]

# Tạo tiến trình con

childProcess0 = đa xử lý. Quá trình [đích=procFunction0, args=[messageQueue,]]

childProcess1 = đa xử lý. Quá trình [đích=procFunction1, args=[messageQueue,]]

childProcess2 = đa xử lý. Quá trình [đích=procFunction2, args=[messageQueue,]]

# Bắt đầu tất cả các tiến trình con - Writer, Reader, Writer

quy trình con0. bắt đầu[]

quy trình con1. bắt đầu[]

quy trình con2. bắt đầu[]

# Đợi các tiến trình con kết thúc

quy trình con0. tham gia[]

quy trình con1. tham gia[]

quy trình con2. tham gia[]

“Một số người, khi đối mặt với một vấn đề, nghĩ rằng 'Tôi biết, tôi sẽ sử dụng đa luồng'. Nothhw tpe yawrve o oblems. ” [Eiríkr Åsheim, 2012]

Tuy nhiên, nếu đa luồng là một vấn đề, thì làm cách nào để tận dụng các hệ thống có 8, 16, 32 và thậm chí hàng nghìn CPU riêng biệt?

Một giải pháp tuyệt vời là sử dụng đa xử lý, thay vì đa luồng, trong đó công việc được chia thành các quy trình riêng biệt, cho phép hệ điều hành quản lý quyền truy cập vào các tài nguyên được chia sẻ. Điều này cũng xoay quanh một trong những gót chân Achilles khét tiếng trong Python. Khóa phiên dịch viên toàn cầu [còn gọi là theGIL]. Khóa này hạn chế tất cả mã Python chỉ chạy trên một bộ xử lý tại một thời điểm để các ứng dụng đa luồng Python phần lớn chỉ hữu ích khi có nhiều thời gian chờ IO. Nếu ứng dụng của bạn bị ràng buộc I/O và không yêu cầu nhiều khối thời gian CPU, thì kể từ Python phiên bản 3. 4, hệ thống asyncio là phương pháp ưa thích

Python cung cấp mô-đun đa xử lý cung cấp một số hàm và lớp hữu ích để quản lý các quy trình con và giao tiếp giữa chúng. Một giao diện mà mô-đun cung cấp là quy trình làm việc của

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
1 và
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
2, cho phép một người lấy một tập dữ liệu lớn có thể được chia thành các phần sau đó được ánh xạ tới một chức năng duy nhất. Điều này cực kỳ đơn giản và hiệu quả để thực hiện chuyển đổi, phân tích hoặc các tình huống khác khi cùng một thao tác sẽ được áp dụng cho từng dữ liệu. Nhưng
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
1 và
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
2 không phù hợp với các tình huống cần duy trì trạng thái theo thời gian hoặc đặc biệt là các tình huống có hai hoặc nhiều hoạt động khác nhau cần chạy và tương tác với nhau theo một cách nào đó. Đối với các loại vấn đề này, người ta cần sử dụng các tính năng phức tạp hơn một chút của mô-đun đa xử lý, chẳng hạn như
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
5,
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
6 và
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
7. Việc sử dụng các tính năng này gây ra một số vấn đề phức tạp hiện cần được quản lý, đặc biệt là liên quan đến việc bắt đầu và dừng các quy trình con một cách rõ ràng, cũng như phối hợp giữa chúng

Thêm ngữ cảnh

Vì đa xử lý Python là tốt nhất cho các vấn đề phức tạp, nên chúng tôi sẽ thảo luận về các mẹo này bằng cách sử dụng một ví dụ phác thảo mô phỏng thiết bị giám sát IoT. Ví dụ này dựa trên việc triển khai hệ thống HVAC mà tôi đã thực hiện vào năm 2018

Ứng dụng này bao gồm một “Quy trình chính” - quản lý việc khởi tạo, tắt máy và xử lý vòng lặp sự kiện - và bốn quy trình con

  • “Quy trình con quan sát” chạy cứ sau 10 giây và thu thập dữ liệu về các hệ thống HVAC đang được giám sát, xếp hàng thông báo sự kiện “Quan sát” vào Hàng đợi sự kiện. Trong ví dụ này, giả định rằng “Quy trình con quan sát” sử dụng nhiều CPU vì nó thực hiện một lượng lớn xử lý dữ liệu trên quan sát trước khi gửi nó đến máy chủ
  • “Quy trình con trạng thái” cũng chạy 10 giây một lần và thu thập dữ liệu về hệ thống IoT, bao gồm những thứ như thời gian hoạt động, trạng thái và cài đặt mạng cũng như mức sử dụng bộ nhớ. Nó cũng xếp hàng một thông báo sự kiện “Trạng thái” vào Hàng đợi sự kiện
  • “Gửi quy trình con” chấp nhận các thông báo sự kiện “Gửi” từ Hàng đợi gửi, sau đó gửi các thông báo đó qua mạng đến một máy chủ trung tâm sẽ lưu trữ, phân tích, trình bày và tự động hành động dựa trên dữ liệu mà nó nhận được,
  • “Listen Sub process” lắng nghe trên một cổng mạng đối với các yêu cầu đến từ máy chủ chỉ huy trung tâm, xếp hàng đợi thông báo sự kiện “Command” vào Hàng đợi sự kiện, sau đó đợi thông báo “Reply” từ Main Process. Các lệnh như vậy chủ yếu tập trung vào việc quản lý hệ thống HVAC và thu thập dữ liệu vận hành hệ thống IoT chi tiết. Lưu ý rằng, trong thế giới thực, giao diện này phức tạp hơn nhiều và dựa trên lớp bảo mật
  • “Quy trình chính”, bên cạnh việc quản lý khởi động và tắt toàn bộ hệ thống, còn quản lý Hàng đợi sự kiện, định tuyến các thông báo đến quy trình con thích hợp. Các thông báo "Trạng thái" và "Quan sát" dẫn đến một thông báo "Gửi" đến Hàng đợi Gửi và các sự kiện "Lệnh" được xử lý và sau đó một thông báo "Trả lời" được đưa vào hàng đợi Trả lời

Lưu ý về ví dụ mã. Những ví dụ này sử dụng mptools{. } Thư viện Python mà tôi đã phát triển khi viết bài đăng trên blog này. Đây là ví dụ duy nhất mà giao diện thư viện được tham chiếu trực tiếp

def main[]:
    with MainContext[] as main_ctx:
        init_signals[main_ctx.shutdown_event, default_signal_handler, default_signal_handler]

        send_q = main_ctx.MPQueue[]
        reply_q = main_ctx.MPQueue[]

        main_ctx.Proc["SEND", SendWorker, send_q]
        main_ctx.Proc["LISTEN", ListenWorker, reply_q]
        main_ctx.Proc["STATUS", StatusWorker]
        main_ctx.Proc["OBSERVATION", ObservationWorker]

        while not main_ctx.shutdown_event.is_set[]:
            event = main_ctx.event_queue.safe_get[]
            if not event:
                continue
            elif event.msg_type == "STATUS":
                send_q.put[event]
            elif event.msg_type == "OBSERVATION":
                send_q.put[event]
            elif event.msg_type == "ERROR":
                send_q.put[event]
            elif event.msg_type == "REQUEST":
                request_handler[event, reply_q, main_ctx]
            elif event.msg_type == "FATAL":
                main_ctx.log[logging.INFO, f"Fatal Event received: {event.msg}"]
                break
            elif event.msg_type == "END":
               main_ctx.log[logging.INFO, f"Shutdown Event received: {event.msg}"]
               break
           else:
               main_ctx.log[logging.ERROR, f"Unknown Event: {event}"]

Mẹo số 1

Không chia sẻ tài nguyên, chuyển tin nhắn

Thoạt nghĩ, có vẻ như là một ý tưởng hay khi có một số loại cấu trúc dữ liệu dùng chung sẽ được bảo vệ bằng khóa. Khi chỉ có một cấu trúc được chia sẻ, bạn có thể dễ dàng gặp sự cố với việc chặn và tranh chấp. Tuy nhiên, khi các cấu trúc như vậy sinh sôi nảy nở, độ phức tạp và các tương tác không mong muốn sẽ nhân lên, có khả năng dẫn đến bế tắc và rất có khả năng dẫn đến mã khó bảo trì và kiểm tra. Tùy chọn tốt hơn là chuyển tin nhắn bằng cách sử dụng đối tượng

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
8. Hàng đợi nên được sử dụng để chuyển tất cả dữ liệu giữa các quy trình con. Điều này dẫn đến các thiết kế “chia nhỏ” dữ liệu thành các thông báo được chuyển và xử lý, để các quy trình con có thể được tách biệt hơn và định hướng theo chức năng/nhiệm vụ. Lớp Python
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
6 được triển khai trên các hệ thống giống như unix dưới dạng PIPE - nơi dữ liệu được gửi đến hàng đợi được tuần tự hóa bằng cách sử dụng mô-đun
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
00 của thư viện chuẩn Python. Hàng đợi thường được khởi tạo bởi quy trình chính và được chuyển đến quy trình con như một phần của quá trình khởi tạo

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]

Mẹo số 2

Luôn dọn dẹp sau khi chính mình

Các quy trình con có thể bị treo hoặc không tắt hoàn toàn, có khả năng khiến một số tài nguyên hệ thống không khả dụng và tệ hơn có thể khiến một số thông báo chưa được xử lý. Vì lý do này, một tỷ lệ đáng kể mã của một người cần được dành cho việc dừng các quy trình con một cách rõ ràng

Phần đầu tiên của vấn đề này là yêu cầu các quy trình con dừng lại. Vì chúng tôi đang sử dụng Hàng đợi và tin nhắn, nên trường hợp đầu tiên và phổ biến nhất là sử dụng tin nhắn “KẾT THÚC”. Khi đến lúc dừng ứng dụng, một người xếp hàng các thông báo “END” vào mỗi hàng đợi trong hệ thống, bằng số lượng quy trình con đọc từ Hàng đợi đó. Mỗi quy trình con sẽ lặp lại các thông báo từ hàng đợi và sau khi nhận được thông báo “KẾT THÚC”, nó sẽ thoát ra khỏi vòng lặp và tự tắt hoàn toàn

Thông báo “KẾT THÚC” rất hữu ích, nhưng chúng không phải là phương pháp duy nhất khả dụng và chúng không hữu ích nếu quy trình con không đọc từ hàng đợi. Tôi khuyên bạn nên sử dụng đối tượng

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
01. Một đối tượng
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
7 là một cờ
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
03/
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
04, được khởi tạo là
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
04, có thể được đặt thành
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
03 một cách an toàn trong môi trường nhiều quy trình trong khi các quy trình khác có thể kiểm tra nó bằng
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
07 và đợi nó thay đổi thành
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
03. Thực tiễn tốt nhất là chỉ có một đối tượng Sự kiện "yêu cầu tắt máy" trong một ứng dụng được chuyển cho tất cả các quy trình con. Sau đó, các quy trình con sẽ lặp lại bằng cách sử dụng
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
7 đó làm kiểm tra boolean của chúng - để nếu
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
10
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
7 được đặt thành
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
03, vòng lặp sẽ kết thúc

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
0

Khi một quy trình con cần kết thúc - có thể là thông qua thông báo “END”, cờ Sự kiện

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
13 hoặc một ngoại lệ nào đó - nhiệm vụ của quy trình con là tự dọn dẹp sau đó bằng cách giải phóng mọi tài nguyên mà nó sở hữu. Python thực hiện khá tốt việc dọn dẹp mọi thứ trong quá trình thu gom rác, nhưng một số tài nguyên cần phải được đóng sạch [đường ống, tệp] và một số tài nguyên sẽ bị treo trong một khoảng thời gian chờ không xác định, do đó ngăn quá trình thoát sạch. Tài nguyên mạng không chỉ có thể liên kết tài nguyên cục bộ mà còn có thể liên kết tài nguyên trên hệ thống máy chủ từ xa trong khi chờ hết thời gian chờ. Vì vậy, dọn dẹp lần cuối là rất quan trọng

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
1

Các quy trình con không chỉ cần tự dọn dẹp mà quy trình chính còn cần dọn dẹp các quy trình con, đối tượng

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
6 và các tài nguyên khác mà nó có thể có quyền kiểm soát. Dọn dẹp các quy trình con liên quan đến việc đảm bảo rằng mỗi quy trình con nhận được bất kỳ thông báo chấm dứt nào mà nó có thể cần và rằng các quy trình con đó thực sự bị chấm dứt. Mặt khác, việc dừng tiến trình chính có thể dẫn đến treo hoặc tiến trình con zombie mồ côi. Quy trình thông thường liên quan đến việc đặt cờ tắt máy, đợi tất cả các quy trình dừng bình thường trong một khoảng thời gian hợp lý, sau đó chấm dứt bất kỳ quy trình nào chưa dừng

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
8

Các đối tượng Queue của Python cũng cần một chút xử lý đặc biệt để được dọn sạch hoàn toàn. chúng cần được loại bỏ bất kỳ mục nào còn sót lại ở đó [và những mục đó có thể hoặc không cần được xử lý bằng cách nào đó - có thể bằng cách lưu chúng vào đĩa], đóng cửa và quan trọng là phải gọi

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
15 để giám sát liên quan

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
0

Mẹo số 3

Luôn xử lý các tín hiệu TERM và INT

Chúng ta đã thảo luận về cách đảm bảo kết thúc các quy trình con, nhưng làm cách nào để xác định thời điểm kết thúc chúng? . Ngoài ra, đặc biệt là trong quá trình thử nghiệm, người ta thường thấy mình sử dụng tín hiệu INT [hay còn gọi là

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
16] để dừng thử nghiệm chạy trốn. Thông thường, người ta mong muốn hành vi tương tự từ các tín hiệu TERM và INT, mặc dù INT cũng có thể luôn muốn tạo dấu vết ngăn xếp để người dùng có thể thấy chính xác hơn những gì họ đã ngắt

Cách tiếp cận mà tôi khuyên dùng là yêu cầu các trình xử lý tín hiệu đặt cờ Sự kiện

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
13 trong hai lần đầu tiên chúng được gọi và sau đó đưa ra một ngoại lệ mỗi lần chúng được gọi sau đó. Điều này cho phép một người nhấn control-C hai lần và chắc chắn dừng mã đang chờ đợi hoặc trong một vòng lặp, đồng thời cho phép xử lý tắt máy "bình thường" để dọn sạch đúng cách. Ví dụ dưới đây sử dụng chức năng xử lý tín hiệu phổ biến, sử dụng funcools. một phần để tạo hai chức năng, chỉ khác nhau ở chỗ chúng sẽ tăng ngoại lệ nào, được thông qua dưới dạng trình xử lý tín hiệu. Một chi tiết quan trọng là các tín hiệu cần được thiết lập riêng cho từng quy trình con. Các hệ thống Linux/Unix tự động truyền tín hiệu tới tất cả các quy trình con, vì vậy các quy trình con đó cũng cần nắm bắt và xử lý các tín hiệu đó. Một lợi thế của điều này là các trình xử lý tín hiệu của quy trình con có khả năng hoạt động trên các tài nguyên dành riêng cho quy trình con đó. Ví dụ: tôi đã viết một trình xử lý tín hiệu đã thay đổi ổ cắm chặn ZeroMQ thành ổ cắm không chặn. Điều này cho phép tôi viết mã cho cuộc gọi
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
18 không có thời gian chờ và không thực sự cần thời gian chờ

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
4

Mẹo số 4

Đừng Đợi Mãi

Hành vi tắt máy thích hợp yêu cầu mọi quy trình trong hệ thống phải có khả năng phục hồi để không bị “mắc kẹt”. Điều này có nghĩa là không chỉ các vòng lặp có điều kiện kết thúc mà các cuộc gọi hệ thống khác có thể chặn và chờ sẽ cần sử dụng thời gian chờ nếu có thể. Các đối tượng

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
6 cho phép hết thời gian chờ khi thực hiện cả lệnh gọi
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
80 và
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
18, ổ cắm có thể được định cấu hình để hết thời gian chờ, v.v. Điều này cuối cùng trở thành một hình thức bỏ phiếu trong đó có 2 sự kiện đang được kiểm tra. cuộc gọi hệ thống và sự kiện kết thúc [i. e.
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
13 là
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
03]. Bạn sẽ cần xác định xem mình có thể đợi bao lâu trong cuộc gọi hệ thống trước khi kiểm tra xem vòng lặp có cần chấm dứt hay không. Mục đích là để kiểm tra việc chấm dứt đủ thường xuyên để hệ thống sẽ phản hồi kịp thời yêu cầu chấm dứt/tắt máy, trong khi dành phần lớn thời gian của quy trình để chờ đợi tài nguyên [hàng đợi, sự kiện, ổ cắm, v.v.]. Điều quan trọng là không phải đợi quá lâu vì

Dưới đây là một số ví dụ về chờ đợi

Bỏ phiếu chống lại hàng đợi.

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
18 từ
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
6 với khối được đặt thành
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
03 và thời gian chờ ngắn. Nếu
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
87 được nâng lên, hãy quay lại đầu vòng lặp và thử lại, nếu không, hãy xử lý mục được trả lại

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
4

Bỏ phiếu chống lại ổ cắm. Gọi

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
88 trên ổ cắm với thời gian chờ ngắn. Nếu
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
89 được nâng lên, hãy quay lại đầu vòng lặp, kiểm tra
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
13 và thử lại, nếu không, xử lý xử lý kết nối máy khách được chấp nhận [cũng sẽ cần phải gọi
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
88 trên đó, để hoạt động của nó không bị treo

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
9

Thăm dò ý kiến ​​trong khi chờ hẹn giờ. Vòng lặp miễn là

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
13 không được đặt, kích hoạt bộ đếm thời gian cứ sau 303 giây. Mỗi lần đi qua vòng lặp sẽ ngủ trong thời gian còn lại cho đến khi
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
04, tối đa là
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
05 [0. 02] giây [tất nhiên, có nghĩa là nó thường ngủ 0. 02 giây]. Nếu mã xuất hiện từ
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
06 trước
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
04, hãy quay lại đầu vòng lặp và thử lại, nếu không thì hãy làm gì đó [trong trường hợp này, hãy đặt “HẸN GIỜ SỰ KIỆN” trên
event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
08] và tính lại thời gian cho lần tiếp theo

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
7

Mẹo số 5

Báo cáo các trường hợp ngoại lệ và có hệ thống ghi nhật ký được chia sẻ dựa trên thời gian

Việc đăng nhập vào một ứng dụng là cực kỳ quan trọng và thậm chí còn quan trọng hơn thế trong một ứng dụng đa xử lý, trong đó nhật ký kết hợp tỏa sáng khi báo cáo các sự kiện theo thứ tự dựa trên thời gian. Đáng mừng là Python cung cấp các phương tiện ghi nhật ký tốt. Đáng buồn thay, Python không thực sự cung cấp một cách tuyệt vời để đồng bộ hóa thông điệp nhật ký quy trình con. Vì có rất nhiều phần chuyển động nên mỗi thông báo nhật ký cần 2 phần dữ liệu chính. quá trình nào đang tạo thông báo tường trình và đã bao lâu kể từ khi ứng dụng bắt đầu. Tôi thường đặt tên cho các quy trình của mình. Nếu có nhiều bản sao của cùng một quy trình, thì chúng sẽ có tên là “WORKER-1”, “WORKER-2”, v.v.

Bên cạnh việc ghi nhật ký, mỗi quy trình con có thể gửi thông báo Lỗi và Tắt máy đến hàng đợi sự kiện chính. Cho phép trình xử lý sự kiện nhận biết và xử lý các sự kiện không mong muốn, chẳng hạn như thử gửi lại không thành công hoặc bắt đầu một quy trình con mới sau khi một quy trình không thành công

Dưới đây là đầu ra nhật ký từ một lần chạy mẫu. Lưu ý cách thời gian dựa trên thời gian bắt đầu ứng dụng cung cấp một bức tranh rõ ràng hơn về những gì đang diễn ra trong tất cả quá trình khởi động quan trọng

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
09

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
0

Phần kết luận

Mô-đun

event_q = multiprocessing.Queue[]
send_q = multiprocessing.Queue[]
# ...
event_q.put[FOO]

# … in another subprocess ...
event = event_q.get[block=True, timeout=timeout]

# …
queue.close[]
queue.join_thread[]
40 của Python cho phép bạn tận dụng sức mạnh CPU có sẵn trên các hệ thống hiện đại, nhưng việc viết và duy trì các ứng dụng đa xử lý mạnh mẽ đòi hỏi phải tránh một số mẫu nhất định có thể dẫn đến những khó khăn không mong muốn, đồng thời dành nhiều thời gian và năng lượng để tập trung vào các chi tiết không . Vì những lý do này, việc quyết định sử dụng tính năng đa xử lý trong một ứng dụng không phải là điều gì đó dễ dàng, nhưng khi bạn làm như vậy, những mẹo này sẽ giúp công việc của bạn diễn ra suôn sẻ hơn và sẽ cho phép bạn tập trung vào các vấn đề cốt lõi của mình

Hàng đợi đa xử lý trong Python là gì?

Lớp hàng đợi đa xử lý của Python . Chúng có thể lưu trữ bất kỳ đối tượng pickle Python nào [mặc dù đối tượng đơn giản là tốt nhất] và cực kỳ hữu ích để chia sẻ dữ liệu giữa các quy trình. Python Multiprocessing modules provides Queue class that is exactly a First-In-First-Out data structure. They can store any pickle Python object [though simple ones are best] and are extremely useful for sharing data between processes.

Hàng đợi trong đa xử lý là gì?

Python cung cấp hàng đợi an toàn cho quy trình trong đa xử lý. lớp xếp hàng. Hàng đợi là cấu trúc dữ liệu mà các mục có thể được thêm vào bằng lệnh gọi put[] và từ đó các mục có thể được truy xuất bằng lệnh gọi get[]. The multiprocessing.

Là chuỗi hàng đợi đa xử lý

Điều này bao gồm hàng đợi trong đa xử lý. Hàng đợi là luồng và xử lý an toàn . Điều này có nghĩa là các quy trình có thể nhận [] và đặt [] các mục từ và vào hàng đợi đồng thời mà không sợ điều kiện chạy đua.

Hàng đợi Python có chậm không?

hàng đợi đặt và nhận phần tử để truyền dữ liệu giữa các quy trình python quá chậm . Nhưng nếu bạn đặt hoặc lấy một danh sách có các phần tử hoạt động tương tự như đặt hoặc lấy một phần tử; .

Chủ Đề