Python chạy nhiều chức năng không đồng bộ

Một trong những tính năng đẹp hơn là một phần của Python 3 là chức năng ________ 61 / ________ 62 mới. Đây hiện là một tính năng của trình thông dịch Python và phần lớn có thể thay thế các gói cũ hơn như Twisted hoặc Eventlet. Một số người có thể thích phong cách lập trình của các framework như Twisted, nhưng tôi thì không. Mô-đun

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
3 cung cấp cho bạn tất cả các công cụ cần thiết để chạy đồng thời một loạt tác vụ mà không thực sự cần phải lo lắng quá nhiều về cách bất kỳ tác vụ nào trong số này được triển khai. Thay đổi lớn nhất là nhu cầu sử dụng từ khóa
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
1 và
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
2 trong mã Python của bạn. Miễn là các nhiệm vụ của bạn bằng cách nào đó bị ràng buộc bởi IO (như máy chủ trò chuyện), mọi thứ sẽ hoạt động khá tốt. Tất cả thực thi thực tế vẫn diễn ra trên một luồng hệ điều hành duy nhất. Python thực tế là đơn luồng do GIL và không chắc điều này sẽ thay đổi trong tương lai gần

Nếu bạn đang tìm kiếm phần giới thiệu cơ bản về cách sử dụng ________ 62 / ________ 61, tôi sẽ bắt đầu với hướng dẫn này

Golang's
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
4

Việc bổ sung ________ 61/________ 62 trong Python thực sự khiến nó ngang bằng với các ngôn ngữ khác khi bạn đang cố gắng thực hiện một loạt công cụ kiểu mạng. Hiện tôi đã xây dựng một số dự án thử nghiệm, chẳng hạn như một máy chủ có thể truyền nguồn cấp dữ liệu sự kiện tới trình duyệt bằng WebSockets. Điều này đã được thực hiện bằng cách sử dụng gói websockets. Đó là một dự án đơn giản và tôi không gặp bất kỳ khó khăn nào trong quá trình học tập. Quay lại khi Golang vừa mới đạt phiên bản 1. 0 Tôi thường xuyên sử dụng Go tại nơi làm việc. Một trong những tính năng thú vị của cờ vây là câu lệnh

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
4. Câu lệnh
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
4 cho phép bạn đợi nhiều câu lệnh cho đến khi một trong số chúng có sẵn kết quả

Đây là một ví dụ, được sử dụng theo giấy phép của Mark McGranaghan từ đây

package main

import (
    "fmt"
    "time"
)

func main() {

    c1 := make(chan string)
    c2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        c1 <- "one"
    }()
    go func() {
        time.Sleep(2 * time.Second)
        c2 <- "two"
    }()

    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        }
    }
}

Đối với những người không quen thuộc với Go, tôi sẽ liệt kê ngắn gọn những gì nó làm

  1. Nó bắt đầu một chương trình với hai kênh,
    async def after_one_minute():
      await asyncio.sleep(60.0)
      return 'trigger'
    
    9 và
    q1 = asyncio.Queue() # pretend something puts stuff in this
    q1 = asyncio.Queue() # pretend something puts stuff in this
    done = False
    results = []
    while not done:
      # This really doesn't work, don't try it 
      done, pending = await asyncio.wait([q1.get(), q2.get()], return_when = asyncio.FIRST_COMPLETED)
    
      for d in done:
        results.append(d.result())
      done = len(results) >= 100 # run until we get some results
    
    0. Các kênh trong Go không đồng bộ. hàng đợi sự kiện
  2. Nó bắt đầu hai goroutines, mỗi goroutines sẽ đưa một tin nhắn vào một trong các kênh sau một độ trễ nhất định. Chúng chạy đồng thời với luồng chính
  3. Chủ đề chính chờ thứ gì đó khả dụng trong các kênh được tạo ở bước 1, bằng cách sử dụng
    async def after_one_minute():
      await asyncio.sleep(60.0)
      return 'trigger'
    
    4

