Luồng công việc Python NGAY HÔM NAY

Một quy trình công việc được biểu thị dưới dạng NGÀY. NGÀY chứa các phần công việc riêng lẻ, được gọi là nhiệm vụ và cũng phải làm gì nếu nhiệm vụ không thành công

Các nhiệm vụ mô tả làm thế nào để

  • Thực hiện tìm nạp dữ liệu
  • Chạy phân tích
  • Kích hoạt các hệ thống khác
  • Các phần việc khác

Nhiều quy trình sử dụng nhiều dữ liệu cần thực thi các tác vụ theo thứ tự. Các tác vụ riêng lẻ có thể được chạy theo trình tự hoặc song song. Tuy nhiên, việc phối hợp các tác vụ cũng rất quan trọng để đạt được trạng thái được xử lý đồng bộ. Việc phối hợp được thực hiện thông qua việc triển khai định nghĩa quy trình làm việc dưới dạng DAG. Các DAG được kết nối và triển khai trên nền tảng quản lý Apache Airflow .

Triển khai DAG sử dụng nhiều điểm cuối Dữ liệu mở cho các ngành để xác thực, phân tích cú pháp và lưu trữ bản ghi từ các tệp dữ liệu hoặc định dạng đầu vào vào các kho dữ liệu và siêu dữ liệu khác nhau trên lớp lưu trữ Dữ liệu Mở cho các ngành .

Các định nghĩa DAG sử dụng các toán tử được tạo sẵn và tùy chỉnh

Việc phối hợp các nhiệm vụ sử dụng ba loại nhiệm vụ phổ biến. Người vận hành Các tác vụ được xác định trước mà bạn có thể xâu chuỗi lại với nhau để xây dựng hầu hết các phần trong DAG của mình. Cảm biến Một lớp con đặc biệt của các toán tử đang chờ một sự kiện bên ngoài xảy ra. Luồng tác vụ Một chức năng Python tùy chỉnh được đóng gói thành một tác vụ

Các loại nhiệm vụ phổ biến này là các lớp con của Người vận hành Cơ sở của Luồng khí. Theo nghĩa đó, các khái niệm về tác vụ và toán tử có thể hoán đổi cho nhau

Toán tử và cảm biến đều là mẫu và khi bạn gọi một trong số chúng trong tệp DAG, bạn đang thực hiện một tác vụ

Các toán tử được sử dụng trong Dữ liệu Mở cho các ngành định nghĩa của DAG

Hai loại toán tử được sử dụng trong Dữ liệu Mở cho các ngành định nghĩa của DAG. Toán tử Python

Toán tử dựa trên Python. Các định nghĩa DAG chứa các hướng dẫn về cách sắp xếp thứ tự và trao đổi thông báo giữa các tác vụ khác nhau

Tiện ích Open Data for Industries Apache Airflow cài đặt SDK Python chung cho tất cả các nhà khai thác Python , được sử dụng trong Dữ liệu mở cho các ngành DAG. Để biết thêm thông tin, hãy xem kho lưu trữ Python SDK OSDU.

Ghi chú. Việc nhập các tạo phẩm của Tệp kê khai được thực hiện thông qua các toán tử Python. Để biết thêm thông tin, hãy xem Các bước mẫu để nhập tệp kê khai.

KubernetesPodOperator Toán tử này sử dụng đối tượng Kubernetes Pod để tách logic quy trình công việc khỏi bối cảnh Luồng không khí. Nó giúp tạo ra một bối cảnh mới có thể chạy bất kỳ logic nào được viết bằng các ngôn ngữ lập trình khác nhau. Bằng cách sử dụng KubernetesPodOperator, bạn đảm bảo rằng mọi logic nghiệp vụ đều có thể được sử dụng lại từ các biểu mẫu và ngữ cảnh khác nhau

Sơ đồ sau đây cho thấy sự tương tác giữa bối cảnh Luồng khí Apache và phiên bản Kubernetes Pod.

Một số lợi ích khi sử dụng KubernetesPodOperator. Tăng tính linh hoạt cho việc triển khai. Bạn hoàn toàn không cần viết bất kỳ logic mới nào hoặc bạn cần triển khai mã Apache Airflow ít hơn đáng kể. Hơn nữa, trong các đối tượng Kubernetes Pod, logic được chứa hoàn toàn. Tính linh hoạt của cấu hình và phụ thuộc. Việc sử dụng hình ảnh Docker tùy chỉnh đảm bảo rằng môi trường, cấu hình và phụ thuộc của tác vụ là độc lập. Sử dụng bí mật Kubernetes để tăng cường bảo mật. Với KubernetesPodOperator, bạn có thể sử dụng công nghệ Kubernetes Vault để lưu trữ tất cả dữ liệu nhạy cảm.

