API: Flows

阅读本文之前,你应该对 Corda 核心概念 – Flow 比较熟悉了。

一个 Flow 的例子

在我们讨论 flow 提供的 API 之前,让我们来想一下一个标准的 flow 应该像什么样子。

我们可以想象一个 Alice 和 Bob 之间同意一个基本的账本更新的 flow。这个 flow 会包含两边:

  • 发起者(Initiator)的一边,会发起更新账本的请求
  • 反馈(Responder)的一边,会对更新账本的请求进行反馈

Initiator

Part1 – Build the transaction
1. 为 transaction 选择一个 notary
2. 创建一个 transaction builder
3. 提取出所有需要的来自 vault 的 input states 并把他们加入到 builder
4. 创建所有需要的 output states 并把他们加入到 builder
5. 向 builder 里添加所有需要的 commands,attachment 和 timestamp

Part2 – Sign the transaction
6. 为 transaction builder 提供签名
7. 将这个 builder 转换成一个 signed transaction

Part3 – Verify the transaction
8. 通过执行 transaction 的 contracts 来验证这个 transaction

Part4 – Gather the counterparty’s signature
9. 将 transaction 发送给 counterparty
10. 等待接收 counterparty 的签名
11. 将 counterparty 的签名添加到 transaction
12. 验证 transaction 的签名

Part5 – Finalize the transaction
13. 将 transaction 发送给 notary
14. 等待接收 notarised transaction 的反馈
15. 将 transaction 存储到本地
16. 将所有相关的 states 存储到 vault
17. 将 transaction 发送到 counterparty 去记录

我们可以用下边的 flow 图来表示这个工作流程:

Responder

为了对这些动作进行反馈, responder 进行一下步骤的操作:

Part1 – Sign the transaction
1. 从 counterparty 接收 transaction
2. 验证 transaction 中已经存在的签名
3. 通过执行 transaction 的 contracts 来验证 transaction
4. 对该 transaction 生成自己的签名
5. 将签名发送回给 counterparty

Part2 – Record the transaction
6. 从 counterparty 那边接收 notarised transaction
7. 将 transaction 记录到本地
8. 将所有相关的 states 记录到 vault

FlowLogic

常规来讲,一个 flow 会作为一个或者多个 FlowLogic 子类被实现的。FlowLogic 子类的构造体能够包含任意数量任意类型的参数。通常的 FlowLogic(比如 FlowLogic<SignedTransaction>)表明了 flow 的返回类型。

class Initiator(val arg1: Boolean,
                val arg2: Int,
                val counterparty: Party): FlowLogic<SignedTransaction>() { }

class Responder(val otherParty: Party) : FlowLogic<Unit>() { }

FlowLogic annotations

任何你想要用来出发另一个 flow 的 flow,必须要用 @InitiatingFlow 这个 annotation 来进行标注。并且,如果你希望通过 RPC 来开始一个 flow,你必须使用 @StartableByRPC 这个 annotation:

@InitiatingFlow
@StartableByRPC
class Initiator(): FlowLogic<Unit>() { }

任何一个作为对一个其他 flow 提供反馈的 flow,也必须使用 @InitiatedBy 这个 annotation 进行标注。@InitiatedBy 会使用它要反馈的 flow 的 class 作为唯一的一个参数:

@InitiatedBy(Initiator::class)
class Responder(val otherSideSession: FlowSession) : FlowLogic<Unit>() { }

另外,任何由 SchedulableState 开始的 flow 不要使用 @SchedulableFlow 这个 annotation 进行标注。

Call

每一个 FlowLogic 子类必须要重写 FlowLogic.call(),该方法描述了作为 flow 的一部分要执行怎样的动作。比如,发起一方的动作应该在 Initiator.call 中定义,反馈方的动作应该在 Responder.call 中定义。

为了让节点能够同时运行多个 flows,并且能够让 flows 在节点升级或者重启之后依旧可继续接着执行,flows 需要是 checkpointable 并且可以被序列化到磁盘的。这个可以通过将 FlowLogic.call() 和由 FlowLogic.call() 来调用的任何的方法上都带有 @Suspendable 标注。

class Initiator(val counterparty: Party): FlowLogic<Unit>() {
    @Suspendable
    override fun call() { }
}

