API: Vault Query

Corda 从最底层的架构开始一直都在推崇使用业界标准的,经过考验的查询框架和类库来访问存储 transaction 数据的 RDBMS 后台(包括 Vault)。

Corda 提供了一系列的灵活的查询机制来访问 Vault:

  • Vault Query API
  • 使用 JDBC session
  • 自定义 JPA/JPQL 查询
  • 自定义第三方的数据访问框架,比如 Spring Data

大多数的查询需求都能够通过使用 Vault Query API 来满足,该 API 是通过 VaultService 暴露出来的,可以被 flow 直接使用:

/**
 * Generic vault query function which takes a [QueryCriteria] object to define filters,
 * optional [PageSpecification] and optional [Sort] modification criteria (default unsorted),
 * and returns a [Vault.Page] object containing the following:
 *  1. states as a List of <StateAndRef> (page number and size defined by [PageSpecification])
 *  2. states metadata as a List of [Vault.StateMetadata] held in the Vault States table.
 *  3. total number of results available if [PageSpecification] supplied (otherwise returns -1)
 *  4. status types used in this query: UNCONSUMED, CONSUMED, ALL
 *  5. other results (aggregate functions with/without using value groups)
 *
 * @throws VaultQueryException if the query cannot be executed for any reason
 *        (missing criteria or parsing error, paging errors, unsupported query, underlying database error)
 *
 * Notes
 *   If no [PageSpecification] is provided, a maximum of [DEFAULT_PAGE_SIZE] results will be returned.
 *   API users must specify a [PageSpecification] if they are expecting more than [DEFAULT_PAGE_SIZE] results,
 *   otherwise a [VaultQueryException] will be thrown alerting to this condition.
 *   It is the responsibility of the API user to request further pages and/or specify a more suitable [PageSpecification].
 */
@Throws(VaultQueryException::class)
fun <T : ContractState> _queryBy(criteria: QueryCriteria,
                                 paging: PageSpecification,
                                 sorting: Sort,
                                 contractStateType: Class<out T>): Vault.Page<T>

/**
 * Generic vault query function which takes a [QueryCriteria] object to define filters,
 * optional [PageSpecification] and optional [Sort] modification criteria (default unsorted),
 * and returns a [Vault.PageAndUpdates] object containing
 * 1) a snapshot as a [Vault.Page] (described previously in [queryBy])
 * 2) an [Observable] of [Vault.Update]
 *
 * @throws VaultQueryException if the query cannot be executed for any reason
 *
 * Notes: the snapshot part of the query adheres to the same behaviour as the [queryBy] function.
 *        the [QueryCriteria] applies to both snapshot and deltas (streaming updates).
 */
@Throws(VaultQueryException::class)
fun <T : ContractState> _trackBy(criteria: QueryCriteria,
                                 paging: PageSpecification,
                                 sorting: Sort,
                                 contractStateType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>>

并且 RPC client applications 可以通过 CordaRPCOps 来使用:

@RPCReturnsObservables
fun <T : ContractState> vaultQueryBy(criteria: QueryCriteria,
                                     paging: PageSpecification,
                                     sorting: Sort,
                                     contractStateType: Class<out T>): Vault.Page<T>
@RPCReturnsObservables
fun <T : ContractState> vaultTrackBy(criteria: QueryCriteria,
                                     paging: PageSpecification,
                                     sorting: Sort,
                                     contractStateType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>>

Helper 方法也被提供,包括参数的默认值:

fun <T : ContractState> vaultQuery(contractStateType: Class<out T>): Vault.Page<T> {
    return vaultQueryBy(QueryCriteria.VaultQueryCriteria(), PageSpecification(), Sort(emptySet()), contractStateType)
}
fun <T : ContractState> vaultQueryByCriteria(criteria: QueryCriteria, contractStateType: Class<out T>): Vault.Page<T> {
    return vaultQueryBy(criteria, PageSpecification(), Sort(emptySet()), contractStateType)
}
fun <T : ContractState> vaultQueryByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): Vault.Page<T> {
    return vaultQueryBy(criteria, paging, Sort(emptySet()), contractStateType)
}
fun <T : ContractState> vaultQueryByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): Vault.Page<T> {
    return vaultQueryBy(criteria, PageSpecification(), sorting, contractStateType)
}
fun <T : ContractState> vaultTrack(contractStateType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> {
    return vaultTrackBy(QueryCriteria.VaultQueryCriteria(), PageSpecification(), Sort(emptySet()), contractStateType)
}
fun <T : ContractState> vaultTrackByCriteria(contractStateType: Class<out T>, criteria: QueryCriteria): DataFeed<Vault.Page<T>, Vault.Update<T>> {
    return vaultTrackBy(criteria, PageSpecification(), Sort(emptySet()), contractStateType)
}
fun <T : ContractState> vaultTrackByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): DataFeed<Vault.Page<T>, Vault.Update<T>> {
    return vaultTrackBy(criteria, paging, Sort(emptySet()), contractStateType)
}
fun <T : ContractState> vaultTrackByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): DataFeed<Vault.Page<T>, Vault.Update<T>> {
    return vaultTrackBy(criteria, PageSpecification(), sorting, contractStateType)
}

