Hướng dẫn python multiprocessing combine results - kết hợp đa xử lý python kết quả

Câu hỏi này có liên quan đến một câu hỏi trước đây tôi đã hỏi, và nó có vẻ như là một câu hỏi đơn giản, nhưng tôi gặp khó khăn trong việc tìm kiếm thông tin hoặc hướng dẫn hữu ích về chủ đề đa xử lý.

Vấn đề của tôi là tôi muốn kết hợp dữ liệu được sản xuất thành một mảng lớn và sau đó lưu trữ nó trong tệp HDF của tôi.

def Simulation(i, output):
    # make a simulation which outputs it resutlts in A. with shape 4000,3
    A = np.array([4000,3])

    output.put(A)

def handle_output(output):
    hdf = pt.openFile('simulation.h5',mode='w')
    hdf.createGroup('/','data')

    # Here the output should be joined somehow. 
    # I would like to get it in the shape [4000,3,10]

    output.get()
    hdf.createArray('/data','array',A)
    hdf.close()

if __name__ == '__main__':
    output = mp.Queue()    
    jobs = []
    proc = mp.Process(target=handle_output, args=(output, ))
    proc.start()
    for i in range(10):
        p = mp.Process(target=Simulation, args=(i, output))
        jobs.append(p)       
        p.start()
    for p in jobs:
        p.join()
    output.put(None)
    proc.join()

Ở đây, chúng ta chỉ cần chỉ định kiểu dữ liệu. Giá trị có thể được đưa ra một giá trị ban đầu (nói 10) như thế này:

def Simulation(i):
   return output

p = mp.Pool(16)

result = p.map(Simulation, range(10))
result = np.array(result).reshape(...)
p.close()
p.join()


Quá trình

Square_sum được cung cấp một giá trị bằng cách sử dụng thuộc tính giá trị của nó:

1._

#!/usr/bin/python

from multiprocessing
import Process

def fun(name):
   print(f 'hello {name}')

def main():

   p = Process(target = fun, args = ('Peter', ))
p.start()

if __name__ == '__main__':
   main()

Chỉ làm một cái gì đó như:

def fun(name):
   print(f 'hello {name}')

Gợi ý: 2

def main():

   p = Process(target = fun, args = ('Peter', ))
p.start()


sửa đổi lần cuối ngày 29 tháng 7 năm 2022

Chúng tôi tạo ra một quy trình mới và chuyển một giá trị cho nó.

from multiprocessing
import Pool

def f(x):
   return x * x

if __name__ == '__main__':
   with Pool(5) as p:
   print(p.map(f, [1, 2, 3]))

from multiprocessing
import Process

def f(name):
   print('hello', name)

if __name__ == '__main__':
   p = Process(target = f, args = ('bob', ))
p.start()
p.join()

from multiprocessing
import Process
import os

def info(title):
   print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())

def f(name):
   info('function f')
print('hello', name)

if __name__ == '__main__':
   info('main line')
p = Process(target = f, args = ('bob', ))
p.start()
p.join()

import multiprocessing as mp

def foo(q):
   q.put('hello')

if __name__ == '__main__':
   mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target = foo, args = (q, ))
p.start()
print(q.get())
p.join()

import multiprocessing as mp

def foo(q):
   q.put('hello')

if __name__ == '__main__':
   ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target = foo, args = (q, ))
p.start()
print(q.get())
p.join()


Gợi ý: 4

Trong hướng dẫn đa xử lý trước đó, chúng tôi đã chỉ ra cách bạn có thể sinh ra các quy trình. Nếu các quy trình này là tốt để tự hành động, mà không liên lạc với nhau hoặc trở lại chương trình chính, thì điều này là tốt. Các quy trình này cũng có thể chia sẻ một cơ sở dữ liệu chung, hoặc một cái gì đó tương tự để làm việc cùng nhau, nhưng, nhiều lần, sẽ có ý nghĩa hơn khi sử dụng đa xử lý để thực hiện một số xử lý, sau đó trả lại kết quả trở lại chương trình chính. Đó là những gì chúng tôi sẽ đề cập ở đây., Chào mừng bạn đến với phần 11 của loạt hướng dẫn lập trình Python trung gian. Trong phần này, chúng ta sẽ nói nhiều hơn về thư viện tích hợp: Đa xử lý. Khi hoàn thành, chúng tôi đóng hồ bơi, và sau đó chúng tôi sẽ in kết quả. Trong trường hợp này, chúng tôi nhận được:, nếu bạn nâng phạm vi và quy trình lên 100, bạn có thể thấy CPU của bạn tối đa và các quy trình nếu bạn muốn. ~ 500 đối với tôi dường như làm thủ thuật.

Để bắt đầu, chúng tôi sẽ nhập

#!/usr/bin/python

from multiprocessing
import Process

def fun(name):
   print(f 'hello {name}')

def main():

   p = Process(target = fun, args = ('Peter', ))
p.start()

if __name__ == '__main__':
   main()
8

def Simulation(i):
   return output

p = mp.Pool(16)

result = p.map(Simulation, range(10))
result = np.array(result).reshape(...)
p.close()
p.join()
0

Giả sử chúng tôi muốn chạy một chức năng trên mỗi mục trong một mục có thể. Hãy chỉ làm:

def Simulation(i):
   return output

p = mp.Pool(16)

result = p.map(Simulation, range(10))
result = np.array(result).reshape(...)
p.close()
p.join()
1

Đủ đơn giản, bây giờ chúng ta hãy thiết lập các quy trình:

def Simulation(i):
   return output

p = mp.Pool(16)