ServiceHub

FlowLogic.call 中,flow 开发者可以访问节点的 ServiceHub,其提供了访问节点所提供的非常多的服务。查看 API: ServiceHub 获得 ServiceHub 提供的所有服务信息。

常规 flow tasks

在 FlowLogic.call 中你可以使用很多常规的任务(tasks)来同意一个账本的更新。下边的部分会介绍大部分常用的任务。

构建 transaction

在一个 flow 中主要要执行的工作就是构建、确认一个 transaction 并提供签名。这个在 API: Transactions 中有详细描述。

从账本上获得 states

当构建一个 transaction 的时候,你经常需要从账本上获得你希望去消费掉的 state。这个在 API: Vault Query 中有详细描述。

获得其他节点的信息

我们可以使用 ServiceHub.networkMapCache 来获得网络中其他节点的信息,包括提供哪些服务。

对 transaction 进行公证

一个 transaction 通常大豆需要一个 notary 来:

  • 如果 transaction 有 input 的话,需要避免双花(double-spends)
  • 如果 transaction 有 time-window 的话,要确保 transaction 只能在指定的 time-window 里被执行

有很多方法来从 network map 那里获得一个 notary:

val notaryName: CordaX500Name = CordaX500Name(
        organisation = "Notary Service",
        locality = "London",
        country = "GB")
val specificNotary: Party = serviceHub.networkMapCache.getNotary(notaryName)!!
// Alternatively, we can pick an arbitrary notary from the notary
// list. However, it is always preferable to specify the notary
// explicitly, as the notary list might change when new notaries are
// introduced, or old ones decommissioned.
val firstNotary: Party = serviceHub.networkMapCache.notaryIdentities.first()

指定 counterparties

我们也可以使用 network map 来获取一个指定的 counterparty 的信息:

val counterpartyName: CordaX500Name = CordaX500Name(
        organisation = "NodeA",
        locality = "London",
        country = "GB")
val namedCounterparty: Party = serviceHub.identityService.wellKnownPartyFromX500Name(counterpartyName) ?:
        throw IllegalArgumentException("Couldn't find counterparty for NodeA in identity service")
val keyedCounterparty: Party = serviceHub.identityService.partyFromKey(dummyPubKey) ?:
        throw IllegalArgumentException("Couldn't find counterparty with key: $dummyPubKey in identity service")

在 parties 之间进行沟通

为了在你的 initiator flow 和 receiver flow 之间创建一个沟通 session,你必须要调用 initiateFlow(party: Party): FlowSession

FlowSession 实例提供三个方法:

  • send(payload: Any)
    • 发送 payload 对象
  • receive(receiveType: Class<R>): R
    • 接收 receiveType 类型的对象
  • sendAndReceive(receiveType: Class<R>, payload: Any): R
    • 发送 payload 对象并且接收 receiveType 类型的对象

InitiateFlow

initiateFlow 创建了一个同传进来的 Party 的一个沟通 session。

val counterpartySession: FlowSession = initiateFlow(counterparty)

注意当调用这个方法的时候,还没有真实的沟通,这个会被推迟到第一次发送/接收的时候,在那个时间点 counterparty 会:

  • 如果他们没有被注册为这个 flow 提供反馈的话,会忽略这个消息
  • 如果他们被注册为针对这个 flow 要提供反馈的话,会开始这个 flow

Send

一旦我们有了一个 FlowSession 对象的话,我们就可以向 counterparty 发送任何的数据了:

counterpartySession.send(Any())

在另一方的 flow 最终必须要调用一个对应的 receive 来获得这个消息。

Receive

我们也可以等待从一个 counterparty 那里接收任何的数据。这就意味着在 counterparty 的 flow 中需要调用对应的 send 方法。以下是几种情况:

  • 我们从来没有收到一个返回的消息。在当前的设计中,flow 会被暂停直到节点的 owner 结束了 flow
  • counterparty 抛出了一个 FlowException 而不是返回一个消息。这个异常会传回给我们,我们可以通过这个异常来判断发生了什么错误
  • 我们收到了返回的消息,但是是一个错误的类型。这个时候,一个 FlowException 异常会被抛出
  • 我们收到了一个类型正确的消息,一切正常。

