【Python】使用Python编写和提交Argo工作流

使用Python编写和提交Argo工作流

Donald发布于 今天 04:47

10人将获赠CNCF商店$100美元礼券!

你填了吗?

【Python】使用Python编写和提交Argo工作流

问卷链接(https://www.wjx.cn/jq/9714648...)


作者:Alex Collins

Python 是用户在 Kubernetes 上编写机器学习工作流的流行编程语言。

开箱即用时,Argo 并没有为 Python 提供一流的支持。相反,我们提供Java、Golang 和 Python API 客户端。

但这对大多数用户来说还不够。许多用户需要一个抽象层来添加组件和特定于用例的特性。

今天你有两个选择。

KFP 编译器+ Python 客户端

Argo 工作流被用作执行 Kubeflow 流水线的引擎。你可以定义一个 Kubeflow 流水线,并在 Python 中将其直接编译到 Argo 工作流中。

然后你可以使用Argo Python 客户端向 Argo 服务器 API 提交工作流。

这种方法允许你利用现有的 Kubeflow 组件。

安装:

pip3 install kfp

pip3 install argo-workflows

例子:

import kfp as kfp

def flip_coin():

return kfp.dsl.ContainerOp(

name='Flip a coin',

image='python:alpine3.6',

command=['python', '-c', """

import random

res = "heads" if random.randint(0, 1) == 0 else "tails"

with open('/output', 'w') as f:

f.write(res)

"""],

file_outputs={'output': '/output'}

)

def heads():

return kfp.dsl.ContainerOp(name='Heads', image="alpine:3.6", command=["sh", "-c", 'echo "it was heads"'])

def tails():

return kfp.dsl.ContainerOp(name='Tails', image="alpine:3.6", command=["sh", "-c", 'echo "it was tails"'])

@kfp.dsl.pipeline(name='Coin-flip', description='Flip a coin')

def coin_flip_pipeline():

flip = flip_coin()

with kfp.dsl.Condition(flip.output == 'heads'):

heads()

with kfp.dsl.Condition(flip.output == 'tails'):

tails()

def main():

kfp.compiler.Compiler().compile(coin_flip_pipeline, __file__ + ".yaml")

if __name__ == '__main__':

main()

运行这个来创建你的工作流:

apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

generateName: coin-flip-

annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.3.0, pipelines.kubeflow.org/pipeline_compilation_time: '2021-01-21T17:17:54.299235',

pipelines.kubeflow.org/pipeline_spec: '{"description": "Flip a coin", "name":

"Coin-flip"}'}

labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.3.0}

spec:

entrypoint: coin-flip

templates:

- name: coin-flip

dag:

tasks:

- name: condition-1

template: condition-1

when: '"{{tasks.flip-a-coin.outputs.parameters.flip-a-coin-output}}" == "heads"'

dependencies: [flip-a-coin]

- name: condition-2

template: condition-2

when: '"{{tasks.flip-a-coin.outputs.parameters.flip-a-coin-output}}" == "tails"'

dependencies: [flip-a-coin]

- {name: flip-a-coin, template: flip-a-coin}

- name: condition-1

dag:

tasks:

- {name: heads, template: heads}

- name: condition-2

dag:

tasks:

- {name: tails, template: tails}

- name: flip-a-coin

container:

command:

- python

- -c

- "\nimport random\nres = \"heads\" if random.randint(0, 1) == 0 else \"tails\"\

\nwith open('/output', 'w') as f:\n f.write(res) \n "

image: python:alpine3.6

outputs:

parameters:

- name: flip-a-coin-output

valueFrom: {path: /output}

artifacts:

- {name: flip-a-coin-output, path: /output}

- name: heads

container:

command: [sh, -c, echo "it was heads"]

image: alpine:3.6

- name: tails

container:

command: [sh, -c, echo "it was tails"]

image: alpine:3.6

arguments:

parameters: []

serviceAccountName: pipeline-runner

注意,Kubeflow 不支持这种方法。

你可以使用客户端提交上述工作流程如下:

import yaml

from argo.workflows.client import (ApiClient,

WorkflowServiceApi,

Configuration,

V1alpha1WorkflowCreateRequest)

def main():

config = Configuration(host="http://localhost:2746")

client = ApiClient(configuration=config)

service = WorkflowServiceApi(api_client=client)

with open("coin-flip.py.yaml") as f:

manifest: dict = yaml.safe_load(f)

del manifest['spec']['serviceAccountName']

service.create_workflow('argo', V1alpha1WorkflowCreateRequest(workflow=manifest))

if __name__ == '__main__':

main()

【Python】使用Python编写和提交Argo工作流

Couler

Couler是一个流行的项目,它允许你以一种平台无感的方式指定工作流,但它主要支持 Argo 工作流(计划在未来支持 Kubeflow 和 AirFlow):

安装:

