Làm cách nào để chia sẻ dữ liệu giữa hai quy trình trong python?

Tại Repustate, nhiều mô hình dữ liệu mà chúng tôi sử dụng trong Phân tích văn bản có thể được biểu diễn dưới dạng các cặp khóa-giá trị đơn giản hoặc từ điển trong biệt ngữ Python. Trong trường hợp cụ thể của chúng tôi, các từ điển của chúng tôi rất lớn, vài trăm MB mỗi từ điển và chúng cần được truy cập liên tục. Trên thực tế, đối với một yêu cầu HTTP nhất định, có thể truy cập 4 hoặc 5 mô hình, mỗi mô hình thực hiện 20-30 lần tra cứu. Vì vậy, vấn đề chúng tôi gặp phải là làm thế nào để chúng tôi giữ mọi thứ nhanh chóng cho máy khách cũng như nhẹ nhất có thể cho máy chủ. Chúng tôi cũng phân phối phần mềm của mình dưới dạng máy ảo cho một số khách hàng nên việc sử dụng bộ nhớ phải nhẹ vì chúng tôi không thể kiểm soát lượng bộ nhớ mà khách hàng sẽ phân bổ cho máy ảo mà họ triển khai

Tóm lại, đây là danh sách kiểm tra các yêu cầu của chúng tôi

  1. Dung lượng bộ nhớ thấp
  2. Có thể được chia sẻ giữa nhiều quy trình mà không gặp sự cố [chỉ đọc]
  3. Truy cập rất nhanh
  4. Dễ dàng cập nhật [ghi] ngoài quy trình

Vì vậy, nỗ lực đầu tiên của chúng tôi là lưu trữ các mô hình trên đĩa trong MongoDB và tải chúng vào bộ nhớ dưới dạng từ điển Python. Điều này đã thành công và hài lòng #3 và #4 nhưng thất bại #1 và #2. Đây là cách Repustate hoạt động trong một thời gian, nhưng việc sử dụng bộ nhớ vẫn tiếp tục tăng và nó trở nên không bền vững. Từ điển Python không hiệu quả về bộ nhớ. Và quá đắt đối với mỗi quy trình Apache để cần một bản sao của quy trình này vì chúng tôi không chia sẻ dữ liệu giữa các quy trình

Một đêm nọ, tôi đang phàn nàn về tình thế khó xử của chúng tôi và một người bạn của tôi, người tình cờ là một nhà phát triển tuyệt vời tại Red Hat, đã nói ba từ này. “tệp ánh xạ bộ nhớ”. Tất nhiên. Trên thực tế, Repustate đã sử dụng các tệp ánh xạ bộ nhớ để phân tích Tình cảm nhưng tôi hoàn toàn quên mất điều này. Vì vậy, giải quyết được một nửa vấn đề của tôi - nó đáp ứng yêu cầu #2. Nhưng tập tin ánh xạ bộ nhớ có định dạng nào? . cố gắng

Các lần thử [vì lý do nào đó được phát âm là “cây” chứ không phải “thử”] Cây cơ số AKA Cây tiền tố AKA là một cấu trúc dữ liệu tự cho các đối tượng cần khóa chuỗi mượn. Wikipedia có cách viết hay hơn nhưng câu chuyện dài ngắn, các lần thử rất phù hợp với loại mô hình Repustate sử dụng

Tôi đã tìm thấy gói này, marisa try, là một trình bao bọc Python xung quanh việc triển khai C++ của một marisa trie. “Marisa” là từ viết tắt của Matching Algorithm with Recursively Deployed StorAge. Điều tuyệt vời khi marisa thử là cơ chế lưu trữ thực sự thu nhỏ dung lượng bộ nhớ bạn cần. Tác giả của plugin Python đã tuyên bố giảm kích thước 50-100 lần - trải nghiệm của chúng tôi cũng tương tự

Điều tuyệt vời về gói trie marisa là cấu trúc trie bên dưới có thể được ghi vào đĩa và sau đó được đọc qua một đối tượng được ánh xạ bộ nhớ. Với bộ nhớ marisa trie được ánh xạ, tất cả các yêu cầu của chúng tôi hiện đã được đáp ứng. Mức sử dụng bộ nhớ của máy chủ của chúng tôi đã giảm đáng kể, khoảng 40% và hiệu suất của chúng tôi không thay đổi so với khi chúng tôi sử dụng triển khai từ điển của Python

