开始使用 Dapr Workflow Python SDK
如何使用 Dapr Python SDK 启动和运行工作流
注意
Dapr工作流目前处于alpha阶段。让我们创建一个 Dapr 工作流,并使用控制台调用它。 通过提供的hello world工作流示例,您将:
- 运行一个使用
DaprClient
的Python控制台应用程序 - 利用Python工作流SDK和API调用来启动、暂停、恢复、终止和清除工作流实例
这个示例使用dapr init中的默认配置,在自托管模式下。
在Python示例项目中,app.py
文件包含了应用程序的设置,包括:
- 工作流定义
- 工作流活动定义
- 工作流和工作流活动的注册
前期准备
- 已安装Dapr CLI
- 初始化Dapr环境
- 已安装Python 3.8+。
- Dapr Python包和工作流扩展已安装
- 验证您是否正在使用最新的proto绑定
设置环境
运行以下命令使用 Dapr Python SDK 安装运行此工作流示例所需的依赖项。
pip3 install -r demo_workflow/requirements.txt
克隆[Python SDK存储库]。
git clone https://github.com/dapr/python-sdk.git
从 Python SDK 根目录中,导航到 Dapr Workflow 示例。
cd examples/demo_workflow
在本地运行应用程序
要运行 Dapr 应用程序,您需要启动 Python 程序和 Dapr sidecar。 在终端中运行:
dapr run --app-id orderapp --app-protocol grpc --dapr-grpc-port 50001 --resources-path components --placement-host-address localhost:50005 -- python3 app.py
**注意:**由于Python3.exe在Windows中未定义,您可能需要使用
python app.py
替代python3 app.py
。
预期输出
== APP == ==========Start Counter Increase as per Input:==========
== APP == start_resp exampleInstanceID
== APP == Hi Counter!
== APP == New counter value is: 1!
== APP == Hi Counter!
== APP == New counter value is: 11!
== APP == Hi Counter!
== APP == Hi Counter!
== APP == Get response from hello_world_wf after pause call: Suspended
== APP == Hi Counter!
== APP == Get response from hello_world_wf after resume call: Running
== APP == Hi Counter!
== APP == New counter value is: 111!
== APP == Hi Counter!
== APP == Instance Successfully Purged
== APP == start_resp exampleInstanceID
== APP == Hi Counter!
== APP == New counter value is: 1112!
== APP == Hi Counter!
== APP == New counter value is: 1122!
== APP == Get response from hello_world_wf after terminate call: Terminated
== APP == Get response from child_wf after terminate call: Terminated
== APP == Instance Successfully Purged
发生了什么?
当你运行 dapr run
,Dapr 客户端:
- 注册了工作流(
hello_world_wf
)及其活动(hello_act
) - 启动工作流引擎
def main():
with DaprClient() as d:
host = settings.DAPR_RUNTIME_HOST
port = settings.DAPR_GRPC_PORT
workflowRuntime = WorkflowRuntime(host, port)
workflowRuntime = WorkflowRuntime()
workflowRuntime.register_workflow(hello_world_wf)
workflowRuntime.register_activity(hello_act)
workflowRuntime.start()
print("==========Start Counter Increase as per Input:==========")
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")
Dapr 然后暂停并恢复工作流:
# Pause
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")
# Resume
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")
一旦工作流恢复,Dapr会触发一个工作流事件并打印新的计数器值:
# Raise event
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
event_name=eventName, event_data=eventData)
为了清除状态存储中的工作流状态,Dapr已经清除了工作流:
# Purge
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("Instance Successfully Purged")
然后示范了通过以下方式终止工作流程:
- 使用与清除的工作流相同的
instanceId
启动新的工作流程。 - 在关闭工作流之前终止工作流并清除。
# Kick off another workflow
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")
# Terminate
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
sleep(1)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")
# Purge
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("Instance Successfully Purged")
下一步
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.