Hướng dẫn how many threads can be spawned python? - bao nhiêu chủ đề có thể được sinh ra python?

Một số người sẽ nói rằng hai chủ đề là quá nhiều - tôi không hoàn toàn ở trong trại đó :-)

Đây là lời khuyên của tôi: Đo lường, đừng đoán. Một gợi ý là làm cho nó có thể định cấu hình và ban đầu đặt nó thành 100, sau đó phát hành phần mềm của bạn ra tự nhiên và theo dõi những gì xảy ra.

Nếu việc sử dụng chủ đề của bạn đạt đỉnh ở mức 3, thì 100 là quá nhiều. Nếu nó vẫn ở mức 100 trong hầu hết thời gian trong ngày, hãy tăng tới 200 và xem điều gì sẽ xảy ra.

Bạn thực sự có thể có mã của mình theo dõi việc sử dụng và điều chỉnh cấu hình cho lần tiếp theo nó bắt đầu nhưng điều đó có lẽ là quá mức cần thiết.


Để làm rõ và xây dựng:

Tôi không ủng hộ việc lăn hệ thống con gộp chủ đề của riêng bạn, bằng mọi cách sử dụng hệ thống bạn có. Nhưng, vì bạn đang hỏi về một điểm giới hạn tốt cho các chủ đề, tôi cho rằng việc triển khai nhóm chủ đề của bạn có khả năng giới hạn số lượng chủ đề tối đa được tạo (đó là một điều tốt).

Tôi đã viết mã gộp kết nối chủ đề và cơ sở dữ liệu và chúng có các tính năng sau (mà tôi tin là rất cần thiết cho hiệu suất):

  • Một số lượng tối thiểu của các luồng hoạt động.
  • Một số lượng tối đa của các luồng.
  • Tắt các chủ đề chưa được sử dụng trong một thời gian.

Đầu tiên đặt đường cơ sở cho hiệu suất tối thiểu theo máy khách nhóm luồng (số lượng chủ đề này luôn có sẵn để sử dụng). Thứ hai đặt ra một hạn chế về việc sử dụng tài nguyên bằng các luồng hoạt động. Thứ ba đưa bạn trở lại đường cơ sở trong thời gian yên tĩnh để giảm thiểu việc sử dụng tài nguyên.

Bạn cần cân bằng việc sử dụng tài nguyên có các luồng không sử dụng (a) so với việc sử dụng tài nguyên không có đủ luồng để thực hiện công việc (b).

. (B) Nói chung sẽ là một sự chậm trễ trong việc xử lý các yêu cầu khi chúng đến khi bạn cần chờ một chủ đề có sẵn.

Đó là lý do tại sao bạn đo lường. Khi bạn tuyên bố, phần lớn các chủ đề của bạn sẽ chờ phản hồi từ cơ sở dữ liệu để chúng không chạy. Có hai yếu tố ảnh hưởng đến số lượng chủ đề bạn nên cho phép.

Đầu tiên là số lượng kết nối DB có sẵn. Đây có thể là một giới hạn cứng trừ khi bạn có thể tăng nó tại DBMS - Tôi sẽ cho rằng DBM của bạn có thể có số lượng kết nối không giới hạn trong trường hợp này (mặc dù bạn cũng nên đo lường điều đó).

Sau đó, số lượng chủ đề bạn nên phụ thuộc vào việc sử dụng lịch sử của bạn. Tối thiểu bạn nên chạy là số tối thiểu mà bạn đã từng chạy + một%, với mức tối thiểu tuyệt đối (ví dụ, và làm cho nó có thể định cấu hình giống như A) 5.

Số lượng chủ đề tối đa phải là tối đa lịch sử + B%của bạn.

Bạn cũng nên theo dõi các thay đổi hành vi. Nếu, vì một số lý do, việc sử dụng của bạn lên tới 100% có sẵn trong một thời gian đáng kể (do đó nó sẽ ảnh hưởng đến hiệu suất của khách hàng), bạn nên tăng mức tối đa được phép cho đến khi nó một lần nữa B% cao hơn.


Đáp lại "chính xác thì tôi nên đo lường điều gì?" câu hỏi:

Những gì bạn nên đo cụ thể là lượng chủ đề tối đa trong sử dụng đồng thời (ví dụ: chờ đợi khi trả lại từ cuộc gọi DB) theo tải. Sau đó, thêm hệ số an toàn 10% chẳng hạn (nhấn mạnh, vì các áp phích khác dường như lấy các ví dụ của tôi làm khuyến nghị cố định).

Ngoài ra, điều này nên được thực hiện trong môi trường sản xuất để điều chỉnh. Không sao để có được một ước tính trước nhưng bạn không bao giờ biết sản phẩm nào sẽ ném theo cách của bạn (đó là lý do tại sao tất cả những thứ này nên được cấu hình trong thời gian chạy). Điều này là để bắt gặp một tình huống như tăng gấp đôi bất ngờ các cuộc gọi của khách hàng đến.

Lưu ý: Theo yêu cầu phổ biến mà tôi chứng minh một số kỹ thuật thay thế --- bao gồm async/chờ đợi, chỉ có sẵn kể từ khi Python 3.5 --- tôi đã thêm một số cập nhật vào cuối bài viết. Vui thích! By popular request that I demonstrate some alternative techniques---including async/await, only available since the advent of Python 3.5---I've added some updates at the end of the article. Enjoy!

Các cuộc thảo luận chỉ trích Python thường nói về việc khó sử dụng Python cho công việc đa luồng, chỉ tay vào cái được gọi là khóa phiên dịch toàn cầu (được gọi một cách trìu mến là GIL) ngăn chặn nhiều chủ đề của mã Python chạy đồng thời. Do đó, mô -đun đa luồng Python không hoàn toàn hoạt động theo cách bạn mong đợi nếu bạn không phải là nhà phát triển Python và bạn đến từ các ngôn ngữ khác như C ++ hoặc Java. Cần phải làm rõ rằng người ta vẫn có thể viết mã bằng Python chạy đồng thời hoặc song song và tạo ra sự khác biệt rõ rệt về hiệu suất dẫn đến, miễn là một số điều nhất định được xem xét. Nếu bạn chưa đọc nó, tôi khuyên bạn nên xem bài viết của Eqbal Quran, về sự đồng thời và song song trong Ruby ở đây trên blog kỹ thuật Toptal.