Mỗi chương trình Python được thực thi trong một Quy trình, đây là phiên bản mới của trình thông dịch Python. Quá trình này có tên MainProcess và có một luồng được sử dụng để thực hiện các hướng dẫn của chương trình được gọi là MainThread. Cả quy trình và luồng đều được tạo và quản lý bởi hệ điều hành bên dưới

Đôi khi chúng ta có thể cần tạo các tiến trình con mới trong chương trình của mình để thực thi mã đồng thời

Python cung cấp khả năng tạo và quản lý các quy trình mới thông qua cơ chế đa xử lý. lớp quy trình

Trong lập trình đa xử lý, chúng ta thường cần chia sẻ dữ liệu giữa các tiến trình

Một cách tiếp cận để chia sẻ dữ liệu là sử dụng đường ống

Đường ống là gì và làm thế nào chúng ta có thể sử dụng nó trong Python?

Chạy các vòng lặp của bạn bằng cách sử dụng tất cả các CPU, tải xuống cuốn sách MIỄN PHÍ của tôi để tìm hiểu cách thực hiện

Ống là gì

Trong đa xử lý, một đường ống là một kết nối giữa hai quy trình trong Python

Nó được sử dụng để gửi dữ liệu từ một quy trình được nhận bởi một quy trình khác

Dưới vỏ bọc, một đường ống được triển khai bằng cách sử dụng một cặp đối tượng kết nối, được cung cấp bởi bộ xử lý đa. sự liên quan. lớp kết nối

Tạo một đường ống sẽ tạo ra hai đối tượng kết nối, một đối tượng gửi dữ liệu và một đối tượng nhận dữ liệu. Một đường ống cũng có thể được cấu hình thành song công để mỗi đối tượng kết nối có thể gửi và nhận dữ liệu

ống so với hàng đợi

Cả đa xử lý. Ống và đa xử lý. Hàng đợi có thể được sử dụng để gửi và nhận các đối tượng và dữ liệu giữa các tiến trình

Một ống đơn giản hơn một hàng đợi. Đây là một cơ chế cấp thấp hơn, trước tiên yêu cầu tạo rõ ràng các kết nối giữa một cặp quy trình, sau đó gửi và nhận dữ liệu rõ ràng giữa các quy trình

Hàng đợi là một hàm tạo cấp cao có thể được coi như một cấu trúc dữ liệu cục bộ được chia sẻ giữa các quy trình

Điều quan trọng là Hàng đợi được thiết kế để sử dụng cho nhiều nhà sản xuất và nhiều người tiêu dùng, trong khi Đường ống chỉ dành cho một cặp quy trình

Bản chất nhắm mục tiêu và đơn giản hơn của đường ống có thể làm cho chúng hiệu quả hơn và có khả năng chia sẻ dữ liệu nhanh hơn giữa hai quy trình

Tóm tắt

  • Cả đường ống và hàng đợi đều có thể được sử dụng để chia sẻ dữ liệu giữa các quy trình
  • Đường ống đơn giản và cấp thấp, hàng đợi có nhiều khả năng hơn và cấp cao hơn
  • Đường ống nằm giữa hai quy trình và hàng đợi có nhiều nhà sản xuất và người tiêu dùng

Bối rối với API mô-đun đa xử lý?
Tải xuống bảng cheat PDF MIỄN PHÍ của tôi

Cách sử dụng ống

Python cung cấp một hàng đợi đơn giản trong đa xử lý. lớp ống

Chúng ta hãy xem xét kỹ hơn cách sử dụng lớp đường ống

Tạo một đường ống

Một đường ống có thể được tạo bằng cách gọi hàm tạo của đa xử lý. Lớp ống, trả về hai đa xử lý. sự liên quan. đối tượng kết nối

Ví dụ

1

2

3

.. .

# tạo đường ống

conn1, conn2 = đa xử lý.Ống[]

Theo mặc định, kết nối đầu tiên [conn1] chỉ có thể được sử dụng để nhận dữ liệu, trong khi kết nối thứ hai [conn2] chỉ có thể được sử dụng để gửi dữ liệu