Việc sử dụng câu lệnh

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
4 thực sự hiệu quả, bởi vì cho đến khi có thứ gì đó khả dụng ở một trong các kênh, luồng chính sẽ không hoạt động và không làm gì cả. Ngay khi bất kỳ câu lệnh nào được liệt kê trong
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
4 sẵn sàng thì luồng chính sẽ xử lý kết quả

Cách
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
2 nhiều chức năng trong Python

Đối với các dự án tôi làm việc, thông thường tôi muốn một tác vụ duy nhất có thể đợi các sự kiện từ bất kỳ nguồn nào. Tôi thích mẫu lập trình này hơn vì nó cho phép bạn đưa vào các loại dữ liệu khác nhau và làm điều gì đó với nó theo kiểu nối tiếp. Điều này có vẻ hơi ngược, vì nó có nghĩa là một hệ thống đồng thời phải xử lý nhiều sự kiện thông qua một đường dẫn mã. Nhưng có những ứng dụng thực tế của điều này. Xem xét nếu bạn có một bảng cơ sở dữ liệu có tên là

q1 = asyncio.Queue() # pretend something puts stuff in this
q1 = asyncio.Queue() # pretend something puts stuff in this
done = False
results = []
while not done:
  # This really doesn't work, don't try it 
  done, pending = await asyncio.wait([q1.get(), q2.get()], return_when = asyncio.FIRST_COMPLETED)

  for d in done:
    results.append(d.result())
  done = len(results) >= 100 # run until we get some results
5 với các cột
q1 = asyncio.Queue() # pretend something puts stuff in this
q1 = asyncio.Queue() # pretend something puts stuff in this
done = False
results = []
while not done:
  # This really doesn't work, don't try it 
  done, pending = await asyncio.wait([q1.get(), q2.get()], return_when = asyncio.FIRST_COMPLETED)

  for d in done:
    results.append(d.result())
  done = len(results) >= 100 # run until we get some results
6 và
q1 = asyncio.Queue() # pretend something puts stuff in this
q1 = asyncio.Queue() # pretend something puts stuff in this
done = False
results = []
while not done:
  # This really doesn't work, don't try it 
  done, pending = await asyncio.wait([q1.get(), q2.get()], return_when = asyncio.FIRST_COMPLETED)

  for d in done:
    results.append(d.result())
  done = len(results) >= 100 # run until we get some results
7. Bạn có thể sử dụng phần này như một phần của trang web để theo dõi lần cuối cùng người dùng hoạt động trên trang web. Câu trả lời rõ ràng là cập nhật hàng tương ứng cho mỗi người dùng mỗi khi họ xem một trang. Về cơ bản, điều này có nghĩa là mọi lượt xem trang sẽ tạo ra một bản cập nhật cơ sở dữ liệu. Điều này sẽ không mở rộng quy mô tốt vì cơ sở dữ liệu của bạn sẽ có nhiều bản cập nhật hơn được áp dụng mỗi giây khi lưu lượng truy cập tăng lên

Nếu bạn lùi lại và cân nhắc thì có thể bạn không cần theo dõi từng hoạt động của người dùng đến từng mili giây mà thay vào đó xuống mức từng phút, bạn chỉ cần cập nhật bảng mỗi phút một lần cho tất cả người dùng đã hoạt động trong phút qua. Để thực hiện điều này với một số loại hệ thống hướng sự kiện, bạn thực sự chỉ cần hai nguồn sự kiện

  1. Nguồn sự kiện cho hoạt động của người dùng
  2. Nguồn sự kiện phát ra sự kiện kích hoạt đặc biệt 60 giây

Khi bạn nhận được các sự kiện từ nguồn 1, bạn chỉ có thể theo dõi thời gian hoạt động cuối cùng cho mỗi người dùng trong Python