Trong hướng dẫn đồng thời Python này, chúng tôi sẽ viết một kịch bản Python nhỏ để tải xuống các hình ảnh phổ biến hàng đầu từ Imgur. Chúng tôi sẽ bắt đầu với một phiên bản tải xuống hình ảnh tuần tự hoặc một lần. Như một điều kiện tiên quyết, bạn sẽ phải đăng ký một ứng dụng trên Imgur. Nếu bạn chưa có tài khoản Imgur, vui lòng tạo một tài khoản trước.

Các tập lệnh trong các ví dụ luồng này đã được thử nghiệm với Python 3.6.4. Với một số thay đổi, họ cũng nên chạy với Python 2, ur Surb là những gì đã thay đổi nhất giữa hai phiên bản Python này.

Bắt đầu với Python MultiThreading

Chúng ta hãy bắt đầu bằng cách tạo một mô -đun Python, được đặt tên là download.py. Tệp này sẽ chứa tất cả các chức năng cần thiết để tìm nạp danh sách các hình ảnh và tải xuống chúng. Chúng tôi sẽ chia các chức năng này thành ba chức năng riêng biệt:

  • import logging
    import os
    from time import time
    
    from download import setup_download_dir, get_links, download_link
    
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)
    
    def main():
        ts = time()
        client_id = os.getenv('IMGUR_CLIENT_ID')
        if not client_id:
            raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
        download_dir = setup_download_dir()
        links = get_links(client_id)
        for link in links:
            download_link(download_dir, link)
        logging.info('Took %s seconds', time() - ts)
    
    if __name__ == '__main__':
        main()
    
    0
  • import logging
    import os
    from time import time
    
    from download import setup_download_dir, get_links, download_link
    
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)
    
    def main():
        ts = time()
        client_id = os.getenv('IMGUR_CLIENT_ID')
        if not client_id:
            raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
        download_dir = setup_download_dir()
        links = get_links(client_id)
        for link in links:
            download_link(download_dir, link)
        logging.info('Took %s seconds', time() - ts)
    
    if __name__ == '__main__':
        main()
    
    1
  • import logging
    import os
    from time import time
    
    from download import setup_download_dir, get_links, download_link
    
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)
    
    def main():
        ts = time()
        client_id = os.getenv('IMGUR_CLIENT_ID')
        if not client_id:
            raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
        download_dir = setup_download_dir()
        links = get_links(client_id)
        for link in links:
            download_link(download_dir, link)
        logging.info('Took %s seconds', time() - ts)
    
    if __name__ == '__main__':
        main()
    
    2

Hàm thứ ba,

import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    for link in links:
        download_link(download_dir, link)
    logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
    main()
2, sẽ được sử dụng để tạo thư mục đích tải xuống nếu nó không tồn tại.

API IMGUR yêu cầu yêu cầu HTTP chịu tiêu đề

import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    for link in links:
        download_link(download_dir, link)
    logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
    main()
4 với ID máy khách. Bạn có thể tìm thấy ID máy khách này từ bảng điều khiển của ứng dụng mà bạn đã đăng ký trên IMGUR và phản hồi sẽ được mã hóa JSON. Chúng ta có thể sử dụng thư viện JSON tiêu chuẩn Python, để giải mã nó. Tải xuống hình ảnh là một nhiệm vụ thậm chí còn đơn giản hơn, vì tất cả những gì bạn phải làm là lấy hình ảnh bằng URL của nó và ghi nó vào một tệp.

Đây là những gì kịch bản trông như thế nào:

import json
import logging
import os
from pathlib import Path
from urllib.request import urlopen, Request

logger = logging.getLogger(__name__)

types = {'image/jpeg', 'image/png'}


def get_links(client_id):
    headers = {'Authorization': 'Client-ID {}'.format(client_id)}
    req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET')
    with urlopen(req) as resp:
        data = json.loads(resp.read().decode('utf-8'))
    return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types]


def download_link(directory, link):
    download_path = directory / os.path.basename(link)
    with urlopen(link) as image, download_path.open('wb') as f:
        f.write(image.read())
    logger.info('Downloaded %s', link)


def setup_download_dir():
    download_dir = Path('images')
    if not download_dir.exists():
        download_dir.mkdir()
    return download_dir

Tiếp theo, chúng ta sẽ cần viết một mô -đun sẽ sử dụng các chức năng này để tải xuống các hình ảnh, từng cái một. Chúng tôi sẽ đặt tên cho

import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    for link in links:
        download_link(download_dir, link)
    logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
    main()
5 này. Điều này sẽ chứa chức năng chính của phiên bản tải xuống hình ảnh đầu tiên của chúng tôi. Mô -đun sẽ truy xuất ID máy khách IMGUR trong biến môi trường
import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    for link in links:
        download_link(download_dir, link)
    logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
    main()
6. Nó sẽ gọi
import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    for link in links:
        download_link(download_dir, link)
    logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
    main()
2 để tạo thư mục đích tải xuống. Cuối cùng, nó sẽ tìm nạp một danh sách các hình ảnh bằng hàm
import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    for link in links:
        download_link(download_dir, link)
    logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
    main()
0, lọc tất cả các URL GIF và album, sau đó sử dụng
import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    for link in links:
        download_link(download_dir, link)
    logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
    main()
1 để tải xuống và lưu từng hình ảnh đó vào đĩa. Đây là những gì
import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    for link in links:
        download_link(download_dir, link)
    logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
    main()
5 trông giống như:

import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    for link in links:
        download_link(download_dir, link)
    logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
    main()

Trên máy tính xách tay của tôi, tập lệnh này mất 19,4 giây để tải xuống 91 hình ảnh. Xin lưu ý rằng những con số này có thể thay đổi dựa trên mạng bạn đang bật. 19.4 giây là không dài khủng khiếp, nhưng nếu chúng ta muốn tải xuống nhiều hình ảnh hơn thì sao? Có lẽ 900 hình ảnh, thay vì 90. Với trung bình 0,2 giây mỗi hình ảnh, 900 hình ảnh sẽ mất khoảng 3 phút. Đối với 9000 hình ảnh sẽ mất 30 phút. Tin tốt là bằng cách giới thiệu đồng thời hoặc song song, chúng ta có thể tăng tốc độ này một cách đáng kể.

Tất cả các ví dụ mã tiếp theo sẽ chỉ hiển thị các báo cáo nhập mới và cụ thể cho các ví dụ đó. Để thuận tiện, tất cả các tập lệnh Python này có thể được tìm thấy trong kho GitHub này.

Đồng thời và song song trong Python: Ví dụ về luồng

Chủ đề là một trong những cách tiếp cận nổi tiếng nhất để đạt được sự đồng thời và song song của Python. Chủ đề là một tính năng thường được cung cấp bởi hệ điều hành. Chủ đề nhẹ hơn các quy trình và chia sẻ cùng một không gian bộ nhớ.

Hướng dẫn how many threads can be spawned python? - bao nhiêu chủ đề có thể được sinh ra python?

Trong ví dụ về luồng python này, chúng tôi sẽ viết một mô -đun mới để thay thế

import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    for link in links:
        download_link(download_dir, link)
    logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
    main()
5. Mô -đun này sẽ tạo ra một nhóm gồm tám luồng, tạo ra tổng cộng chín luồng bao gồm cả luồng chính. Tôi đã chọn tám luồng công nhân vì máy tính của tôi có tám lõi CPU và một luồng công nhân trên mỗi lõi dường như là một con số tốt cho số lượng chủ đề để chạy cùng một lúc. Trong thực tế, con số này được chọn cẩn thận hơn nhiều dựa trên các yếu tố khác, chẳng hạn như các ứng dụng và dịch vụ khác chạy trên cùng một máy.

Điều này gần giống như cái trước, ngoại trừ chúng ta hiện có một lớp mới,

import logging
import os
from queue import Queue
from threading import Thread
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


class DownloadWorker(Thread):

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            # Get the work from the queue and expand the tuple
            directory, link = self.queue.get()
            try:
                download_link(directory, link)
            finally:
                self.queue.task_done()


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    # Create a queue to communicate with the worker threads
    queue = Queue()
    # Create 8 worker threads
    for x in range(8):
        worker = DownloadWorker(queue)
        # Setting daemon to True will let the main thread exit even though the workers are blocking
        worker.daemon = True
        worker.start()
    # Put the tasks into the queue as a tuple
    for link in links:
        logger.info('Queueing {}'.format(link))
        queue.put((download_dir, link))
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue.join()
    logging.info('Took %s', time() - ts)

if __name__ == '__main__':
    main()
2, đây là một hậu duệ của lớp Python
import logging
import os
from queue import Queue
from threading import Thread
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


class DownloadWorker(Thread):

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            # Get the work from the queue and expand the tuple
            directory, link = self.queue.get()
            try:
                download_link(directory, link)
            finally:
                self.queue.task_done()


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    # Create a queue to communicate with the worker threads
    queue = Queue()
    # Create 8 worker threads
    for x in range(8):
        worker = DownloadWorker(queue)
        # Setting daemon to True will let the main thread exit even though the workers are blocking
        worker.daemon = True
        worker.start()
    # Put the tasks into the queue as a tuple
    for link in links:
        logger.info('Queueing {}'.format(link))
        queue.put((download_dir, link))
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue.join()
    logging.info('Took %s', time() - ts)

if __name__ == '__main__':
    main()
3. Phương thức chạy đã được ghi đè, chạy một vòng lặp vô hạn. Trên mỗi lần lặp, nó gọi
import logging
import os
from queue import Queue
from threading import Thread
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


class DownloadWorker(Thread):

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            # Get the work from the queue and expand the tuple
            directory, link = self.queue.get()
            try:
                download_link(directory, link)
            finally:
                self.queue.task_done()


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    # Create a queue to communicate with the worker threads
    queue = Queue()
    # Create 8 worker threads
    for x in range(8):
        worker = DownloadWorker(queue)
        # Setting daemon to True will let the main thread exit even though the workers are blocking
        worker.daemon = True
        worker.start()
    # Put the tasks into the queue as a tuple
    for link in links:
        logger.info('Queueing {}'.format(link))
        queue.put((download_dir, link))
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue.join()
    logging.info('Took %s', time() - ts)

if __name__ == '__main__':
    main()
4 để thử và lấy URL từ hàng đợi an toàn luồng. Nó chặn cho đến khi có một mục trong hàng đợi cho người lao động xử lý. Khi người lao động nhận được một mục từ hàng đợi, sau đó, nó gọi cùng một phương thức
import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    for link in links:
        download_link(download_dir, link)
    logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
    main()
1 đã được sử dụng trong tập lệnh trước để tải hình ảnh vào thư mục hình ảnh. Sau khi tải xuống kết thúc, công nhân báo hiệu hàng đợi rằng nhiệm vụ đó được thực hiện. Điều này rất quan trọng, bởi vì hàng đợi theo dõi số lượng nhiệm vụ đã được thực hiện. Cuộc gọi đến
import logging
import os
from queue import Queue
from threading import Thread
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