当调用了 receive(或者 sendAndReceive)方法的时候, FlowLogic 会被挂起直到它收到了一个反馈。

我们收到的数据会被打包在一个 UntrustworthyData 实例中。这提醒了我们我们收到的数据可能并不像它看起来的那样!我们必须要使用 lambda 来将 UntrustworthyData 拆包:

val packet1: UntrustworthyData<Int> = counterpartySession.receive<Int>()
val int: Int = packet1.unwrap { data ->
    // Perform checking on the object received.
    // T O D O: Check the received object.
    // Return the object.
    data
}

我们也不会限制只能给一个 counterparty 发消息或者只能从一个 counterparty 那里收到消息。一个 flow 可以给任意多的 parties 发送消息,并且每个 party 可以调用不同的 response flow:

val regulatorSession: FlowSession = initiateFlow(regulator)
regulatorSession.send(Any())
val packet3: UntrustworthyData<Any> = regulatorSession.receive<Any>()

注意:如果你从 @InitiatingFlow flow 中初始了多个 flows 的话,在接收方那边,你应该准备好被任何对应的 initiateFlow() 来调用。一种处理这个问题的方法是发送第一条 “role” 的消息给被初始化的 flow,说明一下对方的 flow 应该确认的应该是这个 initiating flow 的哪一部分。比如发送了一个枚举值,那么对方就应该使用 switch 语句来处理。

SendAndReceive

我们也可以使用一个调用来向 counterparty 发送数据并且等待一个指定类型的返回数据。发送的数据类型不需要必须和收到的返回数据类型一致:

val packet2: UntrustworthyData<Boolean> = counterpartySession.sendAndReceive<Boolean>("You can send and receive any class!")
val boolean: Boolean = packet2.unwrap { data ->
    // Perform checking on the object received.
    // T O D O: Check the received object.
    // Return the object.
    data
}

Counterparty response

假设我们现在是在 flow 对应的 Responder 的节点。我们刚刚收到了来自于 Initiator 的下边的一系列消息:

  1. 他们发送给我们 Any 实例
  2. 他们正在等待收到一个 Integer 类型的返回实例
  3. 他们发送了一个 String 的实例并且在等待收到一个 Boolean 类型的返回实例

我们这边的 flow 也必须要反映出这样的调用。我们可以:

val any: Any = counterpartySession.receive<Any>().unwrap { data -> data }
val string: String = counterpartySession.sendAndReceive<String>(99).unwrap { data -> data }
counterpartySession.send(true)

为什么要 Session?

FlowSesion 被引入之前,send/receive API 看起来是有点不同的。他们是在 FlowLogic 上的功能并且是将 Party 作为参数。这个平台在内部会维护一个从 Party 到 session 的 mapping,对用户完全将 session 隐藏起来。

尽管这是一个很方便的 API,但它引入了一些小的问题,就是原来针对于一个指定 session 的消息可能最后跑到了另外一个 session 里。

下边是使用以前的基于 Party 的 API 的例子:

@InitiatingFlow
class LaunchSpaceshipFlow : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val shouldLaunchSpaceship = receive<Boolean>(getPresident()).unwrap { it }
        if (shouldLaunchSpaceship) {
            launchSpaceship()
        }
    }

    fun launchSpaceship() {
    }

    fun getPresident(): Party {
        TODO()
    }
}

@InitiatedBy(LaunchSpaceshipFlow::class)
@InitiatingFlow
class PresidentSpaceshipFlow(val launcher: Party) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val needCoffee = true
        send(getSecretary(), needCoffee)
        val shouldLaunchSpaceship = false
        send(launcher, shouldLaunchSpaceship)
    }

    fun getSecretary(): Party {
        TODO()
    }
}

@InitiatedBy(PresidentSpaceshipFlow::class)
class SecretaryFlow(val president: Party) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        // ignore
    }
}

这个 Flows 的目的很明确:LaunchSpaceshipFlow 在询问长官是否可以让一个宇宙飞船登陆。它期望着一个 boolean 类型的回复(是或否)。长官的回复首先是告诉秘书他们需要 coffee,这个沟通的内容也是是个 boolean 型的回答。然后长官又回复说他们并不希望飞船降落。