q1 = asyncio.Queue() # pretend something puts stuff in this
q1 = asyncio.Queue() # pretend something puts stuff in this
done = False
results = []
while not done:
  # This really doesn't work, don't try it 
  done, pending = await asyncio.wait([q1.get(), q2.get()], return_when = asyncio.FIRST_COMPLETED)

  for d in done:
    results.append(d.result())
  done = len(results) >= 100 # run until we get some results
8 trong bộ nhớ. Khi sự kiện nguồn sự kiện thứ hai phát ra sự kiện kích hoạt, bạn xử lý
q1 = asyncio.Queue() # pretend something puts stuff in this
q1 = asyncio.Queue() # pretend something puts stuff in this
done = False
results = []
while not done:
  # This really doesn't work, don't try it 
  done, pending = await asyncio.wait([q1.get(), q2.get()], return_when = asyncio.FIRST_COMPLETED)

  for d in done:
    results.append(d.result())
  done = len(results) >= 100 # run until we get some results
8 và cập nhật tất cả các hàng thích hợp

Lớp tương đương với các kênh của Golang trong Python là

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
40. Đó là một hàng đợi sự kiện được tùy chọn giới hạn số lượng sự kiện mà nó nắm giữ. Khi đặt đồ vật vào hàng đợi, bạn chỉ cần
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
41 để chặn cho đến khi đồ vật được đặt vào hàng đợi. Nếu không gian đã có sẵn, điều này sẽ trả lại ngay lập tức. Để lấy các mục từ hàng đợi, bạn chỉ cần thực hiện
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
42. Cuộc gọi này chặn cho đến khi một mục có thể được lấy từ hàng đợi

Kích hoạt mỗi phút một lần nghe có vẻ phức tạp, nhưng không phải vậy. Để có được một nguồn sự kiện với một sự kiện kích hoạt, cứ sau 60 giây, bạn thực sự chỉ cần bọc hàm

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
43 dựng sẵn

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'

Nếu bạn gọi

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
44, kết quả sẽ không có trong ít nhất 60 giây

Trường hợp Python xuất hiện ngắn là nếu bạn muốn đợi nhiều hàng đợi cùng một lúc. Không có tương đương trực tiếp với câu lệnh

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
4 từ Golang. Những thứ có sẵn gần nhất trong Python là
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
46 và
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
47. Tài liệu cho các chức năng đó có sẵn tại đây

Sử dụng

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
46 sẽ trả lại cho bạn kết quả của các coroutine mà bạn thường sẽ
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
2 từng cái một, theo bất kỳ thứ tự nào mà chúng phải hoàn thành. Rõ ràng, không có sự kiểm soát nào đối với thứ tự của kết quả. Vì vậy, điều này không thực sự hữu ích

Hàm

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
47 theo mặc định chờ tất cả các phần chờ hoàn thành. Vì vậy, bạn không nhận được kết quả khi chúng có sẵn. Bạn có thể thay đổi điều này bằng cách chuyển tham số của
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
51 được đặt thành
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
52 để nhận kết quả đầu tiên ngay khi có sẵn

Cả hai chức năng này đều không thể so sánh trực tiếp với việc sử dụng câu lệnh Golang

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
4 bên trong vòng lặp
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
54. Câu lệnh
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
4 cho phép bạn tiếp tục lấy đi nhận lại các mục từ một kênh miễn là kênh đó vẫn còn các mục trong đó. Cố gắng thực hiện điều này trong mã Python kết thúc với một cái gì đó như thế này

q1 = asyncio.Queue() # pretend something puts stuff in this
q1 = asyncio.Queue() # pretend something puts stuff in this
done = False
results = []
while not done:
  # This really doesn't work, don't try it 
  done, pending = await asyncio.wait([q1.get(), q2.get()], return_when = asyncio.FIRST_COMPLETED)

  for d in done:
    results.append(d.result())
  done = len(results) >= 100 # run until we get some results

