Hướng dẫn php worker

Các PHP Developers hiếm khi sử dụng đa luồng. Sự đơn giản của chương trình đồng bộ, đơn luồng chắc chắn rất hấp dẫn, nhưng đôi khi việc sử dụng đa luồng có thể mang lại một số cải tiến hiệu suất đáng kể.

Nội dung chính

  • Khi nào không sử dụng pthreads
  • Xử lý các tasks một lần
  • Recycling threads
  • pthreads và tính bất biến
  • Đồng bộ hoá (Synchronization)
  • Kết luận
  • Tham khảo

Nội dung chính

  • Khi nào không sử dụng pthreads
  • Xử lý các tasks một lần
  • Recycling threads
  • pthreads và tính bất biến
  • Đồng bộ hoá (Synchronization)
  • Kết luận
  • Tham khảo

Trong bài viết này, tôi sẽ giới thiệu cho các bạn cách tạo luồng trong PHP với pthreads extension. Để làm được điều này cần cài đặt PHP 7x ZTS (Zend Thread Safety), cùng với pthreads v3. (Tại thời điểm viết, người sử dụng PHP 7.1 sẽ cần phải cài đặt từ nhánh master của repo pthreads - xem phần này của bài viết để biết chi tiết về việc build các third-party extensions từ source).

PS: pthreads v2 nhắm vào PHP 5.x và không còn được hỗ trợ; pthreads v3 nhắm vào PHP 7.x và đang được phát triển tích cực.

Khi nào không sử dụng pthreads

Trước khi chúng ta tiếp tục, trước tiên tôi muốn làm rõ khi bạn không nên (cũng như không thể) sử dụng pthreads extension.

Trong pthreads v2, khuyến nghị là không nên sử dụng pthreads trong web server environment (tức là trong FCGI process). Theo pthreads v3, đề xuất này đã được thi hành, vì vậy bây giờ bạn chỉ đơn giản là không thể sử dụng nó trong web server environment. Hai lý do nổi bật cho điều này là:

  1. Nó không an toàn để sử dụng multiple threads trong environment như vậy (gây ra vấn đề về IO, và có thể dẫn đến các vấn đề khác).
  2. Nó không mở rộng tốt. Ví dụ: giả sử bạn có một PHP script tạo ra một thread mới để xử lý một số công việc và script đó được execute theo từng request. Điều này có nghĩa là đối với mỗi request, ứng dụng của bạn sẽ tạo ra một thread mới (đây là mô hình luồng 1: 1 - một chủ đề cho một yêu cầu). Nếu ứng dụng của bạn đang phục vụ 1.000 request / giây, thì nó sẽ tạo ra 1.000 threads / giây! Việc có nhiều threads này chạy trên một máy sẽ nhanh chóng làm lụt, và vấn đề sẽ chỉ trở nên trầm trọng hơn khi tỷ lệ request tăng lên.

Đó là lý do tại sao threads không phải là một giải pháp tốt trong environment như vậy. Nếu bạn đang tìm kiếm threads như là một giải pháp cho các tác vụ chặn IO (chẳng hạn như thực hiện các yêu cầu HTTP), hãy để tôi chỉ cho bạn theo hướng lập trình không đồng bộ (asynchronous programming), có thể đạt được qua các framework như Amp. SitePoint đã phát hành một số bài báo xuất sắc về chủ đề này (như Writing Async Libraries và Modding Minecraft in PHP), trong trường hợp bạn quan tâm.

Xử lý các tasks một lần

Đôi khi, bạn muốn xử lý các tasks một lần theo cách đa luồng (như thực hiện một số IO-bound task). Trong trường hợp như vậy, Thread class có thể được sử dụng để tạo một thread mới và chạy một số work unit trong thread riêng biệt đó.

Ví dụ:

$task = new class extends Thread {
    private $response;

    public function run()
    {
        $content = file_get_contents("http://google.com");
        preg_match("~(.+)~", $content, $matches);
        $this->response = $matches[1];
    }
};

$task->start() && $task->join();

var_dump($task->response); // string(6) "Google"

Ở trên, run method là work unit của chúng ta sẽ được thực hiện bên trong của thread mới. Khi gọi Thread :: start, thread mới được sinh ra và run method được gọi. Sau đó chúng ta join các thread sinh ra trở lại thread chính (thông qua Thread::join), sẽ block cho đến khi thread riêng biệt đã thực hiện hoàn thành. Điều này đảm bảo rằng nhiệm vụ đã hoàn tất được thực hiện trước khi chúng ta cố gắng đưa ra kết quả (lưu trữ trong $task->response).

