Hàng đợi Redis NodeJS

Hàng đợi Redis NodeJS
Hàng đợi Redis NodeJS

Alberto Gimeno Theo dõi Kỹ sư hệ sinh thái tại GitHub. Đôi khi tôi viết về JavaScript, Node. js và phát triển giao diện người dùng.

Mở rộng quy mô nút của bạn. ứng dụng js sử dụng hàng đợi phân tán

Ngày 10 tháng 6 năm 2021 9 phút đọc 2735

Hàng đợi Redis NodeJS

Ghi chú của biên tập viên. Bài viết này đã được cập nhật vào tháng 6 năm 2021 để bao gồm thông tin về Bull (trái ngược với kue), một Node. thư viện js triển khai hệ thống xếp hàng trên Redis

Trong một bài viết trước, tôi đã nói về cách chạy các tác vụ/công việc nền trong Node. js (đặc biệt với mô-đun worker_threads). Nhưng điều gì sẽ xảy ra nếu bạn đạt đến giới hạn của máy Nút của bạn. js đang chạy trong? . Chia tỷ lệ theo chiều dọc luôn có giới hạn, vì vậy tại một số điểm, bạn sẽ cần phải chia tỷ lệ theo chiều ngang

Mở rộng quy mô bằng Node. hàng đợi công nhân phân tán js

Vì vậy, làm thế nào để bạn thực hiện điều này? . Ngược lại, nếu ứng dụng của bạn cần thực hiện công việc nhưng không bắt buộc phải thực hiện ngay lập tức, thì bạn có thể phân bổ công việc cho các nút "công nhân" và phân phối công việc đó bằng cách sử dụng hàng đợi

Một số trường hợp sử dụng bao gồm tạo báo cáo hàng ngày, tính toán lại mọi thứ cho người dùng hàng ngày (e. g. , khuyến nghị), xử lý những thứ mà người dùng đã tải lên (e. g. , tệp CSV lớn, nhập dữ liệu khi người dùng di chuyển sang dịch vụ, nhập dữ liệu khi người dùng đăng nhập)

Hàng đợi phân tán giống như kho lưu trữ các bản mô tả công việc chứa đủ thông tin để thực hiện công việc hoặc đủ thông tin để tìm ra tất cả những điều cần thiết để thực hiện công việc. Ví dụ

const jobDataSendWelcomeEmail = {
  userId: 1234,
  userName: 'John Smith',
  email: '[email protected]'
}

Thông thường, ứng dụng chính (hoặc bất kỳ phần nào của hệ thống phức tạp hơn), đưa các công việc vào hàng đợi. Các ứng dụng khác đang chạy trong các máy khác nhau được kết nối với hàng đợi và nhận các công việc đó. Những người tiêu dùng này có thể xử lý công việc với thông tin nhận được hoặc ít nhất họ có thể tìm ra tất cả thông tin họ cần và có được thông tin đó. Kiến trúc đơn giản này có những lợi ích quan trọng

  • Ứng dụng của bạn hiện được chia thành hai phần logic có thể được phân phối trong các máy khác nhau
  • Bạn có thể mở rộng quy mô từ một đến nhiều worker mà không cần chạm vào bất kỳ mã nào và không làm gián đoạn quá trình thực thi ứng dụng chính. Hàng đợi đảm nhận việc gửi công việc cho worker thông qua mạng và trong hầu hết các triển khai, đảm nhiệm việc gửi cùng một công việc một lần cho worker

Ghi chú. Mỗi nhà cung cấp có thuật ngữ riêng cho hàng đợi (chủ đề, kênh), công việc (nhiệm vụ, tin nhắn) và công nhân (người tiêu dùng)

Tại sao bạn nên sử dụng Node. hàng đợi công nhân phân tán js

Bạn có thể nghĩ rằng bạn có thể tự triển khai kiến ​​trúc này với cơ sở dữ liệu hiện có của mình mà không cần tăng thêm độ phức tạp cho hệ thống. Bạn có thể tạo bảng "việc làm" có hai cột, cột khóa chính "id" và cột "dữ liệu" chứa tất cả thông tin công việc. Ứng dụng chính chỉ ghi vào bảng và cứ sau X giây, công nhân sẽ đọc từ đó để xem qua công việc tiếp theo sẽ được thực thi. Để ngăn các công nhân khác đọc công việc, bạn thực hiện thao tác trong một giao dịch đồng thời xóa công việc khỏi bảng