然而上边的情况在 launchergetsecretary 返回的是同一个 party 的话会变得很糟糕。如果真的发生了的话,那么这个 boolean 就意味着 secretary 会被 launcher 接收到。

这就说明了 Party 对于沟通的顺序来说并不是一个很好的身份标识,并且事实上基于 Part 的 API 也可能会为黑客引入了一个新的方式来钓鱼用户信息甚至像上边说的那样触发一个并不应该的 flow。

因为我们引入了 FlowSession,用来标识沟通的顺序。通过 FlowSession,上边的一系列 flows 会变成下边这样:

@InitiatingFlow
class LaunchSpaceshipFlowCorrect : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val presidentSession = initiateFlow(getPresident())
        val shouldLaunchSpaceship = presidentSession.receive<Boolean>().unwrap { it }
        if (shouldLaunchSpaceship) {
            launchSpaceship()
        }
    }

    fun launchSpaceship() {
    }

    fun getPresident(): Party {
        TODO()
    }
}

@InitiatedBy(LaunchSpaceshipFlowCorrect::class)
@InitiatingFlow
class PresidentSpaceshipFlowCorrect(val launcherSession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val needCoffee = true
        val secretarySession = initiateFlow(getSecretary())
        secretarySession.send(needCoffee)
        val shouldLaunchSpaceship = false
        launcherSession.send(shouldLaunchSpaceship)
    }

    fun getSecretary(): Party {
        TODO()
    }
}

@InitiatedBy(PresidentSpaceshipFlowCorrect::class)
class SecretaryFlowCorrect(val presidentSession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        // ignore
    }
}

注意现在长官是如何显式地说明他想发送个哪一个 session。

从旧的基于 Party 的 API到新的 API 的转换

在旧的 API 中,对一个 Party 的第一个发送(send)或者接收(receive)会是那个开始 counter-flow 的。这个现在是在调用 initiateFlow 方法中显式地定义的:

send(regulator, Any()) // Old API
// becomes
val session = initiateFlow(regulator)
session.send(Any())

Subflows

Subflows 是一些可能被重用的 flows 并可以用过调用 FlowLogic.subFlow 来运行。这里有两大类的 fubflows,inlined 和 initiating 的。主要的不同在于 counter-flow 的开始方法,initiating subflows 会自动地开始一个 counter-flows,然而 inlined subflows 期望由一个父的 counter-flow 来运行 inlined counter-part。

Inlined Subflows

Inlined subflows 在和 counterparty 初始一个新的 session 的时候继承了调用他们的 flow 的类型。比如假设我们有一个 flow A 调用了一个 inlined subflow B,这就会同一个 party 初始了一个 session。FlowLogic 类型会被用来判断哪一个 counter-flow 应该被开始,应该是 A 不是 B。这就意味着这个 inlined flow 的另一侧必须也要在 kicked off flow 中被显式地实现。这个可能通过调用一个匹配的 inlined counter-flow 或者在 kicked off 父 flow 中通过显式地实现另一侧来实现。

这样的 flow 的一个例子是 CollectSignaturesFlow。它有一个 counter-flow SignTransactionFlow,这个并没有被 InitatedBy 的标签。这是因为这两个 flow 都是 inlined;这个 kick-off 关系会被父 flows 通过调用 CollectSignaturesFlow 和 SignTransactionFlow 来定义。

在代码中,inlined subflows 会作为常规的一个 FlowLogic 的实例,并且没有 @InitiatingFlow@InitiatedBy 的标注。

注意:Inlined flows 并没有自己的版本,他们会继承他们父 flows 的版本。

Initiating subflows

Initiating subflows 是这些带有 @InitiatingFlow 标注的 subflows。当这样的 flow 初始了一个 session 的时候,它的类型会被用来确定哪一个 @InitiatedBy 的flow 会在对方那里被开始。

一个例子就是 FlowCookbook 中的 @InitiatingFlow InitiatorFlow/@InitiatedBy ResponderFlow

注意:Initiating flows 有自己的版本,跟它的父 flows 是分开的。

核心 initiating subflows