Chúng ta có thể tách các class bằng cách extend Threaded class, chúng có thể chạy được bên trong các threads khác:

class Task extends Threaded
{
    public $response;

    public function someWork()
    {
        $content = file_get_contents('http://google.com');
        preg_match('~(.+)~', $content, $matches);
        $this->response = $matches[1];
    }
}

$task = new Task;

$thread = new class($task) extends Thread {
    private $task;

    public function __construct(Threaded $task)
    {
        $this->task = $task;
    }

    public function run()
    {
        $this->task->someWork();
    }
};

$thread->start() && $thread->join();

var_dump($task->response);

Bất kỳ class nào cần chạy được bên trong của một thread riêng biệt phải extends Threaded class bằng một cách nào đó. Điều này là vì nó cung cấp các khả năng cần thiết để chạy bên trong các threads khác nhau, cũng như cung cấp các interfaces hữu ích và an toàn tiềm ẩn (đối với những thứ như đồng bộ hoá tài nguyên).

Chúng ta hãy nhìn vào kiến trúc các classes của pthreads:

Threaded (implements Traversable, Collectable)
    Thread
        Worker
    Volatile
Pool

Chúng ta đã nhìn thấy và tìm hiểu những điều căn bản về các class Thread và Threaded, vậy bây giờ hãy xem ba phần còn lại (Worker, Volatile và Pool).

Recycling threads

Tách một thread mới cho mỗi tác vụ chạy song song là khó khăn. Điều này là do một shared-nothing architecture phải được sử dụng bởi pthreads để tạo được threading bên trong PHP. Điều này có nghĩa là toàn bộ execution context của PHP interpreter (bao gồm mọi class, interface, trait và function) phải được sao chép cho mỗi thread được tạo. Vì điều này làm giảm performance đáng kể, vì vậy nên luôn luôn sử dụng lại một thread khi có thể. Các thread có thể được sử dụng lại theo hai cách: với Workers hoặc với Pools.

Worker class được sử dụng để thực hiện một loạt các nhiệm vụ đồng bộ bên trong một thread khác. Điều này được thực hiện bằng cách tạo ra một Worker mới (tạo một thread mới), và sau đó xếp các công việc vào thread riêng biệt đó (thông qua Worker::stack).

Nhìn nhanh vào ví dụ sau:

class Task extends Threaded
{
    private $value;

    public function __construct(int $i)
    {
        $this->value = $i;
    }

    public function run()
    {
        usleep(250000);
        echo "Task: {$this->value}\n";
    }
}

$worker = new Worker();
$worker->start();

for ($i = 0; $i < 15; ++$i) {
    $worker->stack(new Task($i));
}

while ($worker->collect());

$worker->shutdown();

Output:

Hướng dẫn php worker

Đoạn code trên stack 15 works vào đối tượng $worker mới thông qua Worker::stack, và sau đó xử lý chúng theo thứ tự xếp chồng lên nhau. Worker::collect method, như đã nói ở trên, được sử dụng để làm sạch các tasks một khi chúng đã hoàn tất việc thực thi. Bằng cách sử dụng nó trong vòng lặp while, chúng ta sẽ khóa thread chính cho đến khi tất cả các công việc được xếp chồng lên nhau đã được thực hiện và đã được dọn dẹp trước khi chúng ta kích hoạt Worker::shutdown. Tắt worker sớm (tức là trong khi vẫn còn các tasks được thực hiện) sẽ vẫn block được main thread cho đến khi tất cả các tasks đã thực hiện hoàn thành - đơn giản là tasks sẽ không còn rải rác (gây rò rỉ bộ nhớ).

Worker class cung cấp một số methods khác liên quan đến stack task của nó, bao gồm Worker::unstack để loại bỏ mục stacked cũ nhất, và Worker::getStacked trả về số lượng các task trong execution stack. Stack của worker chỉ giữ các tasks sẽ được thực hiện. Khi một task trong stack đã được thực hiện, nó sẽ được gỡ bỏ và sau đó đặt vào stack riêng (bên trong) để được thu gom rác (sử dụng Worker::collect).

Một cách khác để sử dụng lại một thread khi thực hiện nhiều nhiệm vụ là sử dụng một thread pool (thông Pool class). Các Thread pools được cung cấp bởi một nhóm Workers cho phép các tasks được thực hiện đồng thời, nơi mà các yếu tố đồng thời (số threads mà pool chạy) được xác định khi tạo pool.

Chúng ta hãy xem ví dụ trên được thay thế bằng cách sử dụng pool của workers:

class Task extends Threaded
{
    private $value;

    public function __construct(int $i)
    {
        $this->value = $i;
    }