class DownloadWorker(Thread):

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            # Get the work from the queue and expand the tuple
            directory, link = self.queue.get()
            try:
                download_link(directory, link)
            finally:
                self.queue.task_done()


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    # Create a queue to communicate with the worker threads
    queue = Queue()
    # Create 8 worker threads
    for x in range(8):
        worker = DownloadWorker(queue)
        # Setting daemon to True will let the main thread exit even though the workers are blocking
        worker.daemon = True
        worker.start()
    # Put the tasks into the queue as a tuple
    for link in links:
        logger.info('Queueing {}'.format(link))
        queue.put((download_dir, link))
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue.join()
    logging.info('Took %s', time() - ts)

if __name__ == '__main__':
    main()
6 sẽ chặn chủ đề chính mãi mãi nếu các công nhân không báo hiệu rằng họ đã hoàn thành một nhiệm vụ.

import logging
import os
from queue import Queue
from threading import Thread
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


class DownloadWorker(Thread):

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            # Get the work from the queue and expand the tuple
            directory, link = self.queue.get()
            try:
                download_link(directory, link)
            finally:
                self.queue.task_done()


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    # Create a queue to communicate with the worker threads
    queue = Queue()
    # Create 8 worker threads
    for x in range(8):
        worker = DownloadWorker(queue)
        # Setting daemon to True will let the main thread exit even though the workers are blocking
        worker.daemon = True
        worker.start()
    # Put the tasks into the queue as a tuple
    for link in links:
        logger.info('Queueing {}'.format(link))
        queue.put((download_dir, link))
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue.join()
    logging.info('Took %s', time() - ts)

if __name__ == '__main__':
    main()

Chạy tập lệnh ví dụ về luồng python này trên cùng một máy được sử dụng kết quả trước đó trong thời gian tải xuống là 4,1 giây! Đó là nhanh hơn 4,7 lần so với ví dụ trước. Mặc dù điều này nhanh hơn nhiều, nhưng điều đáng nói là chỉ có một chủ đề được thực hiện tại một thời điểm trong suốt quá trình này do GIL. Do đó, mã này đồng thời nhưng không song song. Lý do nó vẫn nhanh hơn là vì đây là một nhiệm vụ ràng buộc IO. Bộ xử lý hầu như không đổ mồ hôi trong khi tải xuống những hình ảnh này và phần lớn thời gian được dành để chờ mạng. Đây là lý do tại sao đa luồng Python có thể giúp tăng tốc độ lớn. Bộ xử lý có thể chuyển đổi giữa các luồng bất cứ khi nào một trong số chúng sẵn sàng thực hiện một số công việc. Sử dụng mô -đun luồng trong Python hoặc bất kỳ ngôn ngữ được giải thích nào khác với GIL thực sự có thể dẫn đến giảm hiệu suất. Nếu mã của bạn đang thực hiện một tác vụ ràng buộc CPU, chẳng hạn như giải nén các tệp GZIP, sử dụng mô -đun

import logging
import os
from queue import Queue
from threading import Thread
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


class DownloadWorker(Thread):

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            # Get the work from the queue and expand the tuple
            directory, link = self.queue.get()
            try:
                download_link(directory, link)
            finally:
                self.queue.task_done()


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    # Create a queue to communicate with the worker threads
    queue = Queue()
    # Create 8 worker threads
    for x in range(8):
        worker = DownloadWorker(queue)
        # Setting daemon to True will let the main thread exit even though the workers are blocking
        worker.daemon = True
        worker.start()
    # Put the tasks into the queue as a tuple
    for link in links:
        logger.info('Queueing {}'.format(link))
        queue.put((download_dir, link))
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue.join()
    logging.info('Took %s', time() - ts)

if __name__ == '__main__':
    main()
7 sẽ dẫn đến thời gian thực hiện chậm hơn. Đối với các tác vụ ràng buộc CPU và thực hiện thực sự song song, chúng ta có thể sử dụng mô -đun đa xử lý.

Trong khi việc thực hiện thực tế Python thực hiện, Cpython, có một GIL, nhưng điều này không đúng với tất cả các triển khai Python. Ví dụ, Ironpython, một triển khai Python sử dụng khung .NET, không có GIL và Jython, việc triển khai dựa trên Java cũng không. Bạn có thể tìm thấy một danh sách các triển khai Python hoạt động ở đây.

Đồng thời và song song trong Python Ví dụ 2: Sinh sản nhiều quá trình

Mô -đun đa bộ xử lý dễ dàng giảm hơn so với mô -đun luồng, vì chúng tôi không cần thêm một lớp như ví dụ về luồng Python. Những thay đổi duy nhất chúng ta cần thực hiện là trong chức năng chính.

Hướng dẫn how many threads can be spawned python? - bao nhiêu chủ đề có thể được sinh ra python?

Để sử dụng nhiều quy trình, chúng tôi tạo ra một đa xử lý

import logging
import os
from queue import Queue
from threading import Thread
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


class DownloadWorker(Thread):

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            # Get the work from the queue and expand the tuple
            directory, link = self.queue.get()
            try:
                download_link(directory, link)
            finally:
                self.queue.task_done()


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    # Create a queue to communicate with the worker threads
    queue = Queue()
    # Create 8 worker threads
    for x in range(8):
        worker = DownloadWorker(queue)
        # Setting daemon to True will let the main thread exit even though the workers are blocking
        worker.daemon = True
        worker.start()
    # Put the tasks into the queue as a tuple
    for link in links:
        logger.info('Queueing {}'.format(link))
        queue.put((download_dir, link))
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue.join()
    logging.info('Took %s', time() - ts)

if __name__ == '__main__':
    main()
8. Với phương thức MAP mà nó cung cấp, chúng tôi sẽ chuyển danh sách các URL đến nhóm, từ đó sẽ sinh ra tám quy trình mới và sử dụng từng quy trình để tải xuống các hình ảnh song song. Đây là sự song song thực sự, nhưng nó đi kèm với một chi phí. Toàn bộ bộ nhớ của tập lệnh được sao chép vào mỗi quy trình con được sinh ra. Trong ví dụ đơn giản này, nó không phải là một vấn đề lớn, nhưng nó có thể dễ dàng trở thành chi phí nghiêm trọng cho các chương trình không tầm thường.