Các toán tử và cấu hình của DAG

IBM Dữ liệu Mở cho các ngành chạy các quy trình được sắp xếp theo định nghĩa DAG. Mỗi DAG được triển khai trong Dữ liệu mở cho các ngành đều sử dụng Python hoặc KubernetesPodOperator.

DAGs Toán tử được sử dụng Định dạng đầu vào được hỗ trợ Cấu hình cần thiết Mô tảOsdu_ingest

[Nhập phần mềm kê khai rõ ràng nhất]

Python OperatorJSONBạn cần định cấu hình các biến hệ điều hành hoặc biến môi trường Luồng khí Apache . DAG kê khai sử dụng các toán tử Python tùy chỉnh để sắp xếp quy trình làm việc. Người vận hành đề cập đến các biến ở cấp hệ điều hành hoặc chúng đề cập đến các biến từ bối cảnh Luồng không khí. csv-parser-dag

[nhập tệp CSV]

KubernetesPodOperatorCSVBạn cần định cấu hình các biến môi trường Luồng khí Apache . CSV DAG sử dụng đối tượng Kubernetes Pod để chạy logic nhập. Logic nhập được chứa trong một hình ảnh và nó được định cấu hình bằng định nghĩa DAG. sgy-to-zgy

[Chuyển đổi tệp SGY sang ZGY]

KubernetesPodOperatorSEGYBạn cần định cấu hình các biến môi trường Luồng khí Apache . DAG chuyển đổi tệp SGY sang ZGY sử dụng đối tượng Kubernetes Pod để chạy logic chuyển đổi. Logic chuyển đổi được chứa dưới dạng hình ảnh và được định cấu hình với định nghĩa DAG. openvds_import

[Chuyển đổi tệp SGY sang Open VDS]

KubernetesPodOperatorSEGYBạn cần định cấu hình các biến môi trường Luồng khí Apache . DAG chuyển đổi tệp SGY sang Open VDS sử dụng đối tượng Kubernetes Pod để chạy logic chuyển đổi. Logic chuyển đổi được chứa dưới dạng hình ảnh và được định cấu hình với định nghĩa DAG. Energistics_xml_ingest [nhập tệp WITSML]Python Operator và KubernetesPodOperatorXMLBạn cần định cấu hình các biến cấp Hệ điều hành. DAG sử dụng Toán tử Python tùy chỉnh và KubernetesPodOperator để sắp xếp quy trình làm việc. Các toán tử đề cập đến các biến ở cấp Hệ điều hành.

Triển khai và cấu hình của DAG

Apache Airflow trên Dữ liệu mở cho các ngành là việc cung cấp khung được chứa trong vùng chứa, đạt được thông qua các đối tượng Kubernetes khác nhau. Luồng khí Apache cung cấp các định nghĩa DAG từ một thư mục được định cấu hình. Mọi thay đổi đối với thư mục này đều được trình lập lịch trình đọc để cập nhật các định nghĩa DAG trong ngữ cảnh đang chạy. Cùng một thư mục được tham chiếu bởi những người thi hành khi các tác vụ được lên lịch trên chúng.

Định cấu hình và bật DAG

Với các bước trước đó, bạn đảm bảo rằng các DAG được lập lịch bởi bộ lập lịch và có sẵn trên máy chủ web Luồng không khí Apache .