API 提供了静态的(snapshot)和动态的(包含 steaming updates 的 snapshot)方法来定义一系列的过滤条件。

  • 使用 queryBy 来获得数据当前的 snapshot(对于一个给定的 QueryCriteria
  • 使用 trackBy 来获得既包括当前的 snapshot 也包括将来的更新流(对于一个给定的 QueryCriteria

注意:更新流只能够基于 contract type 和 state status(UNCONSUMED, CONSUMED, ALL)来进行过滤。

也可以指定简单的分页(页码和每页包含记录数)和排序(使用标准或者自定义的属性值进行排序)。分页的默认值为(pageNumber = 0(显示第一页), pageSize = 200(每页200条记录)),排序的默认值为(direction = ASC(升序))。

QueryCriteria 接口提供了灵活的机制来指定不同的过滤条件,包括 and/or 组合和一系列的操作符,包括:

  • Binary logical (AND, OR)
  • Comparison (LESS_THAN, LESS_THAN_OR_EQUAL, GREATER_THAN, GREATER_THAN_OR_EQUAL)
  • Equality (EQUAL, NOT_EQUAL)
  • Likeness (LIKE, NOT_LIKE)
  • Nullability (IS_NULL, NOT_NULL)
  • Collection based (IN, NOT_IN)
  • Standard SQL-92 aggregate functions (SUM, AVG, MIN, MAX, COUNT)

这里有四种对于该接口的实现,可以结合使用他们来定义高级的过滤条件。

  1. VaultQueryCriteria 提供了针对于 Vault states 表的属性值的可过滤条件:status (UNCONSUMED, CONSUMED), state reference(s), contract state type(s), notaries, soft locked states, timestamps (RECORDED, CONSUMED)。
    注意:对于经常使用的属性,已经定义了一些显著的默认值(status = UNCONSUMED, includeSoftlockedStates = true)
  2. FungibleAssetQueryCriteria 提供了针对于在 Corda 核心的 FungibleAsset contract state 接口中定义的属性的可过滤条件,用来展示资产是可替换的,可计数的并且被指定的机构发行(比如在 Corda finance module 中的 Cash.StateCommondityContract.State)。可过滤条件包括:participants(s), owner(s), quantity, issuer party(s) and issuer reference(s)。
    注意:所有扩展了 FungibleAsset 的 contract states 会自动将该接口中的常规 state 属性存储到 vault_fungible_states 表中。
  3.  LinearStateQueryCriteria 提供了针对于在 Corda 核心的 LinearStateDealState 中定义的属性的可过滤条件,用来展示那些一直在取代/替换自己的实体,所有的实体都包含一个相同的 linearId(比如像 SIMM valuation demo 中的 trade 实体 states)。可过滤的条件包括:participant(s), linearId(s), dealRef(s)。
    注意:所有扩展自 LinearState 或者 DealState 的 contract states 会自动地将该接口中常用的 state 属性存储到 vault_linear_states 表中。
  4. VaultCustomQeuryCriteria 提供了一种方式,来指定一个或者多个基于自定义的 contract state  的属性的任意表达式,这个自定义的 contract state 实现了像 api-persistence 文档中描述的自己的 schema。自定义的条件表达式通过使用一个或者多个 type-safe 的 CriteriaExpression:BinaryLogical, Not, ColumnPredicateExpressionColumnPredicateExpression 允许使用前边罗列的操作符类型来指定任意的条件。更进一步地讲,一个丰富的 DSL 被提供来使得用任何的 ColumnPredicate 组合来简单地构建一个自定义的查询条件成为可能。

注意:自定义的 contract schemas 会在节点启动的时候被自动注册。可以参考 Persistence 来了解对于不同的测试目的自定义 schemas 注册的机制。

所有的 QueryCriteria 实现都可以使用 andor 操作符。
所有的 QueryCriteria 实现提供了一套可显式指定的常用属性:

  1. State 状态属性(Vault.StateStatus),默认值是 UNCONSUMED states。当使用 AND/OR 定义多个条件的时候,这个属性的最后一个值会覆盖之前的所有值
  2. Contract state 类型(<Set<Class<out ContractState>>),它至少包含一个类型(默认的会是满足所有 state 类型的 ContractState)。当使用 AND/OR 定义多个条件的时候,所有指定的 contract state types 会被合并为一套

下边是一个自定义查询的演示实例:

val generalCriteria = VaultQueryCriteria(Vault.StateStatus.ALL)

val results = builder {
    val currencyIndex = PersistentCashState::currency.equal(USD.currencyCode)
    val quantityIndex = PersistentCashState::pennies.greaterThanOrEqual(10L)

    val customCriteria1 = VaultCustomQueryCriteria(currencyIndex)
    val customCriteria2 = VaultCustomQueryCriteria(quantityIndex)

    val criteria = generalCriteria.and(customCriteria1.and(customCriteria2))
    vaultService.queryBy<Cash.State>(criteria)
}

注意:自定义的实现了 Queryable 接口的 contract states 现在可能扩展了常规的 schemas types FungiblePersistentState 或者 LinearPersistentState。以前,所有的自定义 contracts 扩展了根 PersistentState 类并且定义了对于 FungibleAssetLinearState 属性的重复 mappings。参考例子 SampleCashSchemaV2DummyLinearStateSchemaV2

下边会有这些 QueryCriteria 对象的使用例子。

注意:当在 Kotlin 中将指定的 ContractType 作为传给 QueryCriteria 的参数类型的时候,如果这是一个 interface 的话,那么查询会包括所有的 concrete 实现。在以前,只能够基于 concrete 类型进行查询(或者所有的 ContractState

Vault 查询 API 使用了底层 JPA Hibernate 的丰富的语义,JPA Hibernate 是基于 Corda 所采用的基于持久化的框架的。

注意:对于数据库 level 的权限控制会在今后被强制执行,确保经过验证的,基于角色的和只读的访问权限来控制 Corda 的表。

注意:现在 API 提供了非常简单的方式通过 Java 和 Kotlin 来使用 semantics。然而,我们应该注意到 Java 的 查询更加的繁琐因为使用了反射字段(reflection fields)来引用 schema 属性类型。

在 Java 中创建一个自定义查询的例子:

QueryCriteria generalCriteria = new VaultQueryCriteria(Vault.StateStatus.ALL);

Field attributeCurrency = CashSchemaV1.PersistentCashState.class.getDeclaredField("currency");
Field attributeQuantity = CashSchemaV1.PersistentCashState.class.getDeclaredField("pennies");

CriteriaExpression currencyIndex = Builder.equal(attributeCurrency, "USD");
CriteriaExpression quantityIndex = Builder.greaterThanOrEqual(attributeQuantity, 10L);

QueryCriteria customCriteria2 = new VaultCustomQueryCriteria(quantityIndex);
QueryCriteria customCriteria1 = new VaultCustomQueryCriteria(currencyIndex);


QueryCriteria criteria = generalCriteria.and(customCriteria1).and(customCriteria2);
Vault.Page<ContractState> results = vaultService.queryBy(Cash.State.class, criteria);

注意:当前这个根据 Party 的查询指定了 AbstractParty,该 AbstractParty 可能是具体的或者是匿名的。如果是匿名的,当一个匿名的参与方没有一个指定的 X500Name 的话,将不会返回任何结果。基于效率的原因,查询不会使用 PublicKey 作为查询条件。

分页

API 提供了分页的支持来应对可能返回大批量数据的情况(默认会返回200条记录)。定义一个合理的分页数量的默认值能够让 flows 中的 checkpointing 更有效率,并且开发人员不用再去担心当结果集包含200条或者更少的记录的时候要怎么去进行分页。当大批量的结果可能返回的时候(比如使用 RPC API 做报表并且要现在 UI 上的情况),我们强烈地建议定义一个 PageSpecification 来有效使用内存来正确的执行查询并获得结果。当返回结果超过 200 条记录但是 PageSpecification 没有被指定的时候,这里已经存在一个 fail-fast 模型会来提醒 API 用户需要去进行分页。

下边的查询会从账本中查询所有 unconsumed ContractState, 每页会包含 200 条记录,返回的是默认的页数(第一页):

val vaultSnapshot = proxy.vaultQueryBy<ContractState>(
    QueryCriteria.VaultQueryCriteria(Vault.StateStatus.UNCONSUMED),
    PageSpecification(DEFAULT_PAGE_NUM, 200))

注意:每页显示多少条记录会通过 Int.MAX_VALUE 来定义,并且你需要很小心的定义它,因为返回的结果可能会超出你的 JVM 的内存 footprint。

使用实例

Kotlin

使用 VaultQueryCriteria 的常规 snapshot 查询:

查询所有的 unconsumed states(可能是最简单的一个查询):

val result = vaultService.queryBy<ContractState>()

/**
 * Query result returns a [Vault.Page] which contains:
 *  1) actual states as a list of [StateAndRef]
 *  2) state reference and associated vault metadata as a list of [Vault.StateMetadata]
 *  3) [PageSpecification] used to delimit the size of items returned in the result set (defaults to [DEFAULT_PAGE_SIZE])
 *  4) Total number of items available (to aid further pagination if required)
 */
val states = result.states
val metadata = result.statesMetadata

对于一些 state 引用的 unconsumed states 查询:

val sortAttribute = SortAttribute.Standard(Sort.CommonStateAttribute.STATE_REF_TXN_ID)
val criteria = VaultQueryCriteria(stateRefs = listOf(stateRefs.first(), stateRefs.last()))
val results = vaultService.queryBy<DummyLinearContract.State>(criteria, Sort(setOf(Sort.SortColumn(sortAttribute, Sort.Direction.ASC))))

对于一些 contract state 类型的 unconsumed states 查询:

val criteria = VaultQueryCriteria(contractStateTypes = setOf(Cash.State::class.java, DealState::class.java))
val results = vaultService.queryBy<ContractState>(criteria)

对于指定的一个 notary 的 unconsumed states 查询:

val criteria = VaultQueryCriteria(notary = listOf(CASH_NOTARY))
val results = vaultService.queryBy<ContractState>(criteria)

对于指定的一系列 participants 的 unconsumed states 的查询:

val criteria = LinearStateQueryCriteria(participants = listOf(BIG_CORP, MINI_CORP))
val results = vaultService.queryBy<ContractState>(criteria)

对于指定时间区间内的 unconsumed states 记录的查询

val start = TODAY
val end = TODAY.plus(30, ChronoUnit.DAYS)
val recordedBetweenExpression = TimeCondition(
        QueryCriteria.TimeInstantType.RECORDED,
        ColumnPredicate.Between(start, end))
val criteria = VaultQueryCriteria(timeCondition = recordedBetweenExpression)
val results = vaultService.queryBy<ContractState>(criteria)

注意:上边的例子演示了如何使用 Between ColumnPredicate
查询所有的 states 并且使用指定的分页条件(每页 10 条记录)

val pagingSpec = PageSpecification(DEFAULT_PAGE_NUM, 10)
val criteria = VaultQueryCriteria(status = Vault.StateStatus.ALL)
val results = vaultService.queryBy<ContractState>(criteria, paging = pagingSpec)

注意:结果集的 totalStatesAvailable metadata 字段允许你可以像下边的例子那样进行进一步的分页。

使用 pagination specification 和 iterate,使用 totalStatesAvailable 字段查询所有的 states 直到最后一页:

var pageNumber = DEFAULT_PAGE_NUM
val states = mutableListOf<StateAndRef<ScheduledState>>()
do {
    val pageSpec = PageSpecification(pageSize = PAGE_SIZE, pageNumber = pageNumber)
    val results = vaultService.queryBy<ScheduledState>(VaultQueryCriteria(), pageSpec, SORTING)
    states.addAll(results.states)
    pageNumber++
} while ((pageSpec.pageSize * (pageNumber)) <= results.totalStatesAvailable)

LinearState 和 DealState 查询应该使用 LinearStateQueryCriteria
对于指定的 linear ids 的 unconsumed linear states 的查询:

val linearIds = issuedStates.states.map { it.state.data.linearId }.toList()
val criteria = LinearStateQueryCriteria(linearId = listOf(linearIds.first(), linearIds.last()))
val results = vaultService.queryBy<LinearState>(criteria)

对于指定的 linear id 的所有 linear states 的查询:

val linearStateCriteria = LinearStateQueryCriteria(linearId = listOf(linearId), status = Vault.StateStatus.ALL)
val vaultCriteria = VaultQueryCriteria(status = Vault.StateStatus.ALL)
val results = vaultService.queryBy<LinearState>(linearStateCriteria and vaultCriteria)

对于指定的 deal references 的 unconsumed deal states 的查询:

val criteria = LinearStateQueryCriteria(externalId = listOf("456", "789"))
val results = vaultService.queryBy<DealState>(criteria)

对于指定的 deals parties 的 unconsumed deal states 的查询:

val criteria = LinearStateQueryCriteria(participants = parties)
val results = vaultService.queryBy<DealState>(criteria)

FungibleAsset 和 DealAsset 查询应该使用 FungibleAssetQueryCriteria
对于指定的 currency 的 fungible assets 的查询:

val ccyIndex = builder { CashSchemaV1.PersistentCashState::currency.equal(USD.currencyCode) }
val criteria = VaultCustomQueryCriteria(ccyIndex)
val results = vaultService.queryBy<FungibleAsset<*>>(criteria)

对于最小数量的 fungible assets 的查询:

val fungibleAssetCriteria = FungibleAssetQueryCriteria(quantity = builder { greaterThan(2500L) })
val results = vaultService.queryBy<Cash.State>(fungibleAssetCriteria)

对于指定的 issuer party 的 fungible assets 的查询:

val criteria = FungibleAssetQueryCriteria(issuer = listOf(BOC))
val results = vaultService.queryBy<FungibleAsset<*>>(criteria)

使用 VaultCustomQueryCriteria 来实现聚合查询:
注意:聚合查询的查询结果会被包含在结果页的 otherResults 属性中。

使用多种方法对 cash 进行聚合查询:

val sum = builder { CashSchemaV1.PersistentCashState::pennies.sum() }
val sumCriteria = VaultCustomQueryCriteria(sum)

val count = builder { CashSchemaV1.PersistentCashState::pennies.count() }
val countCriteria = VaultCustomQueryCriteria(count)

val max = builder { CashSchemaV1.PersistentCashState::pennies.max() }
val maxCriteria = VaultCustomQueryCriteria(max)

val min = builder { CashSchemaV1.PersistentCashState::pennies.min() }
val minCriteria = VaultCustomQueryCriteria(min)

val avg = builder { CashSchemaV1.PersistentCashState::pennies.avg() }
val avgCriteria = VaultCustomQueryCriteria(avg)

val results = vaultService.queryBy<FungibleAsset<*>>(sumCriteria
        .and(countCriteria)
        .and(maxCriteria)
        .and(minCriteria)
        .and(avgCriteria))

注意:otherResults 会包含5个 items,每个聚合方法一个。

使用多种方法按照 currency 进行分组对 cash 进行聚合查询:

val sum = builder { CashSchemaV1.PersistentCashState::pennies.sum(groupByColumns = listOf(CashSchemaV1.PersistentCashState::currency)) }
val sumCriteria = VaultCustomQueryCriteria(sum)

val max = builder { CashSchemaV1.PersistentCashState::pennies.max(groupByColumns = listOf(CashSchemaV1.PersistentCashState::currency)) }
val maxCriteria = VaultCustomQueryCriteria(max)

val min = builder { CashSchemaV1.PersistentCashState::pennies.min(groupByColumns = listOf(CashSchemaV1.PersistentCashState::currency)) }
val minCriteria = VaultCustomQueryCriteria(min)

val avg = builder { CashSchemaV1.PersistentCashState::pennies.avg(groupByColumns = listOf(CashSchemaV1.PersistentCashState::currency)) }
val avgCriteria = VaultCustomQueryCriteria(avg)

val results = vaultService.queryBy<FungibleAsset<*>>(sumCriteria
        .and(maxCriteria)
        .and(minCriteria)
        .and(avgCriteria))

注意:otherResults 将会包含 24 个 items,每个 currency 的每个局和方法会有一个结果(这里的分组属性 currency 会和每一个聚合结果返回回来)。

对 cash 按照 issuer party 进行分组并按照 sum 排序的 sum 聚合查询:

val sum = builder {
    CashSchemaV1.PersistentCashState::pennies.sum(groupByColumns = listOf(CashSchemaV1.PersistentCashState::issuerParty,
            CashSchemaV1.PersistentCashState::currency),
            orderBy = Sort.Direction.DESC)
}

val results = vaultService.queryBy<FungibleAsset<*>>(VaultCustomQueryCriteria(sum))

注意:otherResults 会包含从加和最大到最小的 cash 数量共 12个 items,每个 issuer party 和 currency 的聚合方法会有一个结果。

动态查询(Dynamic queries)(也包括 VaultQueryCriteria)是对于 snapshot 查询的一个扩展,其返回了一个额外的 QueryResults 返回类型,作为一个 Observable<Vault.Update>形式返回。参考 ReactiveX Observable 来了解详细内容和理解怎么使用这种类型。

跟踪 unconsumed cash states:

vaultService.trackBy<Cash.State>().updates     // UNCONSUMED default

跟踪 unconsumed linear states:

val (snapshot, updates) = vaultService.trackBy<LinearState>()

注意:这会返回 DealLinear states
跟踪 unconsumed deal states:

val (snapshot, updates) = vaultService.trackBy<DealState>()

注意:这个会只返回 Deal states

Java 实例请参考 https://docs.corda.net/api-vault-query.html#java-examples

其他 use case scenarios

对于一些要求使用复杂的分页,排序,分组和聚合功能的高级使用场景,建议开发者使用其他的好的提供这样的功能的框架。即对于 JPQL(JPA Query Language) 的实现,比如对 advanced SQL access 的 Hibernate,对于高级分页和排序结构的 Spring Data。

Corda Tutorials 提供了满足一下要求的例子:

  • Template/Tutorial CorDapp service 使用 Vault API Custom Query 来访问 IOU state 的属性
  • Template/Tutorial CorDapp service 查询扩展通过使用 JPQL 来执行名字查询(Named Queries)
  • 使用 Spring Data JPA 进行高级分页查询

发表评论

电子邮件地址不会被公开。 必填项已用*标注