    public function run()
    {
        usleep(250000);
        echo "Task: {$this->value}\n";
    }
}

$pool = new Pool(4);

for ($i = 0; $i < 15; ++$i) {
    $pool->submit(new Task($i));
}

while ($pool->collect());

$pool->shutdown();

Output:

Có một vài khác biệt đáng chú ý giữa việc sử dụng một pool như trái ngược với một worker. Thứ nhất, pool không cần phải manually started, chúng bắt đầu thực hiện task ngay khi có thể. Thứ hai, chúng ta submit các tasks cho các pool, chứ không phải stack chúng. Ngoài ra, Pool class không extends Threaded, và vì vậy nó có thể không được truyền cho các thread khác (không giống như Worker).

Theo thực tế, workers và pools nên luôn luôn collect tasks sau khi hoàn thành, và manually shut down. Các thread tạo ra qua Thread class cũng nên được kết hợp lại creator thread.

pthreads và tính bất biến

Class cuối cùng chúng ta tìm hiểu là Volatile - một bổ sung mới cho pthreads v3. Tính không thay đổi đã trở thành một khái niệm quan trọng trong pthreads, vì nếu không có nó, hiệu suất bị suy giảm nghiêm trọng. Do đó, theo mặc định, các properties các Threaded classes chính là là các đối tượng Threaded và ngay bây giờ không thể thay đổi, và do đó chúng không thể được assign lại sau initial assignment. Khả năng biến đổi rõ ràng đối với các properties như vậy bây giờ đã được ưa chuộng, và vẫn có thể được thực hiện bằng cách sử dụng Volatile class mới.

Chúng ta hãy xem xét một ví dụ để chứng minh các ràng buộc bất biến mới:

class Task extends Threaded // a Threaded class
{
    public function __construct()
    {
        $this->data = new Threaded();
        // $this->data is not overwritable, since it is a Threaded property of a Threaded class
    }
}

$task = new class(new Task()) extends Thread { // a Threaded class, since Thread extends Threaded
    public function __construct($tm)
    {
        $this->threadedMember = $tm;
        var_dump($this->threadedMember->data); // object(Threaded)#3 (0) {}
        $this->threadedMember = new StdClass(); // invalid, since the property is a Threaded member of a Threaded class
    }
};

Các Threaded properties của các Volatile classes, mặt khác, có thể thay đổi:

class Task extends Volatile
{
    public function __construct()
    {
        $this->data = new Threaded();
        $this->data = new StdClass(); // valid, since we are in a volatile class
    }
}

$task = new class(new Task()) extends Thread {
    public function __construct($vm)
    {
        $this->volatileMember = $vm;

        var_dump($this->volatileMember->data); // object(stdClass)#4 (0) {}

        // still invalid, since Volatile extends Threaded, so the property is still a Threaded member of a Threaded class
        $this->volatileMember = new StdClass();
    }
};

Chúng ta có thể thấy rằng Volatile class ghi đè lên tính không thay đổi được thực thi bởi cha của nó Threaded class cho phép các Threaded properties được sắp xếp lại (cũng như unset() được).

Chỉ có một chủ đề cơ bản cuối cùng để đề cập đến sự thay đổi và các Volatile class array. Array trong pthreads được tự động chuyển sang các objects để dễ dàng khi được gán cho property của Threaded class. Điều này là bởi vì nó chỉ đơn giản là không an toàn để thao tác một array từ nhiều contexts trong PHP.

Hãy xem lại một ví dụ để hiểu rõ hơn về mọi thứ:

$array = [1,2,3];

$task = new class($array) extends Thread {
    private $data;

    public function __construct(array $array)
    {
        $this->data = $array;
    }

    public function run()
    {
        $this->data[3] = 4;
        $this->data[] = 5;

        print_r($this->data);
    }
};

$task->start() && $task->join();

/* Output:
Volatile Object
(
    [0] => 1
    [1] => 2
    [2] => 3
    [3] => 4
    [4] => 5
)
*/

Chúng ta có thể thấy rằng các Volatite objects có thể được coi như là array, vì chúng cung cấp hỗ trợ cho các thao tác dựa trên array (như được trình bày ở trên) với toán tử bộ phân số ([]). Tuy nhiên, các Volatite classes không được hỗ trợ bởi các array functions phổ biến như array_pop và array_shift. Thay vào đó, Threaded class cung cấp cho chúng ta các methods tương tự như các build-in functions.

Demo:

$data = new class extends Volatile {
    public $a = 1;
    public $b = 2;
    public $c = 3;
};

var_dump($data);
var_dump($data->pop());
var_dump($data->shift());
var_dump($data);