Các đối tượng kết nối có thể được thực hiện song công hoặc hai chiều

Điều này có thể đạt được bằng cách đặt đối số "song công" cho hàm tạo thành True

Ví dụ

1

2

3

.. .

# tạo một đường ống song công

conn1, conn2 = đa xử lý.Đường ống[song công=True]

Trong trường hợp này, cả hai kết nối có thể được sử dụng để gửi và nhận dữ liệu

Chia sẻ đối tượng với đường ống

Các đối tượng có thể được chia sẻ giữa các quy trình bằng cách sử dụng Đường ống

Sự kết nối. Hàm send[] có thể được sử dụng để gửi các đối tượng từ tiến trình này sang tiến trình khác

Các đối tượng được gửi phải được picklable

Ví dụ

1

2

3

.. .

# gửi một đối tượng

conn2. gửi['Xin chào thế giới']

Sự kết nối. Hàm recv[] có thể được sử dụng để nhận các đối tượng trong một quá trình được gửi bởi một quá trình khác

Các đối tượng nhận được sẽ tự động được bỏ chọn

Ví dụ

1

2

3

.. .

# nhận một đối tượng

đối tượng = conn1. recv[]

Cuộc gọi chức năng sẽ chặn cho đến khi nhận được một đối tượng

Chia sẻ byte với đường ống

Dữ liệu có thể được chia sẻ giữa các quy trình bằng cách sử dụng Đường ống

Điều này có thể đạt được bằng cách gửi và nhận dữ liệu dưới dạng gói byte

Byte có thể được gửi từ tiến trình này sang tiến trình khác thông qua Kết nối. hàm send_bytes[]

Ví dụ

1

2

3

.. .

# gửi byte

conn2. gửi[b'Xin chào thế giới']

Nếu dữ liệu byte được giữ trong cấu trúc dữ liệu bộ đệm, thì có thể chỉ định đối số "độ lệch" và "kích thước" khi gửi byte

Ví dụ

1

2

3

.. .

# gửi byte

conn2. gửi[bộ đệm, =10, size=100]

Có thể nhận byte qua Kết nối. hàm recv_bytes[]

Ví dụ

1

2

3

.. .

# nhận byte

dữ liệu = kết nối1. recv_byte[]

Cuộc gọi chức năng sẽ chặn cho đến khi có byte để nhận

Một tin nhắn byte sẽ được đọc

Độ dài tối đa của byte có thể được chỉ định thông qua đối số "maxlength"

Ví dụ

1

2

3

.. .

# nhận byte

dữ liệu = kết nối1. recv_byte[độ dài tối đa=100]

Dữ liệu byte cũng có thể được nhận vào bộ đệm byte hiện có, với độ lệch

Điều này có thể đạt được thông qua Kết nối. recv_bytes_into[] với đối số "offset" tùy chọn

Ví dụ

1

2

3

.. .

# nhận byte

dữ liệu = kết nối1. recv_bytes_into[bộ đệm, =100]

Tình trạng đường ống

Tình trạng của đường ống có thể được kiểm tra thông qua Kết nối. hàm thăm dò []

Điều này sẽ trả về một boolean về việc liệu ba có phải là dữ liệu được nhận và đọc từ đường ống hay không

Ví dụ

1

2

3

4

.. .

# kiểm tra xem có dữ liệu để nhận không

if conn1. thăm dò ý kiến[].

#

Thời gian chờ có thể được đặt thông qua đối số "thời gian chờ". Nếu được chỉ định, cuộc gọi sẽ bị chặn cho đến khi có dữ liệu. Nếu không có dữ liệu nào trước khi hết số giây hết thời gian chờ, thì hàm sẽ trả về

Ví dụ

1

2

3

4

.. .

# kiểm tra xem có dữ liệu để nhận không

if conn1. thăm dò ý kiến[thời gian chờ=5 . ]:

#

Bây giờ chúng ta đã biết cách sử dụng đa xử lý. Ống, hãy xem xét một số ví dụ hoạt động

Khóa học đa xử lý Python miễn phí

Tải xuống bảng cheat API đa xử lý của tôi và như một phần thưởng, bạn sẽ nhận được quyền truy cập MIỄN PHÍ vào khóa học email 7 ngày của tôi

