Python Plugins

从一个最简单的例子开始

import megflow
@megflow.register(name="alias", inputs=["inp"], outputs=["out"])
class Node:
    def __init__(self, name, args):
        pass
    def exec(self):
        envelope = self.inp.recv()
        msg = dowith(envelope.msg)
        self.out.send(envelope.repack(msg))

这其中有三部分内容: register装饰器,Node的构造函数,Node的执行函数

  1. register装饰器

    • name: 若指定,则register所修饰插件重命名为name,默认为register所修饰类的类名

    • inputs: Node的输入列表,每个输入input都可以在exec方法中,通过self.input访问,

    • outputs: Node的输出列表,每个输出output都可以在exec方法中,通过self.output访问

    • exclusive: 默认为False, 调度模型是一个thread local的协程调度器, 若为True, 则将该任务安排到线程池中

  2. Node的构造函数

    • name: 即参数文件中Node的name字段

    • args: 即参数文件中Node的剩余参数字段

  3. Node的执行函数

    • 一个python插件的执行方法必须是命名为exec的零参成员方法

    • 对于在参数文件中该插件引用的资源resource, 可以在exec方法中,通过self.resource访问

    • 通过输入的recv方法取得输入消息,输入消息是Envelope对象,其msg成员即开发者真正需要读写的消息实例

    • Envelope语义为在图中流转的消息的相关信息,由于这些信息需要在图中被传递,所以开发者应该保持消息与Envelope的对应关系

    • 若一个Envelope携带的消息被拆分为多个消息,或者转换为另一个消息,应该通过Enveloperepack方法,将Envelope与消息关联起来

    • 通过输出的send方法发送输出消息,输出消息是Envelope对象

MegFlow也提供了一系列异步工具

  1. yield_now(), 让出当前任务的执行权

  2. sleep(dur), 使当前任务沉睡dur毫秒

  3. join(tasks), tasks参数是一个函数列表,join堵塞直到tasks中的函数都执行完毕

  4. create_future(callback), callback参数是一个函数, 默认值为None,create_future返回一个(Future, Waker)对象

    • Future::wait, 堵塞直到Waker::wake被调用,返回Waker::wake(result)传入的result参数