/* Output:
object([email protected])#1 (3) {
  ["a"]=> int(1)
  ["b"]=> int(2)
  ["c"]=> int(3)
}
int(3)
int(1)
object([email protected])#1 (1) {
  ["b"]=> int(2)
}
*/

Ngoài ra còn có các method khác như Threaded::chunk()Threaded::merge().

Đồng bộ hoá (Synchronization)

Chủ đề cuối cùng chúng tôi sẽ giới thiệu trong bài viết này là đồng bộ hóa trong pthreads. Đồng bộ hóa là một kỹ thuật cho phép kiểm soát truy cập vào các tài nguyên chia sẻ.

Ví dụ, chúng ta hãy thực hiện counter đơn giản:

$counter = new class extends Thread {
    public $i = 0;

    public function run()
    {
        for ($i = 0; $i < 10; ++$i) {
            ++$this->i;
        }
    }
};

$counter->start();

for ($i = 0; $i < 10; ++$i) {
    ++$counter->i;
}

$counter->join();

var_dump($counter->i); // outputs a number from between 10 and 20

Không sử dụng đồng bộ, đầu ra sẽ không được xác định. Multiple threads ghi vào một biến duy nhất mà không kiểm soát quyền truy cập sẽ khiến các bản cập nhật bị mất.

Hãy khắc phục điều này bằng cách thêm đồng bộ hóa để chúng ta nhận được đầu ra chính xác là 20:

$counter = new class extends Thread {
    public $i = 0;

    public function run()
    {
        $this->synchronized(function () {
            for ($i = 0; $i < 10; ++$i) {
                ++$this->i;
            }
        });
    }
};

$counter->start();

$counter->synchronized(function ($counter) {
    for ($i = 0; $i < 10; ++$i) {
        ++$counter->i;
    }
}, $counter);

$counter->join();

var_dump($counter->i); // int(20)

Đồng bộ các khối mã cũng có thể hợp tác với nhau bằng cách sử dụng Threaded::waitThreaded::notify (cùng với Threaded::notifyAll).

Dưới đây là một gia tăng đáng kinh ngạc từ 2 sựđồng bộ hóa trong vòng lặp:

$counter = new class extends Thread {
    public $cond = 1;

    public function run()
    {
        $this->synchronized(function () {
            for ($i = 0; $i < 10; ++$i) {
                var_dump($i);
                $this->notify();

                if ($this->cond === 1) {
                    $this->cond = 2;
                    $this->wait();
                }
            }
        });
    }
};

$counter->start();

$counter->synchronized(function ($counter) {
    if ($counter->cond !== 2) {
        $counter->wait(); // wait for the other to start first
    }

    for ($i = 10; $i < 20; ++$i) {
        var_dump($i);
        $counter->notify();

        if ($counter->cond === 2) {
            $counter->cond = 1;
            $counter->wait();
        }
    }
}, $counter);

$counter->join();

/* Output:
int(0)
int(10)
int(1)
int(11)
int(2)
int(12)
int(3)
int(13)
int(4)
int(14)
int(5)
int(15)
int(6)
int(16)
int(7)
int(17)
int(8)
int(18)
int(9)
int(19)
*/

Bạn có thể đã nhận thấy các điều kiện bổ sung đã được đặt xung quanh Threaded::wait. Những điều kiện này rất quan trọng bởi vì chúng chỉ cho phép gọi lại đồng bộ để tiếp tục khi nó đã nhận được thông báo và điều kiện cụ thể là true. Điều này rất quan trọng bởi vì các thông báo có thể đến từ những nơi khác ngoài các cuộc gọi đến Threaded::notify. Vì vậy, nếu các cuộc gọi đến Threaded::wait không được bao gồm trong điều kiện, chúng tôi sẽ mở cửa cho các cuộc gọi wakeup giả mạo, mà sẽ dẫn đến code không thể đoán trước.

Kết luận

Chúng ta đã thấy 5 class của pthreads (Threaded, Thread, Worker, Volatile, và Pool), bao gồm cả các ví dụ khi mỗi class được sử dụng. Chúng ta cũng đã xem xét khái niệm không thay đổi mới trong pthreads, cũng như có một dạo qua một cách nhanh chóng các tính năng đồng bộ hóa mà nó hỗ trợ. Với những nguyên tắc cơ bản này đã được đề cập đến, bây giờ chúng ta có thể bắt đầu xem xét áp dụng các pthreads cho một số trường hợp sử dụng thực tế!

Tham khảo

  • Parallel Programming with Pthreads in PHP – the Fundamentals
  • Pthreads Extension