import logging
import os
from functools import partial
from multiprocessing.pool import Pool
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    download = partial(download_link, download_dir)
    with Pool(4) as p:
        p.map(download, links)
    logging.info('Took %s seconds', time() - ts)


if __name__ == '__main__':
    main()

Đồng thời và song song trong Python Ví dụ 3: Phân phối cho nhiều công nhân

Mặc dù các mô -đun phân xử và đa bộ xử lý rất tốt cho các tập lệnh đang chạy trên máy tính cá nhân của bạn, bạn nên làm gì nếu bạn muốn công việc được thực hiện trên một máy khác hoặc bạn cần mở rộng lên nhiều hơn CPU trên một máy có thể xử lý? Một trường hợp sử dụng tuyệt vời cho điều này là các nhiệm vụ back-end dài cho các ứng dụng web. Nếu bạn có một số nhiệm vụ chạy dài, bạn không muốn quay một loạt các quy trình phụ hoặc luồng trên cùng một máy cần phải chạy phần còn lại của mã ứng dụng. Điều này sẽ làm giảm hiệu suất của ứng dụng của bạn cho tất cả người dùng của bạn. Điều tuyệt vời là có thể chạy các công việc này trên một máy khác hoặc nhiều máy khác.

Một thư viện Python tuyệt vời cho nhiệm vụ này là RQ, một thư viện rất đơn giản nhưng mạnh mẽ. Trước tiên, bạn tạo ra một hàm và các đối số của nó bằng thư viện. Điều này Pickles biểu diễn cuộc gọi chức năng, sau đó được thêm vào danh sách Redis. Enqueueing công việc là bước đầu tiên, nhưng sẽ không làm gì cả. Chúng tôi cũng cần ít nhất một công nhân để lắng nghe hàng đợi công việc đó.

Hướng dẫn how many threads can be spawned python? - bao nhiêu chủ đề có thể được sinh ra python?

Bước đầu tiên là cài đặt và chạy máy chủ Redis trên máy tính của bạn hoặc có quyền truy cập vào máy chủ Redis đang chạy. Sau đó, chỉ có một vài thay đổi nhỏ được thực hiện đối với mã hiện có. Trước tiên, chúng tôi tạo một thể hiện của một hàng đợi RQ và chuyển nó một ví dụ của một máy chủ Redis từ thư viện Redis-PY. Sau đó, thay vì chỉ gọi phương thức

import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    for link in links:
        download_link(download_dir, link)
    logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
    main()
1 của chúng tôi, chúng tôi gọi
import logging
import os
from functools import partial
from multiprocessing.pool import Pool
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    download = partial(download_link, download_dir)
    with Pool(4) as p:
        p.map(download, links)
    logging.info('Took %s seconds', time() - ts)


if __name__ == '__main__':
    main()
0. Phương thức Enqueue có chức năng như đối số đầu tiên của nó, sau đó bất kỳ đối số hoặc đối số từ khóa nào khác được chuyển theo hàm đó khi công việc thực sự được thực hiện.

Một bước cuối cùng chúng ta cần làm là bắt đầu một số công nhân. RQ cung cấp một tập lệnh tiện dụng để chạy công nhân trên hàng đợi mặc định. Chỉ cần chạy

import logging
import os
from functools import partial
from multiprocessing.pool import Pool
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    download = partial(download_link, download_dir)
    with Pool(4) as p:
        p.map(download, links)
    logging.info('Took %s seconds', time() - ts)


if __name__ == '__main__':
    main()
1 trong cửa sổ đầu cuối và nó sẽ bắt đầu một công nhân nghe trên hàng đợi mặc định. Vui lòng đảm bảo thư mục làm việc hiện tại của bạn giống như nơi các tập lệnh cư trú. Nếu bạn muốn nghe một hàng đợi khác, bạn có thể chạy
import logging
import os
from functools import partial
from multiprocessing.pool import Pool
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    download = partial(download_link, download_dir)
    with Pool(4) as p:
        p.map(download, links)
    logging.info('Took %s seconds', time() - ts)


if __name__ == '__main__':
    main()
2 và nó sẽ nghe hàng đợi được đặt tên đó. Điều tuyệt vời về RQ là miễn là bạn có thể kết nối với Redis, bạn có thể chạy nhiều công nhân như bạn muốn trên nhiều máy khác nhau như bạn muốn; Do đó, nó rất dễ dàng để mở rộng quy mô khi ứng dụng của bạn phát triển. Đây là nguồn cho phiên bản RQ:

import logging
import os

from redis import Redis

from rq import Queue

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    q = Queue(connection=Redis(host='localhost', port=6379))
    for link in links:
        q.enqueue(download_link, download_dir, link)

if __name__ == '__main__':
    main()

Tuy nhiên, RQ không phải là giải pháp hàng đợi công việc Python duy nhất. RQ rất dễ sử dụng và bao gồm các trường hợp sử dụng đơn giản rất tốt, nhưng nếu cần có nhiều tùy chọn nâng cao hơn, các giải pháp hàng đợi Python 3 khác (như cần tây) có thể được sử dụng.

Python đa luồng so với đa xử lý

Nếu mã của bạn bị ràng buộc IO, cả đa xử lý và đa luồng trong Python sẽ hoạt động cho bạn. Đa bộ xử lý là một điều dễ dàng hơn để giảm so với ren nhưng có chi phí bộ nhớ cao hơn. Nếu mã của bạn bị ràng buộc CPU, đa xử lý rất có thể sẽ là lựa chọn tốt hơn, đặc biệt là nếu máy đích có nhiều lõi hoặc CPU. Đối với các ứng dụng web và khi bạn cần mở rộng quy mô công việc trên nhiều máy, RQ sẽ tốt hơn cho bạn.


Cập nhật