result = p.map(Simulation, range(10))
result = np.array(result).reshape(...)
p.close()
p.join()
2


Gợi ý: 5

Bằng cách thêm một luồng mới cho mỗi tài nguyên tải xuống, mã có thể tải xuống nhiều nguồn dữ liệu song song và kết hợp các kết quả ở cuối mỗi lần tải xuống. Điều này có nghĩa là mỗi lần tải xuống tiếp theo không chờ đợi khi tải xuống các trang web trước đó. Trong trường hợp này, chương trình hiện bị ràng buộc bởi các giới hạn băng thông của (các) máy khách/máy chủ. Mỗi luồng tạo một danh sách mới và thêm các số ngẫu nhiên vào nó. Điều này đã được chọn làm ví dụ đồ chơi vì nó nặng CPU., Do đó, một phương tiện để tăng tốc mã đó nếu nhiều nguồn dữ liệu được truy cập là để tạo một luồng cho từng mục dữ liệu cần được truy cập. Một mã Python đang cạo nhiều URL web. Cho rằng mỗi URL sẽ có thời gian tải xuống liên quan vượt quá khả năng xử lý CPU của máy tính, một triển khai đơn luồng sẽ bị ràng buộc đáng kể I/O.

Cuối cùng, các công việc được bắt đầu tuần tự và sau đó tuần tự "tham gia". Phương thức

#!/usr/bin/python

from multiprocessing
import Process

def fun(name):
   print(f 'hello {name}')

def main():

   p = Process(target = fun, args = ('Peter', ))
p.start()

if __name__ == '__main__':
   main()
9 chặn luồng gọi (nghĩa là luồng phiên dịch Python chính) cho đến khi luồng đã chấm dứt. Điều này đảm bảo rằng tất cả các luồng được hoàn thành trước khi in thông báo hoàn thành vào bảng điều khiển:

def Simulation(i):
   return output

p = mp.Pool(16)

result = p.map(Simulation, range(10))
result = np.array(result).reshape(...)
p.close()
p.join()
3

Chúng ta có thể thời gian mã này bằng cách sử dụng cuộc gọi bảng điều khiển sau:

def Simulation(i):
   return output

p = mp.Pool(16)

result = p.map(Simulation, range(10))
result = np.array(result).reshape(...)
p.close()
p.join()
4

Nó tạo ra đầu ra sau:

def Simulation(i):
   return output

p = mp.Pool(16)

result = p.map(Simulation, range(10))
result = np.array(result).reshape(...)
p.close()
p.join()
5


Gợi ý: 6

Cập nhật lần cuối: 18 tháng 10 năm 2021, Cổng CS 2021 Giáo trình

3._

def Simulation(i):
   return output

p = mp.Pool(16)

result = p.map(Simulation, range(10))
result = np.array(result).reshape(...)
p.close()
p.join()
6

6._

def Simulation(i):
   return output

p = mp.Pool(16)

result = p.map(Simulation, range(10))
result = np.array(result).reshape(...)
p.close()
p.join()
7

kết quả

def Simulation(i):
   return output

p = mp.Pool(16)

result = p.map(Simulation, range(10))
result = np.array(result).reshape(...)
p.close()
p.join()
8

3._

def Simulation(i):
   return output

p = mp.Pool(16)

result = p.map(Simulation, range(10))
result = np.array(result).reshape(...)
p.close()
p.join()
7

kết quả

def Simulation(i):
   return output

p = mp.Pool(16)

result = p.map(Simulation, range(10))
result = np.array(result).reshape(...)
p.close()
p.join()
8

Tương tự, chúng tôi tạo ra một giá trị square_sum như thế này:square_sum like this:

#!/usr/bin/python

from multiprocessing
import Process

def fun(name):
   print(f 'hello {name}')

def main():

   p = Process(target = fun, args = ('Peter', ))
p.start()

if __name__ == '__main__':
   main()
1

kết quả

def Simulation(i):
   return output

p = mp.Pool(16)

result = p.map(Simulation, range(10))
result = np.array(result).reshape(...)
p.close()
p.join()
8

Tương tự, chúng tôi tạo ra một giá trị square_sum như thế này:square_sum like this:

#!/usr/bin/python

from multiprocessing
import Process

def fun(name):
   print(f 'hello {name}')

def main():

   p = Process(target = fun, args = ('Peter', ))
p.start()

if __name__ == '__main__':
   main()
1

Ở đây, chúng ta chỉ cần chỉ định kiểu dữ liệu. Giá trị có thể được đưa ra một giá trị ban đầu (nói 10) như thế này:

#!/usr/bin/python

from multiprocessing
import Process

def fun(name):
   print(f 'hello {name}')

def main():

   p = Process(target = fun, args = ('Peter', ))
p.start()

if __name__ == '__main__':
   main()
4

Quá trình

#!/usr/bin/python

from multiprocessing
import Process

def fun(name):
   print(f 'hello {name}')

def main():

   p = Process(target = fun, args = ('Peter', ))
p.start()

if __name__ == '__main__':
   main()
5

kết quả

#!/usr/bin/python

from multiprocessing
import Process

def fun(name):
   print(f 'hello {name}')

def main():

   p = Process(target = fun, args = ('Peter', ))
p.start()

if __name__ == '__main__':
   main()
6

Tương tự, chúng tôi tạo ra một giá trị square_sum như thế này: is given a value by using its value attribute:

#!/usr/bin/python

from multiprocessing
import Process

def fun(name):
   print(f 'hello {name}')

def main():

   p = Process(target = fun, args = ('Peter', ))
p.start()

if __name__ == '__main__':
   main()
7