Khám phá cách sử dụng mô-đun đa xử lý Python, bao gồm cách tạo và bắt đầu các tiến trình con cũng như cách sử dụng khóa mutex và semaphores

Tìm hiểu thêm
 

Ví dụ về Sử dụng Đường ống

Chúng ta có thể khám phá cách sử dụng bộ đa xử lý. Đường ống để chia sẻ dữ liệu giữa các quy trình

Trong ví dụ này, chúng tôi sẽ tạo một quy trình người gửi sẽ tạo các số ngẫu nhiên và gửi chúng đến một quy trình khác thông qua đường ống. Chúng tôi cũng sẽ tạo một quy trình nhận sẽ nhận các số được gửi từ quy trình khác và báo cáo chúng

Đầu tiên chúng ta có thể định nghĩa tiến trình người gửi

Chúng ta có thể định nghĩa một hàm mới có tên là sender[] lấy một kết nối làm đối số để gửi các đối tượng. Sau đó, nó lặp lại mười lần và mỗi lần lặp lại, nó sẽ tạo ra một số ngẫu nhiên trong khoảng từ 0 đến 1 thông qua ngẫu nhiên. random[], chặn trong một phần giây để mô phỏng công việc, sau đó gửi giá trị cho quy trình khác qua đường ống

Sau khi hoàn tất, người gửi sẽ gửi một giá trị đặc biệt, được gọi là giá trị trọng điểm để cho biết rằng sẽ không có thêm giá trị nào được gửi. Trong trường hợp này, chúng tôi sẽ sử dụng giá trị “None“

Hàm sender[] bên dưới thực hiện điều này

1

2

3

4

5

6

7

8

9

10

11

12

13

14

# tạo công việc

def người gửi[kết nối]:

    in['Người gửi. Đang chạy', xả=True]

    # tạo công việc

    cho i trong phạm vi[10]:

        # tạo giá trị

        giá trị = ngẫu nhiên[]

        # khối

        ngủ[giá trị]

        # gửi dữ liệu

        kết nối. gửi[giá trị]

    # hoàn tất

    kết nối. gửi[Không có]

    in['Người gửi. Xong', xóa=True]

Tiếp theo, chúng ta có thể định nghĩa quá trình nhận

Chúng ta có thể định nghĩa một hàm receiver[] mới có kết nối để nhận các đối tượng

Chức năng sẽ lặp mãi mãi. Mỗi lần lặp, nó sẽ nhận một đối tượng trên đường ống và chặn cho đến khi nhận được một đối tượng. Sau đó nó sẽ báo giá trị. Nếu giá trị nhận được là giá trị trọng điểm đặc biệt, nó sẽ phá vỡ vòng lặp và quá trình sẽ kết thúc

Hàm get[] bên dưới thực hiện điều này

1

2

3

4

5

6

7

8

9

10

11

12

13

14

# tiêu thụ công việc