Trước khi chúng được kích hoạt cho các quy trình chức năng, bạn cần định cấu hình và bật DAG

  1. Đăng nhập vào máy chủ web Airflow
    1. Đăng nhập vào bảng điều khiển quản trị OpenShift .
    2. Chọn dự án nơi Apache Airflow được cài đặt. Cái mặc định là osdu-airflow. osdu-airflow.
    3. Chọn tùy chọn Mạng rồi Tuyến đường .
    4. Tìm bảng điều khiển web của Luồng không khí Apache . Nhấp vào URL trong cột Vị trí cho hàng Tên .
    5. Nhận thông tin đăng nhập cho Apache Airflow bảng điều khiển web.
      1. Đăng nhập vào OpenShift bảng điều khiển dành cho quản trị viên.
      2. Chọn dự án osdu .
      3. Chọn tùy chọn menu Khối lượng công việc rồi Bí mật .
      4. Nhấp vào hàng có tên bí mật đạo cụ .
      5. Sao chép tên người dùng và mật khẩu của luồng không khí
  2. Định cấu hình các biến cho từng định nghĩa DAG trên máy chủ web Luồng khí Apache .

    DAGKeyValue typeSample valueDescriptionManifest nhập phần mềm core__auth__access_token Chuỗi{{keycloak_JWT_Access_Token}}

    Định nghĩa DAG này nhận một số biến từ môi trường hệ điều hành và một số từ đối tượng JSON yêu cầu kích hoạt

    Để biết thêm thông tin, hãy xem Các bước mẫu để nhập tệp kê khai

    core__ingestion__batch_count Integer3 core__config__dataload_config_path Stringabc core__service__search__url Chuỗi http. //os-search-ibm. chúng tôi bạn. svc. cụm. địa phương. 8080/api/search/v2/query core__service__schema__url Stringhttp. // os-lược đồ-ibm. chúng tôi bạn. svc. cụm. địa phương. 8080/api/schema-service/v1/schema core__service__storage__url Stringhttp. //os-storage-ibm. chúng tôi bạn. svc. cụm. địa phương. 8080/api/storage/v2/records core__service__file__host Stringhttp. //os-tệp-ibm. chúng tôi bạn. svc. cụm. địa phương. 8080/api/file core__service__workflow__host Stringhttp. //os-workflow-ibm. chúng tôi bạn. svc. cụm. địa phương. 8080/api/workflow client_id Đăng nhập Stringosdu client_secret Stringa_valid_client_secret_for_the_environmentNhập trình phân tích cú pháp CSVStringa_valid_client_secret_for_the_environmentCSV a>Stringa_valid_client_secret_for_the_environmentCSV a>Stringa_valid_client_secret_for_the_environmentCSV a>Stringa_valid_client_secret_for_the_environmentCSV a>Stringa_valid_client_secret_for_the_environmentCSV a>Stringa_valid_client_secret_for_the_environmentCSV a>Stringa_valid_client_secret_for_the_environmentCSV a>Stringa_valid_client_secret_for_the_environmentCSV a>Stringa_valid_client_secret_for_the_environmentCSV a>Stringa_valid_client_secret_for_the_environmentCSV a>Stringa_valid_client_secret_for_the_environmentCSV a>Stringa_valid_client_secret_for_the_environmentCSV a> DOCKER_IMAGE Cộng đồng chuỗi. nhóm mở. tổ chức. 5555/osdu/platform/data-flow/ingestion/csv-parser/csv-parser/csv-parser-dag-ibm-v2Định nghĩa DAG này sử dụng Kubernetes Pod Operator để xử lý các tác vụ. Nó yêu cầu URL hình ảnh docker và không gian tên để tạo nhóm Kubernetes.

    Biến env_var cung cấp tất cả các điểm cuối dịch vụ mà người vận hành cần để xử lý tệp đầu vào.

    NAMESPACE Luồng không khí Stringosdu env_var JSON

    { "FILE_SERVICE_ENDPOINT": "//os-file-ibm.osdu.svc.cluster.local:8080/api/file/v2", "PARTITION_API": "//os-partition-ibm.osdu.svc.cluster.local:8080/api/partition/v1", "SCHEMA_SERVICE_ENDPOINT": "//os-schema-ibm.osdu.svc.cluster.local:8080/api/schema-service/v1", "SEARCH_SERVICE_ENDPOINT": "//os-search-ibm.osdu.svc.cluster.local:8080/api/search/v2", "STORAGE_SERVICE_ENDPOINT": "//os-storage-ibm.osdu.svc.cluster.local:8080/api/storage/v2", "UNIT_SERVICE_ENDPOINT": "//os-unit-ibm.osdu.svc.cluster.local:8080/api/unit/v2", "WORKFLOW_SERVICE_ENDPOINT": "//os-workflow-ibm.osdu.svc.cluster.local:8080/api/workflow/v1", "OSDU_STORAGE_BATCH_SIZE": "20", "OSDU_STORAGE_THREAD_POOL_SIZE": "70" }

    Chuyển đổi SGY sang ZGY SGY_TO_ZGY_DOCKER_IMAGE Cộng đồng chuỗi. nhóm mở. tổ chức. 5555/osdu/platform/data-flow/ingestion/segy-to-zgy-conversion/segy-to-zgy-conversion-v0-12-2. f578638c56abb5087b018126e8e937547e693c7dĐịnh nghĩa DAG này sử dụng Kubernetes Pod Operator để xử lý các tác vụ. Nó yêu cầu URL hình ảnh docker và không gian tên để tạo nhóm Kubernetes.

    Biến sgy_to_zgy_env_var cung cấp tất cả các điểm cuối dịch vụ mà người vận hành cần để xử lý tệp đầu vào. Ngoài ra, các biến khác để kiểm soát quá trình xử lý là cần thiết để tối ưu hóa quy trình.

    SGY_TO_ZGY_NAMESPACE Luồng không khí Stringosdu sgy_to_zgy_env_var JSON

    { "STORAGE_SVC_URL": "//os-storage-ibm.osdu.svc.cluster.local:8080/api/storage/v2/", "SD_SVC_URL": "//os-seismic-store-service.osdu.svc.cluster.local:8080/api/v3", "SD_SVC_TOKEN": "", "SPATIALREF_SVC_TOKEN": "authorization", "STORAGE_SVC_TOKEN": "", "STORAGE_SVC_API_KEY": "", "SD_SVC_API_KEY": "ok", "OSDU_DATAPARTITIONID": "opendes", "SD_READ_CACHE_PAGE_SIZE": "4195024", "SD_READ_CACHE_MAX_PAGES": "256", "SEGYTOZGY_VERBOSITY": "3", "SEGYTOZGY_GENERATE_INDEX": "1", "IBM_COS_URL":"{{minio_cos_url}}" }

    Chuyển đổi từ SGY sang VDS mở SGY_TO_VDS_DOCKER_IMAGE Cộng đồng chuỗi. nhóm mở. tổ chức. 5555/osdu/platform/domain-data-mgmt-services/seismic/open-vds/openvds-ingestion. 2. 1. 9 Định nghĩa DAG này được sử dụng Kubernetes Pod Operator để xử lý các tác vụ. Nó yêu cầu URL hình ảnh docker và không gian tên để tạo nhóm Kubernetes.

    Biến sgy_to_vds_env_var cung cấp tất cả các điểm cuối dịch vụ mà người vận hành cần để xử lý tệp đầu vào.

    SGY_TO_VDS_NAMESPACE Luồng không khí Stringosdu sgy_to_vds_env_var JSON{"IBM_COS_URL". " {{minio_cos_url}} "}Nhập WITSML NAMESPACE Stringosdu-airflow Định nghĩa DAG này sử dụng Toán tử KubernetesPodOperator và Python để nhập tài liệu WITSML XML. Nó yêu cầu NAMESPACE để tạo nhóm Kubernetes. Các toán tử Python có sẵn như một phần của SDK python phổ biến, được cài đặt như một phần của Dữ liệu mở cho các ngành Cài đặt luồng không khí. OSDU_API_CONFIG String/opt/airflow/dags/repo/osdu_api_config. yaml

  3. Xác thực bản đồ cấu hình props-ingestion trong osdu không gian tên.

    Bạn nhận được một bộ khóa cho chúng tôi. luồng không khí. url thành giá trị thích hợp của dịch vụ web Apache Airflow . Nó cho phép các dịch vụ cốt lõi Dữ liệu mở cho các ngành tương tác với Luồng không khí của Apache thông qua API REST.

  4. Bật định nghĩa của DAG, định nghĩa này xuất hiện trên bảng điều khiển Luồng khí Apache .

    Chuyển đổi công tắc BẬT và TẮT, nằm bên trái tên DAG. Do đó, các DAG sẵn sàng xử lý các yêu cầu do DAG kích hoạt

    Làm cách nào để sử dụng DAG trong Python?

    Để tạo DAG Python trong Luồng không khí, bạn phải luôn nhập lớp DAG Python bắt buộc . Theo sau lớp DAG là nhập Toán tử. Về cơ bản, bạn phải nhập Toán tử tương ứng cho từng cái bạn muốn sử dụng. Ví dụ, để thực thi một hàm Python, bạn phải nhập PythonOperator.

    DAG trong quy trình làm việc là gì?

    DAG đang xử lý . DAG chứa các phần công việc riêng lẻ, được gọi là tác vụ và cả những việc cần làm nếu tác vụ không thành công . Các nhiệm vụ mô tả làm thế nào để. Thực hiện tìm nạp dữ liệu. Chạy phân tích.

    DAG có được viết bằng Python không?

    ​ Trong Luồng không khí, biểu đồ chu kỳ có hướng [DAG] là một đường dẫn dữ liệu được xác định bằng mã Python . Mỗi NGÀY đại diện cho một tập hợp các tác vụ bạn muốn chạy và được sắp xếp để hiển thị mối quan hệ giữa các tác vụ trong giao diện người dùng Airflow.

    Làm cách nào để Airflow phát hiện ra rằng tệp Python là DAG?

    Luồng khí tải DAG từ các tệp nguồn Python mà luồng khí tìm kiếm bên trong DAG_FOLDER đã định cấu hình . Nó sẽ lấy từng tệp, thực thi nó và sau đó tải bất kỳ đối tượng DAG nào từ tệp đó. Điều này có nghĩa là bạn có thể xác định nhiều DAG cho mỗi tệp Python hoặc thậm chí trải rộng một DAG rất phức tạp trên nhiều tệp Python bằng cách nhập.

Chủ Đề