Một cái gì đó mới kể từ Python 3.2 đã được chạm vào trong bài viết gốc là gói

import logging
import os
from functools import partial
from multiprocessing.pool import Pool
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    download = partial(download_link, download_dir)
    with Pool(4) as p:
        p.map(download, links)
    logging.info('Took %s seconds', time() - ts)


if __name__ == '__main__':
    main()
3. Gói này cung cấp một cách khác để sử dụng đồng thời và song song với Python.

Trong bài viết gốc, tôi đã đề cập rằng mô -đun đa bộ xử lý Python sẽ dễ dàng rơi vào mã hiện có so với mô -đun luồng. Điều này là do mô -đun luồng Python 3 yêu cầu phân nhóm lớp

import logging
import os
from queue import Queue
from threading import Thread
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


class DownloadWorker(Thread):

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            # Get the work from the queue and expand the tuple
            directory, link = self.queue.get()
            try:
                download_link(directory, link)
            finally:
                self.queue.task_done()


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    # Create a queue to communicate with the worker threads
    queue = Queue()
    # Create 8 worker threads
    for x in range(8):
        worker = DownloadWorker(queue)
        # Setting daemon to True will let the main thread exit even though the workers are blocking
        worker.daemon = True
        worker.start()
    # Put the tasks into the queue as a tuple
    for link in links:
        logger.info('Queueing {}'.format(link))
        queue.put((download_dir, link))
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue.join()
    logging.info('Took %s', time() - ts)

if __name__ == '__main__':
    main()
3 và cũng tạo ra
import logging
import os
from functools import partial
from multiprocessing.pool import Pool
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    download = partial(download_link, download_dir)
    with Pool(4) as p:
        p.map(download, links)
    logging.info('Took %s seconds', time() - ts)


if __name__ == '__main__':
    main()
6 cho các luồng để theo dõi công việc.

Sử dụng đồng thời.

import logging
import os
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)

    # By placing the executor inside a with block, the executors shutdown method
    # will be called cleaning up threads.
    # 
    # By default, the executor sets number of workers to 5 times the number of
    # CPUs.
    with ThreadPoolExecutor() as executor:

        # Create a new partially applied function that stores the directory
        # argument.
        # 
        # This allows the download_link function that normally takes two
        # arguments to work with the map function that expects a function of a
        # single argument.
        fn = partial(download_link, download_dir)

        # Executes fn concurrently using threads on the links iterable. The
        # timeout is for the entire process, not a single call, so downloading
        # all images must complete within 30 seconds.
        executor.map(fn, links, timeout=30)


if __name__ == '__main__':
    main()

Bây giờ chúng tôi đã có tất cả các hình ảnh này được tải xuống với Python

import logging
import os
from functools import partial
from multiprocessing.pool import Pool
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    download = partial(download_link, download_dir)
    with Pool(4) as p:
        p.map(download, links)
    logging.info('Took %s seconds', time() - ts)


if __name__ == '__main__':
    main()
7 của chúng tôi, chúng tôi có thể sử dụng chúng để kiểm tra tác vụ ràng buộc CPU. Chúng ta có thể tạo các phiên bản hình thu nhỏ của tất cả các hình ảnh trong cả một tập lệnh đơn, đơn, sau đó kiểm tra một giải pháp dựa trên đa xử lý.

Chúng tôi sẽ sử dụng thư viện gối để xử lý việc thay đổi kích thước của hình ảnh.

Đây là kịch bản ban đầu của chúng tôi.

import logging
from pathlib import Path
from time import time

from PIL import Image

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


def create_thumbnail(size, path):
    """
    Creates a thumbnail of an image with the same name as image but with
    _thumbnail appended before the extension.  E.g.:

    >>> create_thumbnail((128, 128), 'image.jpg')

    A new thumbnail image is created with the name image_thumbnail.jpg

    :param size: A tuple of the width and height of the image
    :param path: The path to the image file
    :return: None
    """
    image = Image.open(path)
    image.thumbnail(size)
    path = Path(path)
    name = path.stem + '_thumbnail' + path.suffix
    thumbnail_path = path.with_name(name)
    image.save(thumbnail_path)


def main():
    ts = time()
    for image_path in Path('images').iterdir():
        create_thumbnail((128, 128), image_path)
    logging.info('Took %s', time() - ts)


if __name__ == '__main__':
    main()

Tập lệnh này lặp lại trên các đường dẫn trong thư mục

import logging
import os
from functools import partial
from multiprocessing.pool import Pool
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    download = partial(download_link, download_dir)
    with Pool(4) as p:
        p.map(download, links)
    logging.info('Took %s seconds', time() - ts)


if __name__ == '__main__':
    main()
8 và cho mỗi đường dẫn, nó chạy chức năng created_thumbnail. Chức năng này sử dụng gối để mở hình ảnh, tạo hình thu nhỏ và lưu hình ảnh mới, nhỏ hơn với cùng tên với bản gốc nhưng với
import logging
import os
from functools import partial
from multiprocessing.pool import Pool
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    download = partial(download_link, download_dir)
    with Pool(4) as p:
        p.map(download, links)
    logging.info('Took %s seconds', time() - ts)


if __name__ == '__main__':
    main()
9 được gắn vào tên.

Chạy tập lệnh này trên 160 hình ảnh tổng cộng 36 triệu mất 2,32 giây. Hãy xem liệu chúng ta có thể tăng tốc độ này bằng cách sử dụng ProcessPoolExecutor hay không.

import logging
from pathlib import Path
from time import time
from functools import partial

from concurrent.futures import ProcessPoolExecutor

from PIL import Image

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


def create_thumbnail(size, path):
    """
    Creates a thumbnail of an image with the same name as image but with
    _thumbnail appended before the extension. E.g.:

    >>> create_thumbnail((128, 128), 'image.jpg')

    A new thumbnail image is created with the name image_thumbnail.jpg

    :param size: A tuple of the width and height of the image
    :param path: The path to the image file
    :return: None
    """
    path = Path(path)
    name = path.stem + '_thumbnail' + path.suffix
    thumbnail_path = path.with_name(name)
    image = Image.open(path)
    image.thumbnail(size)
    image.save(thumbnail_path)