Hóa ra đoạn mã trên có vô số vấn đề

  1. Các giá trị
    async def after_one_minute():
      await asyncio.sleep(60.0)
      return 'trigger'
    
    56 và
    async def after_one_minute():
      await asyncio.sleep(60.0)
      return 'trigger'
    
    57 là loại
    async def after_one_minute():
      await asyncio.sleep(60.0)
      return 'trigger'
    
    58 và việc tìm ra kết quả đến từ đâu là điều không hề đơn giản
  2. async def after_one_minute():
      await asyncio.sleep(60.0)
      return 'trigger'
    
    56 và
    async def after_one_minute():
      await asyncio.sleep(60.0)
      return 'trigger'
    
    57 chứa các đối tượng thuộc loại
    package main
    
    import (
        "fmt"
        "time"
    )
    
    func main() {
    
        c1 := make(chan string)
        c2 := make(chan string)
    
        go func() {
            time.Sleep(1 * time.Second)
            c1 <- "one"
        }()
        go func() {
            time.Sleep(2 * time.Second)
            c2 <- "two"
        }()
    
        for i := 0; i < 2; i++ {
            select {
            case msg1 := <-c1:
                fmt.Println("received", msg1)
            case msg2 := <-c2:
                fmt.Println("received", msg2)
            }
        }
    }
    
    61, mặc dù các coroutine đã được thông qua
  3. Mỗi lần lặp lại của vòng lặp gọi cả
    package main
    
    import (
        "fmt"
        "time"
    )
    
    func main() {
    
        c1 := make(chan string)
        c2 := make(chan string)
    
        go func() {
            time.Sleep(1 * time.Second)
            c1 <- "one"
        }()
        go func() {
            time.Sleep(2 * time.Second)
            c2 <- "two"
        }()
    
        for i := 0; i < 2; i++ {
            select {
            case msg1 := <-c1:
                fmt.Println("received", msg1)
            case msg2 := <-c2:
                fmt.Println("received", msg2)
            }
        }
    }
    
    62 và
    package main
    
    import (
        "fmt"
        "time"
    )
    
    func main() {
    
        c1 := make(chan string)
        c2 := make(chan string)
    
        go func() {
            time.Sleep(1 * time.Second)
            c1 <- "one"
        }()
        go func() {
            time.Sleep(2 * time.Second)
            c2 <- "two"
        }()
    
        for i := 0; i < 2; i++ {
            select {
            case msg1 := <-c1:
                fmt.Println("received", msg1)
            case msg2 := <-c2:
                fmt.Println("received", msg2)
            }
        }
    }
    
    63 để tạo ra các coroutine mới, mặc dù lần lặp lại trước đó chỉ có một trong số chúng thực sự hoàn thành

Vì vậy, rõ ràng là bạn không thể chỉ ghé vào

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
47 và để nó làm những gì bạn muốn. Các vấn đề #1 và #2 có thể được khắc phục bằng cách gói tất cả các coroutine của bạn dưới dạng các tác vụ bằng cách gọi
package main

import (
    "fmt"
    "time"
)

func main() {

    c1 := make(chan string)
    c2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        c1 <- "one"
    }()
    go func() {
        time.Sleep(2 * time.Second)
        c2 <- "two"
    }()

    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        }
    }
}
65, theo dõi các tác vụ đó và sau đó so sánh chúng với những gì được trả về trong
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
56 và
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
57

Để tránh vấn đề #3, bạn cũng phải theo dõi quy trình đăng ký nào thực sự đã hoàn thành và nhớ "bắt đầu" lại nó bằng cách chỉ gọi lại chức năng tương tự. Tất cả điều này có thể được kết hợp thành một đoạn mã rất xấu xí như thế này

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
4

Điều này chủ yếu giải quyết các vấn đề được liệt kê ở trên, mặc dù bây giờ bạn có vấn đề cần gọi

package main

import (
    "fmt"
    "time"
)