thì đấy. Vấn đề được giải quyết, phải không? . Điều đó không lý tưởng, nhưng có thể ổn trong các trường hợp sử dụng cơ bản. Quan trọng hơn, vấn đề là, điều gì sẽ xảy ra nếu công nhân gặp sự cố trong khi xử lý công việc?

Lợi ích của việc sử dụng dịch vụ xếp hàng

Một điều tuyệt vời về hệ thống xếp hàng là cách chúng xử lý các tình huống lỗi. Khi bạn nhận được một công việc, công việc này không bị xóa khỏi hàng đợi, nhưng nó bị "khóa" hoặc ẩn đối với những công nhân còn lại cho đến khi một trong những điều này xảy ra, công nhân xóa công việc đó sau khi hoàn thành công việc hoặc hết thời gian chờ . Vì vậy, nếu một công nhân gặp sự cố, thời gian chờ sẽ xảy ra và công việc quay trở lại hàng đợi để các công nhân khác sử dụng. Khi mọi thứ đã ổn, nhân viên chỉ cần xóa công việc sau khi dữ liệu được xử lý

Điều đó thật tuyệt nếu vấn đề nằm ở công nhân (máy bị tắt, hết tài nguyên, v.v. ), nhưng điều gì sẽ xảy ra nếu sự cố nằm ở mã xử lý công việc và mỗi khi hàng đợi gửi nó tới một nhân viên, thì nhân viên đó lại gặp sự cố?

Sau đó, chúng ta đang ở trong một vòng lặp thất bại vô hạn, phải không? . Nếu đạt đến số lần thử lại tối đa thì tùy thuộc vào hàng đợi, bạn có thể định cấu hình những thứ khác nhau. Một điều chỉnh điển hình là chuyển các công việc đó sang “hàng đợi lỗi” để kiểm tra thủ công hoặc để sử dụng cho công nhân chỉ thông báo lỗi

Việc triển khai hàng đợi phân tán không chỉ tuyệt vời để xử lý các lỗi này mà còn sử dụng các cơ chế khác nhau để gửi công việc cho công nhân càng sớm càng tốt. Một số triển khai sử dụng ổ cắm, những triển khai khác sử dụng HTTP long polling và những triển khai khác có thể sử dụng các cơ chế khác. Đây là một chi tiết triển khai, nhưng tôi muốn nhấn mạnh rằng việc triển khai không hề đơn giản, vì vậy tốt hơn bạn nên sử dụng các triển khai hiện có và đã được thử nghiệm trong trận chiến hơn là triển khai của riêng bạn

Nội dung cần đưa vào dữ liệu công việc

Nhiều lúc tôi thấy mình băn khoăn không biết nên đưa gì vào dữ liệu công việc. Câu trả lời tùy thuộc vào trường hợp sử dụng của bạn, nhưng nó luôn tóm tắt theo hai nguyên tắc

Đừng đặt quá nhiều. Lượng dữ liệu bạn có thể đưa vào dữ liệu công việc bị hạn chế. Kiểm tra hệ thống xếp hàng bạn đang sử dụng để biết thêm thông tin. Thông thường, nó đủ lớn để chúng ta không đạt đến giới hạn, nhưng đôi khi chúng ta muốn đặt quá nhiều. Ví dụ: nếu bạn cần xử lý tệp CSV lớn, bạn không thể đặt tệp đó vào hàng đợi. Trước tiên, bạn cần tải nó lên một dịch vụ lưu trữ, sau đó tạo một công việc có URL tới tệp và thông tin bổ sung mà bạn cần, chẳng hạn như người dùng đã tải nó lên, v.v.