def main():
    ts = time()
    # Partially apply the create_thumbnail method, setting the size to 128x128
    # and returning a function of a single argument.
    thumbnail_128 = partial(create_thumbnail, (128, 128))

    # Create the executor in a with block so shutdown is called when the block
    # is exited.
    with ProcessPoolExecutor() as executor:
        executor.map(thumbnail_128, Path('images').iterdir())
    logging.info('Took %s', time() - ts)


if __name__ == '__main__':
    main()

Phương pháp

import logging
import os

from redis import Redis

from rq import Queue

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    q = Queue(connection=Redis(host='localhost', port=6379))
    for link in links:
        q.enqueue(download_link, download_dir, link)

if __name__ == '__main__':
    main()
0 giống hệt với tập lệnh cuối cùng. Sự khác biệt chính là việc tạo ra một
import logging
import os

from redis import Redis

from rq import Queue

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    q = Queue(connection=Redis(host='localhost', port=6379))
    for link in links:
        q.enqueue(download_link, download_dir, link)

if __name__ == '__main__':
    main()
1. Phương thức bản đồ của người thực thi được sử dụng để tạo hình thu nhỏ song song. Theo mặc định,
import logging
import os

from redis import Redis

from rq import Queue

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    q = Queue(connection=Redis(host='localhost', port=6379))
    for link in links:
        q.enqueue(download_link, download_dir, link)

if __name__ == '__main__':
    main()
1 tạo ra một quy trình con trên mỗi CPU. Chạy tập lệnh này trên cùng 160 hình ảnh mất 1,05 giây nhanh hơn 2,2 lần!

Async/Await (chỉ Python 3.5+)

Một trong những mục được yêu cầu nhiều nhất trong các bình luận về bài viết gốc là một ví dụ sử dụng mô -đun Asyncio của Python 3. So với các ví dụ khác, có một số cú pháp Python mới có thể mới đối với hầu hết mọi người và cả một số khái niệm mới. Một lớp phức tạp bổ sung đáng tiếc là do mô-đun

import logging
import os

from redis import Redis

from rq import Queue

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    q = Queue(connection=Redis(host='localhost', port=6379))
    for link in links:
        q.enqueue(download_link, download_dir, link)

if __name__ == '__main__':
    main()
3 tích hợp của Python không được đồng bộ hóa. Chúng tôi sẽ cần sử dụng một thư viện HTTP Async để có được các lợi ích đầy đủ của Asyncio. Đối với điều này, chúng tôi sẽ sử dụng Aiohttp.

Hãy để Lừa nhảy ngay vào mã và một lời giải thích chi tiết hơn sẽ theo sau.

import asyncio
import logging
import os
from time import time

import aiohttp

from download import setup_download_dir, get_links

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


async def async_download_link(session, directory, link):
    """
    Async version of the download_link method we've been using in the other examples.
    :param session: aiohttp ClientSession
    :param directory: directory to save downloads
    :param link: the url of the link to download
    :return:
    """
    download_path = directory / os.path.basename(link)
    async with session.get(link) as response:
        with download_path.open('wb') as f:
            while True:
                # await pauses execution until the 1024 (or less) bytes are read from the stream
                chunk = await response.content.read(1024)
                if not chunk:
                    # We are done reading the file, break out of the while loop
                    break
                f.write(chunk)
    logger.info('Downloaded %s', link)


# Main is now a coroutine
async def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    # We use a session to take advantage of tcp keep-alive
    # Set a 3 second read and connect timeout. Default is 5 minutes
    async with aiohttp.ClientSession(conn_timeout=3, read_timeout=3) as session:
        tasks = [(async_download_link(session, download_dir, l)) for l in get_links(client_id)]
        # gather aggregates all the tasks and schedules them in the event loop
        await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == '__main__':
    ts = time()
    # Create the asyncio event loop
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        # Shutdown the loop even if there is an exception
        loop.close()
    logger.info('Took %s seconds to complete', time() - ts)

Có khá nhiều để giải nén ở đây. Hãy bắt đầu với điểm nhập cảnh chính của chương trình. Điều mới đầu tiên chúng tôi làm với mô -đun Asyncio là có được vòng lặp sự kiện. Vòng lặp sự kiện xử lý tất cả các mã không đồng bộ. Sau đó, vòng lặp được chạy cho đến khi hoàn thành và vượt qua hàm

import logging
import os

from redis import Redis

from rq import Queue

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    q = Queue(connection=Redis(host='localhost', port=6379))
    for link in links:
        q.enqueue(download_link, download_dir, link)

if __name__ == '__main__':
    main()
4. Có một phần của cú pháp mới trong định nghĩa của chính:
import logging
import os

from redis import Redis

from rq import Queue

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    q = Queue(connection=Redis(host='localhost', port=6379))
    for link in links:
        q.enqueue(download_link, download_dir, link)

if __name__ == '__main__':
    main()
5. Bạn cũng sẽ nhận thấy
import logging
import os

from redis import Redis

from rq import Queue

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    q = Queue(connection=Redis(host='localhost', port=6379))
    for link in links:
        q.enqueue(download_link, download_dir, link)

if __name__ == '__main__':
    main()
6 và
import logging
import os

from redis import Redis

from rq import Queue

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    q = Queue(connection=Redis(host='localhost', port=6379))
    for link in links:
        q.enqueue(download_link, download_dir, link)

if __name__ == '__main__':
    main()
7.

Cú pháp Async/Await đã được giới thiệu trong PEP492. Cú pháp

import logging
import os

from redis import Redis

from rq import Queue

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    q = Queue(connection=Redis(host='localhost', port=6379))
    for link in links:
        q.enqueue(download_link, download_dir, link)

