Hướng dẫn python multiprocessing memory leak - Rò rỉ bộ nhớ đa xử lý python

Tôi tìm thấy một vài bài viết nên chứng minh khá hữu ích. Vẫn chưa có thời gian để tiêu hóa tất cả thông tin trong đó, nhưng nghĩ rằng tôi sẽ đăng các liên kết và cho phép bạn xem xét chúng.

Show

Marius Gedminas có hai bài viết về việc săn lùng memleaks trong một bộ thử nghiệm Python. Anh ta đang sử dụng các mô-đun gcinspect tích hợp và chỉ cần đổ đồ thị đối tượng vào đĩa dưới dạng các tệp CSV, vì vậy cách tiếp cận sẽ hoạt động khá tốt ngay cả đối với các ứng dụng mp.

Tôi sẽ xem xét bản thân của mình sau ngày hôm nay khi tôi có thời gian.

CẬP NHẬT

Marius đã phát hành giàn thử nghiệm của mình như một dự án nguồn mở có tên objgraph (liên kết). Nó theo dõi các tài liệu tham khảo đối tượng gc nhưng cho phép bạn in ra thông tin hữu ích như có bao nhiêu trường hợp được thêm vào loại sau khi gọi chức năng và nó cho phép bạn xem chuỗi tham chiếu đầy đủ cho các đối tượng.

Các tài liệu khá tự giải thích và tôi không thể thấy một lý do tại sao nó cũng không hoạt động với các ứng dụng mp.

Tuy nhiên, nếu rò rỉ bộ nhớ của bạn đến từ một số thư viện C bên dưới thì điều này có thể không giúp bạn. Ít nhất nó sẽ cho bạn một ý tưởng về nơi rò rỉ. Nếu hóa ra không nằm trong mã Python của bạn thì bạn có thể phải tái cấu trúc mã của mình để bạn có thể chạy các thư viện C có liên quan trong quy trình chính và sử dụng một cái gì đó như Valgrind để phát hiện rò rỉ.


Bài đăng gốc http://mg.pov.lt/blog/hunting-python-memleaks.html

Một trong những nơi anh ấy đi vào các công cụ mà anh ấy đang sử dụng http://mg.pov.lt/blog/python-object-graphs.html

Bài đăng khiến tôi bắt đầu http://www.lshift.net/blog/2008/11/14/tracing-python-memory-reaks

Có một vấn đề rò rỉ bộ nhớ lịch sử trong ứng dụng Django của chúng tôi và tôi đã sửa nó gần đây. Thời gian trôi qua, việc sử dụng bộ nhớ của ứng dụng tiếp tục phát triển và việc sử dụng CPU cũng vậy.

Hướng dẫn python multiprocessing memory leak - Rò rỉ bộ nhớ đa xử lý python

Sau một số nghiên cứu, tôi tìm ra nguyên nhân. Một số chế độ xem không đóng multiprocessing.Pool sau khi sử dụng nó. Vấn đề biến mất khi tôi sử dụng

import gc
import time
import weakref


from multiprocessing import Pool

def func(i):
    return i


p = Pool(4)
wr = weakref.ref(p)
p.map(func, range(4))
print(wr())
print(gc.get_referents(wr()))
# p.close()
# p.terminate()
time.sleep(1)
del p
gc.collect()
print(wr())
print(gc.get_referents(wr()))
0 với câu lệnh
import gc
import time
import weakref


from multiprocessing import Pool

def func(i):
    return i


p = Pool(4)
wr = weakref.ref(p)
p.map(func, range(4))
print(wr())
print(gc.get_referents(wr()))
# p.close()
# p.terminate()
time.sleep(1)
del p
gc.collect()
print(wr())
print(gc.get_referents(wr()))
1.

Hướng dẫn python multiprocessing memory leak - Rò rỉ bộ nhớ đa xử lý python

Nhưng tôi vẫn quan tâm đến nó và viết một số mã thử nghiệm. Kịch bản được chạy trong Python 3.6.8 và tạo ra kết quả tương tự khi sử dụng

import gc
import time
import weakref


from multiprocessing import Pool

def func(i):
    return i


p = Pool(4)
wr = weakref.ref(p)
p.map(func, range(4))
print(wr())
print(gc.get_referents(wr()))
# p.close()
# p.terminate()
time.sleep(1)
del p
gc.collect()
print(wr())
print(gc.get_referents(wr()))
2.

import time
from multiprocessing import Pool


def func(i):
    return i


def ori():
    # create many thread as time goes by, when i==300 cpu grow to 300%, run out of 16g ram and stuck I have to kill process
    p = Pool(4)
    r.append(p.map(func, range(4)))


def with_close():
    # 100% cpu, 0.1 ram, create 40 thread, takes 41s
    p = Pool(4)
    r.append(p.map(func, range(4)))
    p.close()


def with_terminate():
    # 5% cpu, 0.1 ram, create 4 thread, takes 425s
    p = Pool(4)
    r.append(p.map(func, range(4)))
    p.terminate()


def with_with():
    # same as terminate
    with Pool(4) as p:
        r.append(p.map(func, range(4)))


r = []
s = time.time()
for i in range(4000):
    ori()
    # with_close()
    # with_terminate()
    # with_with()

    if i % 100 == 0:
        print(i)

print(f'takes {time.time() - s} seconds')

Như bạn có thể thấy, có bốn chức năng. Hàm

import gc
import time
import weakref


from multiprocessing import Pool

def func(i):
    return i


p = Pool(4)
wr = weakref.ref(p)
p.map(func, range(4))
print(wr())
print(gc.get_referents(wr()))
# p.close()
# p.terminate()
time.sleep(1)
del p
gc.collect()
print(wr())
print(gc.get_referents(wr()))
3 là
import gc
import time
import weakref