Đừng đặt quá ít. Nếu bạn có dữ liệu bất biến (e. g. , một ngày

const bull = require('bull')
const queue = new bull(“queue_name”)
const job = await queue.add({
  foo: 'bar'
});
2) hoặc dữ liệu hiếm khi thay đổi (e. g. , tên người dùng) bạn có thể nhập dữ liệu công việc của mình. Công việc sẽ được xử lý trong vài giây hoặc vài phút, vì vậy thông thường, bạn có thể đặt một số dữ liệu có thể thay đổi, chẳng hạn như tên người dùng, nhưng điều đó không quan trọng nếu nó không được cập nhật vào lần thứ hai. Bạn có thể lưu các truy vấn vào cơ sở dữ liệu hoặc xóa hoàn toàn bất kỳ truy vấn nào. Tuy nhiên, nếu có thông tin ảnh hưởng đến cách dữ liệu được xử lý, bạn nên truy vấn thông tin đó bên trong bộ xử lý lệnh


Hàng đợi Redis NodeJS
Hàng đợi Redis NodeJS

Hơn 200 nghìn nhà phát triển sử dụng LogRocket để tạo ra trải nghiệm kỹ thuật số tốt hơn

Hàng đợi Redis NodeJS
Hàng đợi Redis NodeJS
Tìm hiểu thêm →


Làm cho công việc hàng đợi của bạn nhỏ và nhanh chóng để xử lý

Nếu bạn cần xử lý các tập dữ liệu lớn, hãy chia chúng thành các phần nhỏ hơn. Nếu bạn phải xử lý một tệp CSV lớn, trước tiên, hãy chia nó thành các phần có số lượng hàng nhất định và tạo một công việc cho mỗi phần. Có một vài lợi ích khi làm theo cách này

  • Dữ liệu sẽ được xử lý nhanh hơn vì có thể xử lý song song
  • Bạn sử dụng tốt hơn các nguồn lực của mình. Sẽ tốt hơn nếu có N công nhân làm những công việc nhỏ hơn là có một công nhân làm công việc gia công nặng trong khi những người còn lại nhàn rỗi hoặc không được sử dụng đúng mức
  • Việc thử lại một công việc nhỏ đã thất bại cũng nhanh hơn và hiệu quả hơn so với một công việc lớn đã thất bại

Nếu bạn cần một kết quả tổng hợp từ tất cả các phần nhỏ đó, bạn có thể đặt tất cả các kết quả trung gian vào cơ sở dữ liệu và khi hoàn thành tất cả, bạn có thể kích hoạt một công việc mới trong một hàng đợi khác để tổng hợp kết quả. Đây là một bản đồ / giảm về bản chất. “Map” là bước chia một công việc lớn thành các công việc nhỏ hơn rồi “reduce” là bước tổng hợp kết quả của các công việc nhỏ đó lại

Nếu bạn không thể phân chia dữ liệu trước, bạn vẫn nên xử lý các công việc nhỏ. Ví dụ: nếu bạn cần sử dụng API bên ngoài sử dụng con trỏ để phân trang kết quả, thì việc tính toán trước tất cả các con trỏ là không thực tế. Bạn có thể xử lý một trang kết quả cho mỗi công việc và sau khi công việc được xử lý, bạn đưa con trỏ đến trang tiếp theo và bạn tạo một công việc mới với con trỏ đó, vì vậy công việc tiếp theo sẽ xử lý trang tiếp theo, v.v.

Công việc bị trì hoãn trong hàng đợi công nhân

Một tính năng thú vị khác của hàng đợi phân tán là bạn thường có thể trì hoãn công việc. Thông thường, có giới hạn về điều này nên bạn không thể trì hoãn công việc trong hai năm, nhưng có một số trường hợp sử dụng điều này hữu ích. Một số ví dụ bao gồm

  • Bạn muốn gửi email chào mừng đến người dùng đã đăng ký nhưng không phải ngay lập tức mà chỉ sau đó. Chỉ cần tạo một công việc bị trì hoãn để gửi email
  • Khi xử lý một công việc, bạn đạt đến giới hạn tốc độ từ API. Bạn có thể sẽ được thông báo khi nào giới hạn tốc độ kết thúc để bạn có thể đưa công việc trở lại hàng đợi, nhưng thời gian cụ thể đó bị trì hoãn
  • Nói chung, nếu bạn muốn kích hoạt một thứ gì đó vào một thời điểm cụ thể trong tương lai, chẳng hạn như lên lịch sao lưu, thông báo, nhắc nhở, v.v…

Đặt ưu tiên công việc