if __name__ == '__main__':
    main()
5 đánh dấu một chức năng như một coroutine. Trong nội bộ, các coroutines dựa trên các trình tạo Python, nhưng aren hoàn toàn giống nhau. Coroutines trả về một đối tượng Coroutine tương tự như cách các trình tạo trả về một đối tượng máy phát. Khi bạn có một coroutine, bạn có được kết quả của nó với biểu thức
import logging
import os

from redis import Redis

from rq import Queue

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    q = Queue(connection=Redis(host='localhost', port=6379))
    for link in links:
        q.enqueue(download_link, download_dir, link)

if __name__ == '__main__':
    main()
6. Khi một coroutine gọi
import logging
import os

from redis import Redis

from rq import Queue

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    q = Queue(connection=Redis(host='localhost', port=6379))
    for link in links:
        q.enqueue(download_link, download_dir, link)

if __name__ == '__main__':
    main()
6, việc thực hiện coroutine bị đình chỉ cho đến khi có thể chờ hoàn thành. Việc đình chỉ này cho phép hoàn thành công việc khác trong khi Coroutine bị đình chỉ đang chờ đợi một số kết quả. Nói chung, kết quả này sẽ là một loại I/O như yêu cầu cơ sở dữ liệu hoặc trong trường hợp của chúng tôi là yêu cầu HTTP.

Hàm

import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    for link in links:
        download_link(download_dir, link)
    logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
    main()
1 phải được thay đổi khá đáng kể. Trước đây, chúng tôi đã dựa vào
import logging
import os

from redis import Redis

from rq import Queue

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    q = Queue(connection=Redis(host='localhost', port=6379))
    for link in links:
        q.enqueue(download_link, download_dir, link)

if __name__ == '__main__':
    main()
3 để thực hiện công việc đọc hình ảnh cho chúng tôi. Bây giờ, để cho phép phương pháp của chúng tôi hoạt động đúng với mô hình lập trình ASYNC, chúng tôi đã giới thiệu một vòng lặp
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)

    # By placing the executor inside a with block, the executors shutdown method
    # will be called cleaning up threads.
    # 
    # By default, the executor sets number of workers to 5 times the number of
    # CPUs.
    with ThreadPoolExecutor() as executor:

        # Create a new partially applied function that stores the directory
        # argument.
        # 
        # This allows the download_link function that normally takes two
        # arguments to work with the map function that expects a function of a
        # single argument.
        fn = partial(download_link, download_dir)

        # Executes fn concurrently using threads on the links iterable. The
        # timeout is for the entire process, not a single call, so downloading
        # all images must complete within 30 seconds.
        executor.map(fn, links, timeout=30)


if __name__ == '__main__':
    main()
3 đọc các khối hình ảnh tại một thời điểm và đình chỉ thực thi trong khi chờ hoàn thành I/O. Điều này cho phép vòng lặp sự kiện lặp lại thông qua việc tải xuống các hình ảnh khác nhau vì mỗi hình ảnh có sẵn dữ liệu mới trong quá trình tải xuống.

Nên có một - một cách tốt nhất là chỉ có một cách tốt để làm điều đó

Mặc dù Zen of Python nói với chúng ta rằng nên có một cách rõ ràng để làm một cái gì đó, có nhiều cách trong Python để giới thiệu đồng thời vào các chương trình của chúng ta. Phương pháp tốt nhất để chọn là sẽ phụ thuộc vào trường hợp sử dụng cụ thể của bạn. Mô hình không đồng bộ tỷ lệ tốt hơn với khối lượng công việc đối chiếu cao (như máy chủ web) so với luồng hoặc đa xử lý, nhưng nó đòi hỏi mã của bạn (và phụ thuộc) phải không đồng bộ để có lợi hoàn toàn.

Hy vọng rằng các ví dụ về luồng python trong bài viết này, và cập nhật, sẽ chỉ cho bạn đi đúng hướng để bạn có ý tưởng về nơi nhìn vào thư viện tiêu chuẩn Python nếu bạn cần giới thiệu đồng thời vào các chương trình của mình.

Giới hạn chủ đề trong Python là gì?

Chỉ một!OK, điều đó không chính xác là sự thật.Sự thật là, bạn có thể chạy nhiều luồng trong Python như bạn có bộ nhớ, nhưng tất cả các luồng trong một quy trình Python chạy trên một lõi máy duy nhất, vì vậy về mặt kỹ thuật chỉ có một luồng thực sự được thực hiện cùng một lúc.you can run as many threads in Python as you have memory for, but all threads in a Python process run on a single machine core, so technically only one thread is actually executing at once.

Một chương trình Python có thể có nhiều chủ đề?

Để tóm tắt lại, luồng trong Python cho phép nhiều luồng được tạo trong một quy trình duy nhất, nhưng do Gil, không ai trong số chúng sẽ chạy cùng một lúc.Chủ đề vẫn là một lựa chọn rất tốt khi chạy nhiều tác vụ ràng buộc I/O đồng thời.threading in Python allows multiple threads to be created within a single process, but due to GIL, none of them will ever run at the exact same time. Threading is still a very good option when it comes to running multiple I/O bound tasks concurrently.

Có giới hạn cho bao nhiêu chủ đề?

4.2.Các cửa sổ.Trên máy Windows, không có giới hạn được chỉ định cho các luồng.Do đó, chúng tôi có thể tạo bao nhiêu chủ đề như chúng tôi muốn, cho đến khi hệ thống của chúng tôi hết bộ nhớ hệ thống có sẵn.there's no limit specified for threads. Thus, we can create as many threads as we want, until our system runs out of available system memory.

Điều gì xảy ra nếu bạn sinh ra quá nhiều chủ đề?

Bạn cần ngăn chặn quá nhiều chủ đề bị sinh ra, một vấn đề có khả năng dẫn đến việc từ chối dịch vụ do tài nguyên hệ thống cạn kiệt.a denial of service owing to exhausted system resources.