Corda 提供的 initiating subflows 针对于标准的 subflows 有一点点不同,就是他们是跟着平台的版本在一起的,并且他们初始的 counter-flows 是被显式地注册的,所以就不需要有 InitiatedBy 这个注释了。

一个例子就是 FinalityFlow/FinalityHander 这个 flow 对。

内置的 subflows

Corda 提供了很多内置的 flows 用来处理常见的任务。比较重要的有:

  • CollectSignaturesFlow(inlined),用来搜集一个 transaction 所要求的签名
  • FinalityFlow(initiating),用来公正(notarise)和记录 transaction 并且将消息发送给所有相关的 parties
  • SendTransactionFlow(inlined),用来发送一个签了名的 transaction,如果这个 transaction 需要自另一方去处理的话
  • ReceiveTransactionFlow(inlined),用来接收一个已经被签名了的 transaction
  • ContractUpgradeFlow,用来变更一个 state 的 contract
  • NotaryChagneFlow,用来变更一个 state 的 notary

我们来看三个非常常见的 subflow 例子。

FinalityFlow

FinalityFlow 允许我们来公证一个 transaction 并且让所有参与者都可以将 transaction 的 states 记录到账本中:

val notarisedTx1: SignedTransaction = subFlow(FinalityFlow(fullySignedTx, FINALISATION.childProgressTracker()))

我们也可以将 transaction 发送给额外的 parties 即使他们不是 state 的参与者:

val additionalParties: Set<Party> = setOf(regulator)
val notarisedTx2: SignedTransaction = subFlow(FinalityFlow(fullySignedTx, additionalParties, FINALISATION.childProgressTracker()))

对于一个 transaction 仅仅需要一方来调用 FinalityFlow 来让所有的参与者记录它。这不需要每一方分别自己去调用。

CollectSignaturesFlow/SignTransactionFlow

都要由哪些 parties 来为 transaction 提供签名是在 transaction 的 commands 中定义的。一旦我们为 transaction 提供了自己的签名,我们可以使用 CollectSignaturesFlow 来搜集其他必须提供签名的 parties 的签名:

val fullySignedTx: SignedTransaction = subFlow(CollectSignaturesFlow(twiceSignedTx, setOf(counterpartySession, regulatorSession), SIGS_GATHERING.childProgressTracker()))

每一个要求提供签名的 party 需要调用他们自己的 SignTransactionFlow 子类来检查 transaction 并且在满足要求后提供自己的签名:

val signTransactionFlow: SignTransactionFlow = object : SignTransactionFlow(counterpartySession) {
    override fun checkTransaction(stx: SignedTransaction) = requireThat {
        // Any additional checking we see fit...
        val outputState = stx.tx.outputsOfType<DummyState>().single()
        assert(outputState.magicNumber == 777)
    }
}

subFlow(signTransactionFlow)

SendTransactionFlow/ReceiveTransactionFlow

验证一个从 counterparty 发送来的 transaction 也需要验证 transaction 依赖链(dependency chain)上的每一个 transaction。这就意味着接收方需要能够向发送方要求这个依赖链的所有详细内容。发送方就可以使用 SendTransactionFlow 来发送 transaction,接收方就可以通过使用 ReceiveTransactionFlow 来查看所有依赖链的内容:

subFlow(SendTransactionFlow(counterpartySession, twiceSignedTx))

// Optional request verification to further restrict data access.
subFlow(object : SendTransactionFlow(counterpartySession, twiceSignedTx) {
    override fun verifyDataRequest(dataRequest: FetchDataFlow.Request.Data) {
        // Extra request verification.
    }
})

我们可以使用 ReceiveTransactionFlow 来接收 transaction,这会自动地下载所有的依赖并且确认 transaction:

val verifiedTransaction = subFlow(ReceiveTransactionFlow(counterpartySession))

我们也可以发送和接收一个 StateAndRef 依赖链并且自动地除了它的依赖:

subFlow(SendStateAndRefFlow(counterpartySession, dummyStates))

// On the receive side ...
val resolvedStateAndRef = subFlow(ReceiveStateAndRefFlow<DummyState>(counterpartySession))

为什么要用 inlined subflows