Hầu hết các triển khai hàng đợi không đảm bảo thứ tự thực hiện các công việc, vì vậy đừng dựa vào đó. Tuy nhiên, họ thường thực hiện một số cách ưu tiên một số công việc hơn những công việc khác. Điều này phụ thuộc rất nhiều vào việc triển khai, vì vậy hãy xem tài liệu của hệ thống bạn đang sử dụng để xem bạn có thể đạt được điều đó như thế nào nếu cần

Các trường hợp sử dụng hệ thống xếp hàng công nhân phân tán

Hãy xem xét một số ví dụ. Mặc dù tất cả các hệ thống xếp hàng đều có các tính năng tương tự nhưng không có API chung cho chúng, vì vậy chúng ta sẽ xem một vài ví dụ khác nhau

thư viện Bull

Bull là một nút. thư viện js triển khai hệ thống xếp hàng trên Redis. Redis là một cơ sở dữ liệu trong bộ nhớ có thể được duy trì và nhiều lần đã được sử dụng cho những thứ như lưu trữ phiên trong ứng dụng của bạn. Vì lý do này, việc chọn thư viện này có thể là điều dễ hiểu. Bên cạnh đó, ngay cả khi bạn chưa sử dụng Redis, vẫn có một số nhà cung cấp đám mây cho phép bạn khởi động máy chủ Redis được quản lý một cách dễ dàng (e. g. Heroku hoặc AWS). Cuối cùng, một lợi ích khác của việc sử dụng Bull là ngăn xếp của bạn là nguồn mở 100%, do đó bạn không rơi vào tình trạng khóa nhà cung cấp nào

Nếu bạn cần xử lý nhiều công việc mà vẫn muốn giải pháp mã nguồn mở thì tôi sẽ chọn RabbitMQ. Tôi đã không chọn nó cho các ví dụ trong bài viết này vì Redis thường dễ cài đặt hơn và phổ biến hơn. Tuy nhiên, RabbitMQ đã được thiết kế dành riêng cho các trường hợp sử dụng này, do đó, theo thiết kế, nó vượt trội về mặt kỹ thuật

Có hai thành phần chính của lập lịch công việc với Bull. một nhà sản xuất và người tiêu dùng. Nhà sản xuất tạo công việc và thêm chúng vào Hàng đợi Redis, trong khi người tiêu dùng chọn công việc từ hàng đợi và xử lý chúng

Hãy xem cách tạo và sử dụng công việc bằng cách sử dụng Bull

Tạo hàng đợi và đặt một công việc trên đó

const bull = require('bull')
const queue = new bull(“queue_name”)
const job = await queue.add({
  foo: 'bar'
});

Tiêu thụ công việc từ hàng đợi

const bull = require('bull');
const queue = new bull('my-queue')
queue.process(async (job, done) => { 
/*  /* 
    Sometimes, you might need to report the jobs progress, you can easily use the     job.progress() function to track the progress 
     */
  let progress = 0;

  for(i = 0; i < 80; i++){
    await doSomething(job.data);
    progress += 10;
    job.progress(progress);
  }
  // call done when finished
  done();
});

Microsoft Azure sử dụng Service Bus của nó

Microsoft Azure cung cấp hai dịch vụ xếp hàng. Có một so sánh tuyệt vời ở đây. Tôi đã chọn sử dụng Service Bus vì nó đảm bảo rằng một công việc được giao tối đa cho một công nhân

Hãy xem cách tạo và sử dụng công việc bằng Service Bus

Tạo hàng đợi và đặt một công việc trên đó

Với Microsoft Azure, chúng ta có thể tạo hàng đợi theo chương trình với phương thức

const bull = require('bull')
const queue = new bull(“queue_name”)
const job = await queue.add({
  foo: 'bar'
});
0. Khi nó được tạo, chúng ta có thể bắt đầu gửi tin nhắn

const azure = require('azure')
const serviceBusService = azure.createServiceBusService()
const jobData = { hello: 'world' }
serviceBusService.createTopicIfNotExists('queue_name', err => {
  if (err) console.log(err)
  serviceBusService.sendTopicMessage('queue_name', jobData, err => {
    if (err) console.log(err)
  })
})

Tiêu thụ công việc từ hàng đợi công nhân