from multiprocessing import Pool

def func(i):
    return i


p = Pool(4)
wr = weakref.ref(p)
p.map(func, range(4))
print(wr())
print(gc.get_referents(wr()))
# p.close()
# p.terminate()
time.sleep(1)
del p
gc.collect()
print(wr())
print(gc.get_referents(wr()))
0 không có
import gc
import time
import weakref


from multiprocessing import Pool

def func(i):
    return i


p = Pool(4)
wr = weakref.ref(p)
p.map(func, range(4))
print(wr())
print(gc.get_referents(wr()))
# p.close()
# p.terminate()
time.sleep(1)
del p
gc.collect()
print(wr())
print(gc.get_referents(wr()))
5 và
import gc
import time
import weakref


from multiprocessing import Pool

def func(i):
    return i


p = Pool(4)
wr = weakref.ref(p)
p.map(func, range(4))
print(wr())
print(gc.get_referents(wr()))
# p.close()
# p.terminate()
time.sleep(1)
del p
gc.collect()
print(wr())
print(gc.get_referents(wr()))
6, RAM tiếp tục phát triển và kịch bản bị mắc kẹt.
import gc
import time
import weakref


from multiprocessing import Pool

def func(i):
    return i


p = Pool(4)
wr = weakref.ref(p)
p.map(func, range(4))
print(wr())
print(gc.get_referents(wr()))
# p.close()
# p.terminate()
time.sleep(1)
del p
gc.collect()
print(wr())
print(gc.get_referents(wr()))
7,
import gc
import time
import weakref


from multiprocessing import Pool

def func(i):
    return i


p = Pool(4)
wr = weakref.ref(p)
p.map(func, range(4))
print(wr())
print(gc.get_referents(wr()))
# p.close()
# p.terminate()
time.sleep(1)
del p
gc.collect()
print(wr())
print(gc.get_referents(wr()))
8 và
import gc
import time
import weakref


from multiprocessing import Pool

def func(i):
    return i


p = Pool(4)
wr = weakref.ref(p)
p.map(func, range(4))
print(wr())
print(gc.get_referents(wr()))
# p.close()
# p.terminate()
time.sleep(1)
del p
gc.collect()
print(wr())
print(gc.get_referents(wr()))
9 sẽ thoát ra bình thường nhưng thời gian là khác nhau.

Tại sao [{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ] [{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ] 0 nhanh hơn [{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ] [{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ] 1


[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]

[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]
2 sẽ gọi

[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]

[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]
1 trong mỗi công nhân.

[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]

[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]
4 Chỉ cần thay đổi trạng thái hồ bơi và mỗi công nhân sẽ tự chấm dứt. Bạn có thể tìm thấy mã nguồn trên GitHub.

Xác minh rò rỉ bộ nhớ

import gc
import time
import weakref


from multiprocessing import Pool

def func(i):
    return i


p = Pool(4)
wr = weakref.ref(p)
p.map(func, range(4))
print(wr())
print(gc.get_referents(wr()))
# p.close()
# p.terminate()
time.sleep(1)
del p
gc.collect()
print(wr())
print(gc.get_referents(wr()))

Nếu không gọi

import gc
import time
import weakref


from multiprocessing import Pool

def func(i):
    return i


p = Pool(4)
wr = weakref.ref(p)
p.map(func, range(4))
print(wr())
print(gc.get_referents(wr()))
# p.close()
# p.terminate()
time.sleep(1)
del p
gc.collect()
print(wr())
print(gc.get_referents(wr()))
5 hoặc
import gc
import time
import weakref


from multiprocessing import Pool

def func(i):
    return i


p = Pool(4)
wr = weakref.ref(p)
p.map(func, range(4))
print(wr())
print(gc.get_referents(wr()))
# p.close()
# p.terminate()
time.sleep(1)
del p
gc.collect()
print(wr())
print(gc.get_referents(wr()))
6, sau khi thực hiện,

[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]

[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]
7 vẫn được giới thiệu bởi một số đối tượng:


[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]

[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]

Sau khi gọi


[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]

[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]
0 hoặc

[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]

[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]
1, hai dòng cuối cùng trở thành:

Cập nhật tài liệu

Tài liệu Python3.7 thêm cảnh báo này:

Các đối tượng gc0 có các tài nguyên nội bộ cần được quản lý đúng (như bất kỳ tài nguyên nào khác) bằng cách sử dụng nhóm làm trình quản lý bối cảnh hoặc bằng cách gọi


[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]

[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]
0 và

[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]

[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]
1 theo cách thủ công. Việc không làm điều này có thể dẫn đến quá trình treo khi hoàn thiện. Lưu ý rằng không chính xác khi dựa vào bộ đệm rác để phá hủy hồ bơi vì CPython không đảm bảo rằng bộ hoàn thiện của hồ bơi sẽ được gọi (xem gc3 để biết thêm thông tin).

Lỗi được cố định trong Python 3.8

Trong Python 3.8.6, tập lệnh thoát ra bình thường và tổng thời gian thực hiện cũng giảm mà không gọi


[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]

[{'_ctx': , '_inqueue': , '_outqueue': , '_quick_put': >, '_quick_get': >, '_taskqueue': , '_cache': {}, '_state': 0, '_maxtasksperchild': None, '_initializer': None, '_initargs': (), '_processes': 4, '_pool': [, , , ], '_worker_handler': , '_task_handler': , '_result_handler': , '_terminate': , , , [, , , ], , , , {}), exitprority=15>}, ]
0. Tôi thấy vấn đề này đã được khắc phục trong Trình theo dõi lỗi Python: Multiprocessing.Pool và ThreadPool rò rỉ tài nguyên sau khi bị xóa.