def đầu thu[kết nối]:

    in['Người nhận. Đang chạy', xả=True]

    # tiêu thụ công việc

    trong khi Đúng.

        # nhận một đơn vị công việc

        mục = kết nối. recv[]

        # báo cáo

        in[f'>receiver got {item, flush=True]

        # kiểm tra điểm dừng

        nếu mục Không có:

            nghỉ

    # hoàn tất

    in['Người nhận. Xong', xóa=True]

Cuối cùng, quy trình chính sẽ tạo quy trình và đợi chúng kết thúc

Đầu tiên, một đường ống mới được tạo sẽ được sử dụng để gửi và nhận các đối tượng giữa các quy trình

1

2

3

.. .

# tạo đường ống

conn1, conn2 = Ống[]

Tiếp theo, chúng ta có thể tạo một tiến trình con sẽ thực thi hàm sender[] và lấy conn2 chỉ có thể gửi dữ liệu dọc theo đường ống. Sau khi được tạo và định cấu hình, quy trình con được bắt đầu

1

2

3

4

.. .

# bắt đầu người gửi

sender_process = Quy trình[mục tiêu=sender, args=[conn2,]]

sender_ process. bắt đầu[]

Sau đó, chúng ta có thể tạo một tiến trình con khác sẽ thực thi hàm receiver[] và lấy conn1 chỉ có thể nhận dữ liệu qua đường ống. Quá trình này sau đó cũng có thể được bắt đầu

1

2

3

4

.. .

# khởi động máy thu

receiver_process = Quy trình[mục tiêu=receiver, args=[conn1,]]

receiver_ process. bắt đầu[]

Quá trình chính sau đó có thể chặn cho đến khi cả hai quá trình con kết thúc

1

2

3

4

.. .

# chờ cho tất cả các quá trình kết thúc

sender_process. tham gia[]

receiver_ process. tham gia[]

Liên kết điều này lại với nhau, ví dụ hoàn chỉnh được liệt kê bên dưới

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

#Trăn Siêu Nhanh. com

# ví dụ về việc sử dụng đường ống giữa các quy trình

từ thời gian nhập ngủ

từ ngẫu nhiên nhập ngẫu nhiên

từ đa xử lý nhập Quy trình

từ đa xử lý nhập ống

 

# tạo công việc

def người gửi[kết nối]:

    in['Người gửi. Đang chạy', xả=True]

    # tạo công việc

    cho i trong phạm vi[10]:

        # tạo giá trị

        giá trị = ngẫu nhiên[]

        # khối

        ngủ[giá trị]

        # gửi dữ liệu

        kết nối. gửi[giá trị]

    # hoàn tất

    kết nối. gửi[Không có]

    in['Người gửi. Xong', xóa=True]

 

# tiêu thụ công việc

def đầu thu[kết nối]:

    in['Người nhận. Đang chạy', xả=True]

    # tiêu thụ công việc

    trong khi Đúng.

        # nhận một đơn vị công việc

        mục = kết nối. recv[]

        # báo cáo

        in[f'>receiver got {item, flush=True]

        # kiểm tra điểm dừng

        nếu mục Không có:

            nghỉ

    # hoàn tất

    in['Người nhận. Xong', xóa=True]

 

# điểm vào

if __name__ == '__main__'.

    # tạo đường dẫn

    conn1, conn2 = Pipe[]

    # bắt đầu người gửi

    sender_process = Process[target=sender, args=[conn2,]]

    sender_process. bắt đầu[]

    # khởi động bộ thu

    receiver_ process = Process[target=receiver, args=[conn1,]]

    receiver_ process. bắt đầu[]

   # đợi tất cả các quy trình hoàn tất

    sender_process. tham gia[]

    receiver_process. tham gia[]

Chạy ví dụ trước tiên sẽ tạo đường ống, sau đó tạo và bắt đầu cả hai quy trình con

Quá trình chính sau đó chặn cho đến khi quá trình con kết thúc

Sau đó, tiến trình con của người gửi sẽ chạy trong một vòng lặp, tạo và gửi mười giá trị ngẫu nhiên dọc theo đường ống. Khi tất cả các giá trị được tạo và gửi, quá trình gửi kết thúc

Quá trình con lặp lại, nhận các đối tượng từ đường ống mỗi lần lặp. Nó chặn cho đến khi một đối tượng xuất hiện mỗi lần lặp lại. Các giá trị đã nhận được báo cáo và vòng lặp bị hỏng sau khi nhận được giá trị trọng điểm

Lưu ý, kết quả cụ thể của bạn sẽ khác do sử dụng các số ngẫu nhiên

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

Người gửi. Đang chạy

Người nhận. Đang chạy

> người nhận có 0. 7672216614763814

> người nhận có 0. 6766375162201561

> người nhận có 0. 3958018282651511

> người nhận có 0. 0019046615500002417

> người nhận có 0. 4850692187219524

> người nhận có 0. 6249563547834307

> người nhận có 0. 44768176602669507

> người nhận có 0. 852944978432572

> người nhận có 0. 26856894937968356

Người gửi. Xong

> người nhận có 0. 7724406632512203

> người nhận không có

Người nhận. Xong

Điều này nêu bật cách sử dụng đường ống mặc định để gửi dữ liệu từ quy trình này sang quy trình khác

Tiếp theo, hãy xem xét một đường ống song công hoặc hai chiều giữa hai quy trình

Choáng ngợp trước các API đồng thời của python?
Để tìm sự giải thoát, hãy tải xuống Bản đồ tư duy về đồng thời Python MIỄN PHÍ của tôi

Ví dụ về Sử dụng Ống Duplex

đa xử lý. Đường ống có thể được sử dụng để gửi và nhận dữ liệu giữa hai quy trình

Đây được gọi là đường ống song công hoặc hai chiều và có thể đạt được bằng cách đặt đối số "song công" thành True khi tạo đường ống

Trong ví dụ này, chúng tôi sẽ chơi bóng bàn giữa hai tiến trình player1 và player2. Player1 sẽ bắt đầu trò chơi bằng cách tạo một giá trị ngẫu nhiên trong khoảng từ 0 đến 1 và gửi nó cho player2. Player2 sẽ nhận giá trị, thêm một giá trị ngẫu nhiên mới vào giá trị nhận được và gửi lại cho player1. Player1 sẽ nhận giá trị và thực hiện hành động tương tự là thêm một giá trị ngẫu nhiên vào giá trị nhận được và gửi lại

Quá trình này được lặp lại cho đến khi nhận được giá trị trên 10, sau đó cả hai quá trình của trình phát sẽ kết thúc

Đầu tiên, chúng ta có thể định nghĩa một hàm lấy một kết nối và một giá trị làm đối số, thêm một giá trị ngẫu nhiên vào giá trị đó và gửi nó dọc theo kết nối. Giá trị được truyền dưới dạng đối số sẽ là giá trị nhận được dọc theo đường ống

Chức năng này có thể được sử dụng bởi cả hai người chơi trong trò chơi bóng bàn và có thể được sử dụng bởi người chơi 1 khi bắt đầu trò chơi

Hàm generate_send[] được liệt kê bên dưới thực hiện điều này

1

2

3

4

5

6

7

8

9

10

11

12

# tạo và gửi một giá trị

def generate_send[kết nối, value]:

    # giá trị tạo ra

    new_value = ngẫu nhiên[]

   # khối

    ngủ[new_value]

    # giá trị cập nhật

    giá trị = giá trị + new_value

    # báo cáo

    in[f'>đang gửi {value}, flush=True]

    # giá trị gửi

    kết nối. gửi[giá trị]

Tiếp theo, chúng ta có thể định nghĩa một chức năng đóng gói trò chơi

Hàm nhận một đối tượng kết nối có thể gửi và nhận, và một giá trị boolean để biết liệu nó có nên bắt đầu trò chơi hay không

1

2

3

# bóng bàn giữa các quy trình

def pingpong[kết nối, send_first]:

#

Sau đó, hàm sẽ kiểm tra xem nó có nên bắt đầu trò chơi hay không và nếu có, nó sẽ gọi hàm generate_send[] được xác định ở trên với giá trị bằng 0

1

2

3

4

.. .

# kiểm tra xem quy trình này có nên khởi tạo quy trình không

if send_first.

    generate_send[kết nối, 0]

Chức năng sẽ lặp mãi mãi. Mỗi lần lặp nó sẽ nhận một đối tượng dọc theo đường ống và chặn cho đến khi nhận được một đối tượng. Sau đó, nó sẽ báo cáo giá trị, gửi lại giá trị bằng lệnh gọi đến generate_send[], sau đó dừng trò chơi nếu giá trị vượt quá ngưỡng, trong trường hợp này là 10

1

2

3

4

5

6

7

8

9

10

11

12

.. .

# chạy cho đến khi đạt đến giới hạn

trong khi Đúng.

    # lần đọc giá trị

    giá trị = kết nối. recv[]

    # báo cáo

    print[f'>receed {value}, flush=True]

    # gửi lại giá trị

    generate_send[kết nối, value]

    # kiểm tra điểm dừng

    giá trị nếu value > 10:

        nghỉ

Liên kết cái này lại với nhau, hàm pingpong[] hoàn chỉnh được liệt kê bên dưới

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

# bóng bàn giữa các quy trình

def pingpong[kết nối, send_first]:

    in['Quy trình đang chạy',đang gửi {value}, flush=True]

    # giá trị gửi

    kết nối. gửi[giá trị]

 

# bóng bàn giữa các quy trình

def pingpong[kết nối, send_first]:

    in['Quy trình đang chạy',

Chủ Đề