func main() {

    c1 := make(chan string)
    c2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        c1 <- "one"
    }()
    go func() {
        time.Sleep(2 * time.Second)
        c2 <- "two"
    }()

    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        }
    }
}
68 trên bất kỳ phần còn lại nào trong danh sách
package main

import (
    "fmt"
    "time"
)

func main() {

    c1 := make(chan string)
    c2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        c1 <- "one"
    }()
    go func() {
        time.Sleep(2 * time.Second)
        c2 <- "two"
    }()

    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        }
    }
}
69. Vì vậy, như bạn có thể nói đây là một số lượng lớn bản soạn sẵn cho một vấn đề đơn giản về mặt khái niệm là gì. Như bạn có thể đoán, tôi sẽ không sao chép cái này trên tất cả cơ sở mã của mình. Vì vậy, tôi đã viết một gói Python có tên là
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
60 để thực hiện tất cả những điều này theo một cách khá chung chung. Nó cũng thực hiện một số việc thực tế khác như trả về kết quả theo vị trí trong
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
61 thay vì trong Python
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
58

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
5

Gói này thêm một chức năng đơn giản có tên là

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
63 có thể sử dụng làm trình quản lý bối cảnh. Đối với mỗi chức năng bạn muốn chạy, chỉ cần gọi
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
64 trước khi bạn nhập vòng lặp. Trình quản lý bối cảnh sẽ gọi cho họ khi cần. Bạn có thể sử dụng điều này với bất kỳ thứ gì trả về một coroutine hoặc một đối tượng
package main

import (
    "fmt"
    "time"
)

func main() {

    c1 := make(chan string)
    c2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        c1 <- "one"
    }()
    go func() {
        time.Sleep(2 * time.Second)
        c2 <- "two"
    }()

    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        }
    }
}
61. Sau khi thiết lập hoàn tất, chỉ cần thực hiện
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
66 để nhận kết quả theo vị trí. Điều này trả về hai danh sách,
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
67 và
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
68. Vì Python có ngoại lệ, bạn vẫn có khả năng coroutine đưa ra ngoại lệ. Đây là lý do tại sao danh sách
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
68 cũng được trả về. Cả hai danh sách đều có độ dài bằng với số lần gọi hàm ____610

Vì vậy, trong trường hợp bình thường, bạn sẽ có hai mảng sau từ đoạn mã trên

package main

import (
    "fmt"
    "time"
)

func main() {

    c1 := make(chan string)
    c2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        c1 <- "one"
    }()
    go func() {
        time.Sleep(2 * time.Second)
        c2 <- "two"
    }()

    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        }
    }
}
6

Vì danh sách được trả về, bạn có thể giải nén chúng như thế này. Nó làm cho nó rõ ràng một kết quả cá nhân đến từ đâu

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
6

Tất nhiên, có thể mọi giá trị đơn lẻ trong

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
11 đều có mặt nếu tất cả các coroutine quản lý để hoàn thành. Trong trường hợp các coutoutines đưa ra các ngoại lệ, bạn sẽ có các giá trị ngoại lệ ở các vị trí tương ứng trong danh sách
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
68. Chúng được trả về dưới dạng giá trị bởi vì tôi cho rằng nó dễ làm việc hơn là cố gắng thêm
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
13/
async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
14 vào mã của bạn

Nếu bạn không thể sử dụng phiên bản trình quản lý ngữ cảnh vì lý do nào đó, lớp

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'
15 có thể được khởi tạo. Bạn cần gọi thủ công phương thức
package main

import (
    "fmt"
    "time"
)

func main() {

    c1 := make(chan string)
    c2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        c1 <- "one"
    }()
    go func() {
        time.Sleep(2 * time.Second)
        c2 <- "two"
    }()

    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        }
    }
}
68 trên đối tượng đó để hủy mọi tác vụ đang chờ xử lý khi bạn hoàn thành nó