Một số triển khai, như triển khai này, được yêu cầu để tạo đăng ký. Xem tài liệu Azure để biết thêm thông tin về chủ đề này

const azure = require('azure')
const serviceBusService = azure.createServiceBusService()
serviceBusService.createSubscription('queue_name', 'subscription_name', err => {
  if (err) return console.log(err)
  // If the `isPeekLock` option is not set to true, the message will be deleted when peeked
  serviceBusService.receiveSubscriptionMessage('queue_name', 'subscription_name', { isPeekLock: true }, (err, message) => {
    if (err) return console.log(err)
    // Do something with `message` and then delete it
    serviceBusService.deleteMessage(lockedMessage, err => {
      if (err) return console.log(err)
    })
  })
})

Amazon, sử dụng dịch vụ SQS của mình

Dịch vụ hàng đợi phân tán của Amazon được gọi là Dịch vụ hàng đợi đơn giản (SQS). Nó có thể được sử dụng trực tiếp nhưng cũng có thể định cấu hình nó với các dịch vụ AWS khác để thực hiện các quy trình công việc thú vị. Ví dụ: bạn có thể định cấu hình bộ chứa S3 để tự động gửi công việc đến hàng đợi SQS khi một tệp (đối tượng) mới được lưu trữ. Ví dụ: điều này có thể hữu ích để xử lý tệp dễ dàng (video, hình ảnh, CSV)


Các bài viết hay khác từ LogRocket

  • Đừng bỏ lỡ một khoảnh khắc nào với The Replay, một bản tin được tuyển chọn từ LogRocket
  • Tìm hiểu cách Galileo của LogRocket loại bỏ tiếng ồn để chủ động giải quyết các vấn đề trong ứng dụng của bạn
  • Sử dụng useEffect của React để tối ưu hóa hiệu suất ứng dụng của bạn
  • Chuyển đổi giữa nhiều phiên bản của Node
  • Khám phá cách tạo hoạt ảnh cho ứng dụng React của bạn với AnimXYZ
  • Khám phá Tauri, một khuôn khổ mới để xây dựng các tệp nhị phân
  • So sánh NestJS với. Thể hiện. js

Hãy xem cách chúng ta có thể lập trình thêm và sử dụng các công việc trên hàng đợi

Tạo hàng đợi và đặt một công việc trên đó

const AWS = require('aws-sdk')
AWS.config.update({region: 'REGION'})
const sqs = new AWS.SQS({ apiVersion: '2012-11-05' })
const queueParams = {
  QueueName: 'queue_name',
  Attributes: {
    'DelaySeconds': '60',
    'MessageRetentionPeriod': '86400'
  }
}
sqs.createQueue(queueParams, (err, data) => {
  if (err) return console.log(err)
  console.log('queue url', data.QueueUrl)
  const jobParams = {
    MessageBody: JSON.stringify({ hello: 'world' }),
    QueueUrl: data.QueueUrl
  }
  sqs.sendMessage(jobParams, (err, data) => {
    if (err) return console.log(err)
  })
})

Tiêu thụ công việc từ hàng đợi

const AWS = require('aws-sdk')
AWS.config.update({region: 'REGION'})
const sqs = new AWS.SQS({apiVersion: '2012-11-05'})
const queueURL = 'SQS_QUEUE_URL' // Obtained when the queue was created
const params = {
 AttributeNames: [
  'SentTimestamp'
 ],
 MaxNumberOfMessages: 1, // receive only one message at a time
 MessageAttributeNames: [
    "All"
 ],
 QueueUrl: queueURL,
 VisibilityTimeout: 20,
 WaitTimeSeconds: 0
}
sqs.receiveMessage(params, (err, data) => {
  if (err) return console.log(err)
  if (data.Messages) {
    // Do something when the messages and then delete them
    const message = data.Messages[0]
    
    const deleteParams = {
      QueueUrl: queueURL,
      ReceiptHandle: message.ReceiptHandle
    }
    sqs.deleteMessage(deleteParams, function(err, data) {
      if (err) return console.log(err)
    })
  }
})

Kiểm tra nút. js trên SQS để biết thêm thông tin

Google Cloud, sử dụng dịch vụ pub/phụ của Google Cloud