Inlined subflows 提供了一种分享常用的 flow code 的方式,这种方式要求用户必须要创建一个父的 flow。比如 CollectSignaturesFlow 这个例子。假设我们创建了一个 initiating flow 来自动开始一个 SignTransactionFlow 来为 transaction 提供签名。这意味着恶意的节点能够通过使用 CollectSignaturesFlow 只需向我们发送任何一个旧的 transaction,然后我们就会自动地为其提供签名!

为了使这对 flows 在同一个等级范围,我们通过强制用户将这个 flow 嵌套到他们自己的父 flows 中的方式来允许用户决定他们是否要为这个 transaction 提供签名。

总体上来说,如果你在写一个 flow 的话,你是否应该将其定义为一个 initiating flow 应该基于 counter-flow 是否需要更广泛的上下文来达到它的目标。

FlowException

假设一个节点在运行 flow 的时候抛出了一个异常。其他任何在等待该节点返回信息的节点(比如作为调用 receive 或者 sendAndReceive 的一部分)会被提示该 flow 异常终止并且自我结束。然而抛出的异常不会被发回到 counterparties。

如果你想告知任何等待的 counterparties 异常的原因的话,你可以通过抛出一个 FlowException 来实现:

/**
 * Exception which can be thrown by a [FlowLogic] at any point in its logic to unexpectedly bring it to a permanent end.
 * The exception will propagate to all counterparty flows and will be thrown on their end the next time they wait on a
 * [FlowSession.receive] or [FlowSession.sendAndReceive]. Any flow which no longer needs to do a receive, or has already ended,
 * will not receive the exception (if this is required then have them wait for a confirmation message).
 *
 * [FlowException] (or a subclass) can be a valid expected response from a flow, particularly ones which act as a service.
 * It is recommended a [FlowLogic] document the [FlowException] types it can throw.
 */
open class FlowException(message: String?, cause: Throwable?) : CordaException(message, cause) {
    constructor(message: String?) : this(message, null)
    constructor(cause: Throwable?) : this(cause?.toString(), cause)
    constructor() : this(null, null)
}

Flow framework 会自动地将这个 FlowException 返回给等待的 counterparties。

以下的情况是适合返回一个 FlowException 的:

  • 没有 verify() 方法的 transaction
  • 一个 transaction 的签名是无效的
  • Transaction 跟讨论的交易参数不匹配
  • 交易违规

ProgressTracker

我们可以给我们的 flow 一个进度跟踪器(progress tracker)。这个使我们能够在我们节点的 CRaSH shell 中看到 flow 的进展。

为了提供一个 progress tracker,我们需要在我们的 flow 中重写 FlowLogic.progressTracker

companion object {
    object ID_OTHER_NODES : Step("Identifying other nodes on the network.")
    object SENDING_AND_RECEIVING_DATA : Step("Sending data between parties.")
    object EXTRACTING_VAULT_STATES : Step("Extracting states from the vault.")
    object OTHER_TX_COMPONENTS : Step("Gathering a transaction's other components.")
    object TX_BUILDING : Step("Building a transaction.")
    object TX_SIGNING : Step("Signing a transaction.")
    object TX_VERIFICATION : Step("Verifying a transaction.")
    object SIGS_GATHERING : Step("Gathering a transaction's signatures.") {
        // Wiring up a child progress tracker allows us to see the
        // subflow's progress steps in our flow's progress tracker.
        override fun childProgressTracker() = CollectSignaturesFlow.tracker()
    }

    object VERIFYING_SIGS : Step("Verifying a transaction's signatures.")
    object FINALISATION : Step("Finalising a transaction.") {
        override fun childProgressTracker() = FinalityFlow.tracker()
    }

    fun tracker() = ProgressTracker(
            ID_OTHER_NODES,
            SENDING_AND_RECEIVING_DATA,
            EXTRACTING_VAULT_STATES,
            OTHER_TX_COMPONENTS,
            TX_BUILDING,
            TX_SIGNING,
            TX_VERIFICATION,
            SIGS_GATHERING,
            VERIFYING_SIGS,
            FINALISATION
    )
}

然后我们就可以按照下边的方式来根据 flow 的进展来更新 progress tracker 的当前步骤:

progressTracker.currentStep = ID_OTHER_NODES

发表评论

电子邮件地址不会被公开。 必填项已用*标注