pip3 install git+https://github.com/couler-proj/couler

例子:

import couler.argo as couler

from couler.argo_submitter import ArgoSubmitter

def random_code():

import random

res = "heads" if random.randint(0, 1) == 0 else "tails"

print(res)

def flip_coin():

return couler.run_script(image="python:alpine3.6", source=random_code)

def heads():

return couler.run_container(

image="alpine:3.6", command=["sh", "-c", 'echo "it was heads"']

)

def tails():

return couler.run_container(

image="alpine:3.6", command=["sh", "-c", 'echo "it was tails"']

)

result = flip_coin()

couler.when(couler.equal(result, "heads"), lambda: heads())

couler.when(couler.equal(result, "tails"), lambda: tails())

submitter = ArgoSubmitter()

couler.run(submitter=submitter)

这会创建以下工作流程:

apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

generateName: couler-example-

spec:

templates:

- name: couler-example

steps:

- - name: flip-coin-29

template: flip-coin

- - name: heads-31

template: heads

when: '{{steps.flip-coin-29.outputs.result}} == heads'

- name: tails-32

template: tails

when: '{{steps.flip-coin-29.outputs.result}} == tails'

- name: flip-coin

script:

name: ''

image: 'python:alpine3.6'

command:

- python

source: |

import random

res = "heads" if random.randint(0, 1) == 0 else "tails"

print(res)

- name: heads

container:

image: 'alpine:3.6'

command:

- sh

- '-c'

- echo "it was heads"

- name: tails

container:

image: 'alpine:3.6'

command:

- sh

- '-c'

- echo "it was tails"

entrypoint: couler-example

ttlStrategy:

secondsAfterCompletion: 600

activeDeadlineSeconds: 300

【Python】使用Python编写和提交Argo工作流

点击阅读网站原文。


CNCF (Cloud Native Computing Foundation)成立于2015年12月,隶属于Linux  Foundation,是非营利性组织。
CNCF(云原生计算基金会)致力于培育和维护一个厂商中立的开源生态系统,来推广云原生技术。我们通过将最前沿的模式民主化,让这些创新为大众所用。扫描二维码关注CNCF微信公众号。
【Python】使用Python编写和提交Argo工作流

python工作流kubernetes云原生-cloud-nativecncf

本文系转载,阅读原文

https://mp.weixin.qq.com/s/Eafuk2pRLQHQRVbf6mHhEw

阅读 40发布于 今天 04:47

avatar

Donald

布道者,Linux基金会(LFAPAC)

101 声望

388 粉丝

0 条评论

得票时间

avatar

Donald

布道者,Linux基金会(LFAPAC)

101 声望

388 粉丝

宣传栏

10人将获赠CNCF商店$100美元礼券!

你填了吗?

【Python】使用Python编写和提交Argo工作流

问卷链接(https://www.wjx.cn/jq/9714648...)


作者:Alex Collins

Python 是用户在 Kubernetes 上编写机器学习工作流的流行编程语言。

开箱即用时,Argo 并没有为 Python 提供一流的支持。相反,我们提供Java、Golang 和 Python API 客户端。

但这对大多数用户来说还不够。许多用户需要一个抽象层来添加组件和特定于用例的特性。

今天你有两个选择。

KFP 编译器+ Python 客户端

Argo 工作流被用作执行 Kubeflow 流水线的引擎。你可以定义一个 Kubeflow 流水线,并在 Python 中将其直接编译到 Argo 工作流中。

然后你可以使用Argo Python 客户端向 Argo 服务器 API 提交工作流。

这种方法允许你利用现有的 Kubeflow 组件。

安装:

pip3 install kfp

pip3 install argo-workflows

例子:

import kfp as kfp

def flip_coin():

return kfp.dsl.ContainerOp(

name='Flip a coin',

image='python:alpine3.6',

command=['python', '-c', """

import random

res = "heads" if random.randint(0, 1) == 0 else "tails"

with open('/output', 'w') as f:

f.write(res)

"""],

file_outputs={'output': '/output'}

)

def heads():

return kfp.dsl.ContainerOp(name='Heads', image="alpine:3.6", command=["sh", "-c", 'echo "it was heads"'])

def tails():

return kfp.dsl.ContainerOp(name='Tails', image="alpine:3.6", command=["sh", "-c", 'echo "it was tails"'])

@kfp.dsl.pipeline(name='Coin-flip', description='Flip a coin')

def coin_flip_pipeline():

flip = flip_coin()

with kfp.dsl.Condition(flip.output == 'heads'):

heads()

with kfp.dsl.Condition(flip.output == 'tails'):

tails()

def main():

kfp.compiler.Compiler().compile(coin_flip_pipeline, __file__ + ".yaml")

if __name__ == '__main__':

main()

运行这个来创建你的工作流:

apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

generateName: coin-flip-

annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.3.0, pipelines.kubeflow.org/pipeline_compilation_time: '2021-01-21T17:17:54.299235',

pipelines.kubeflow.org/pipeline_spec: '{"description": "Flip a coin", "name":

"Coin-flip"}'}

labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.3.0}

spec:

entrypoint: coin-flip

templates:

- name: coin-flip

dag:

tasks:

- name: condition-1

template: condition-1

when: '"{{tasks.flip-a-coin.outputs.parameters.flip-a-coin-output}}" == "heads"'

dependencies: [flip-a-coin]

- name: condition-2

template: condition-2

when: '"{{tasks.flip-a-coin.outputs.parameters.flip-a-coin-output}}" == "tails"'

dependencies: [flip-a-coin]

- {name: flip-a-coin, template: flip-a-coin}

- name: condition-1

dag:

tasks:

- {name: heads, template: heads}

- name: condition-2

dag:

tasks:

- {name: tails, template: tails}

- name: flip-a-coin

container:

command:

- python

- -c

- "\nimport random\nres = \"heads\" if random.randint(0, 1) == 0 else \"tails\"\

\nwith open('/output', 'w') as f:\n f.write(res) \n "

image: python:alpine3.6

outputs:

parameters:

- name: flip-a-coin-output

valueFrom: {path: /output}

artifacts:

- {name: flip-a-coin-output, path: /output}

- name: heads

container:

command: [sh, -c, echo "it was heads"]

image: alpine:3.6

- name: tails

container:

command: [sh, -c, echo "it was tails"]

image: alpine:3.6

arguments:

parameters: []

serviceAccountName: pipeline-runner

注意,Kubeflow 不支持这种方法。

你可以使用客户端提交上述工作流程如下:

import yaml

from argo.workflows.client import (ApiClient,

WorkflowServiceApi,

Configuration,

V1alpha1WorkflowCreateRequest)

def main():

config = Configuration(host="http://localhost:2746")

client = ApiClient(configuration=config)

service = WorkflowServiceApi(api_client=client)

with open("coin-flip.py.yaml") as f:

manifest: dict = yaml.safe_load(f)

del manifest['spec']['serviceAccountName']

service.create_workflow('argo', V1alpha1WorkflowCreateRequest(workflow=manifest))

if __name__ == '__main__':

main()

【Python】使用Python编写和提交Argo工作流

Couler

Couler是一个流行的项目,它允许你以一种平台无感的方式指定工作流,但它主要支持 Argo 工作流(计划在未来支持 Kubeflow 和 AirFlow):

安装:

pip3 install git+https://github.com/couler-proj/couler

例子:

import couler.argo as couler

from couler.argo_submitter import ArgoSubmitter

def random_code():

import random

res = "heads" if random.randint(0, 1) == 0 else "tails"

print(res)

def flip_coin():

return couler.run_script(image="python:alpine3.6", source=random_code)

def heads():

return couler.run_container(

image="alpine:3.6", command=["sh", "-c", 'echo "it was heads"']

)

def tails():

return couler.run_container(

image="alpine:3.6", command=["sh", "-c", 'echo "it was tails"']

)

result = flip_coin()

couler.when(couler.equal(result, "heads"), lambda: heads())

couler.when(couler.equal(result, "tails"), lambda: tails())

submitter = ArgoSubmitter()

couler.run(submitter=submitter)

这会创建以下工作流程:

apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

generateName: couler-example-

spec:

templates:

- name: couler-example

steps:

- - name: flip-coin-29

template: flip-coin

- - name: heads-31

template: heads

when: '{{steps.flip-coin-29.outputs.result}} == heads'

- name: tails-32

template: tails

when: '{{steps.flip-coin-29.outputs.result}} == tails'

- name: flip-coin

script:

name: ''

image: 'python:alpine3.6'

command:

- python

source: |

import random

res = "heads" if random.randint(0, 1) == 0 else "tails"

print(res)

- name: heads

container:

image: 'alpine:3.6'

command:

- sh

- '-c'

- echo "it was heads"

- name: tails

container:

image: 'alpine:3.6'

command:

- sh

- '-c'

- echo "it was tails"

entrypoint: couler-example

ttlStrategy:

secondsAfterCompletion: 600

activeDeadlineSeconds: 300

【Python】使用Python编写和提交Argo工作流

点击阅读网站原文。


CNCF (Cloud Native Computing Foundation)成立于2015年12月,隶属于Linux  Foundation,是非营利性组织。
CNCF(云原生计算基金会)致力于培育和维护一个厂商中立的开源生态系统,来推广云原生技术。我们通过将最前沿的模式民主化,让这些创新为大众所用。扫描二维码关注CNCF微信公众号。
【Python】使用Python编写和提交Argo工作流

以上是 【Python】使用Python编写和提交Argo工作流 的全部内容, 来源链接: utcz.com/a/108880.html

回到顶部