Google Cloud, giống như Azure, cũng yêu cầu tạo đăng ký (xem tài liệu để biết thêm thông tin). Trên thực tế, bạn cần tạo đăng ký trước, trước khi gửi tin nhắn đến chủ đề/hàng đợi nếu không chúng sẽ không khả dụng

Tài liệu gợi ý tạo cả chủ đề và đăng ký từ dòng lệnh

const bull = require('bull')
const queue = new bull(“queue_name”)
const job = await queue.add({
  foo: 'bar'
});
1

const bull = require('bull')
const queue = new bull(“queue_name”)
const job = await queue.add({
  foo: 'bar'
});
2

Tuy nhiên, bạn cũng có thể tạo chúng theo cách lập trình, nhưng bây giờ hãy xem cách chèn và sử dụng các công việc giả định rằng chúng ta đã tạo hàng đợi (chủ đề) và đăng ký

Tạo hàng đợi và đặt một công việc trên đó

const bull = require('bull')
const queue = new bull(“queue_name”)
const job = await queue.add({
  foo: 'bar'
});
0

Sử dụng công việc từ hàng đợi

Google Cloud Pub/Sub đảm bảo rằng một tin nhắn/công việc được gửi ít nhất một lần cho mỗi đăng ký, nhưng tin nhắn có thể được gửi nhiều lần (như thường lệ, hãy kiểm tra tài liệu để biết thêm thông tin)

const bull = require('bull')
const queue = new bull(“queue_name”)
const job = await queue.add({
  foo: 'bar'
});
1

Sự kết luận

Hàng đợi phân tán là một cách tuyệt vời để mở rộng quy mô ứng dụng của bạn vì một vài lý do

Chỉ dành cho 200 Theo dõi các yêu cầu mạng chậm và không thành công trong sản xuất

Triển khai trang web hoặc ứng dụng web dựa trên Node là phần dễ dàng. Đảm bảo phiên bản Node của bạn tiếp tục cung cấp tài nguyên cho ứng dụng của bạn là lúc mọi thứ trở nên khó khăn hơn. Nếu bạn quan tâm đến việc đảm bảo các yêu cầu đối với dịch vụ phụ trợ hoặc bên thứ ba thành công, hãy thử LogRocket.
Hàng đợi Redis NodeJS
Hàng đợi Redis NodeJS
https. // tên lửa. com/đăng ký/

LogRocket giống như một DVR dành cho ứng dụng web và thiết bị di động, ghi lại mọi thứ diễn ra trong khi người dùng tương tác với ứng dụng của bạn theo đúng nghĩa đen. Thay vì đoán xem tại sao lại xảy ra sự cố, bạn có thể tổng hợp và báo cáo về các yêu cầu mạng có vấn đề để nhanh chóng hiểu nguyên nhân gốc rễ

Hàng đợi Redis là gì?

Redis Queue là thư viện python dành cho các công việc xếp hàng để xử lý nền . Vì nhiều dịch vụ lưu trữ sẽ hết thời gian chờ đối với các yêu cầu HTTP dài, nên tốt nhất là thiết kế các API để đóng các yêu cầu nhanh nhất có thể. Redis Queue cho phép chúng tôi thực hiện việc này bằng cách đẩy các tác vụ vào hàng đợi và sau đó đến một công nhân để xử lý.

BullMQ là gì?

BullMQ là một Nút. Thư viện js triển khai hệ thống hàng đợi nhanh và mạnh mẽ được xây dựng trên Redis giúp giải quyết nhiều kiến ​​trúc vi dịch vụ thời hiện đại .

Hàng đợi trong nút JS là gì?

Hàng đợi là cấu trúc dữ liệu tuyến tính được sử dụng trong Nút. js để tổ chức dần dần và thích hợp các hoạt động không đồng bộ . Nó là một danh sách có thứ tự các hoạt động không đồng bộ trong đó một hoạt động không đồng bộ được chèn vào cuối hàng đợi và được xóa khỏi đầu hàng đợi.

Hàng đợi không đồng bộ là gì?

Hàng đợi không đồng bộ giống như hàng đợi tiêu chuẩn, ngoại trừ khi bạn loại bỏ một phần tử khỏi hàng đợi trống, quá trình tính toán sẽ chặn thay vì thất bại .