Adding events to a River Stream

Events can be added to a River Stream Node over RPC using the service interface that the protocol defines in protocol.proto. This guide walks through a concrete example of making an addEvent request.

Requirements

  • protobuf@3
  • bufbuild/buf/buf

Dependencies

To run the recipe below, create a TypeScript project whose package.json file includes the dependencies listed below. The River dependencies require building harmony-web first (see README.md).

  • ethers
  • @river/waterproof
  • @river/sdk

The following recipe assumes a client RPC service generated from protocol.proto, and that the user's wallet is valid for adding an event to its own UserStream.

Adding an event to UserStream

Users that have initialized their UserStream can add events to that stream. Adding message events to a different stream follows the same pattern, because all events are added using the addEvent RPC method implemented by the node's StreamService.

Create a TypeScript file and follow the steps below to add an event. You will also need to define RIVER_STREAM_SERVER_URL, which can point to the test node at https://node1-test.towns.com.
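
One way to supply RIVER_STREAM_SERVER_URL is to read it from the environment and fall back to the test node mentioned above. The sketch below is only an illustration: resolveServerUrl and DEFAULT_RIVER_STREAM_SERVER_URL are hypothetical names that are not part of @river/sdk, and the http/https check is an assumption about which URLs a node accepts.

```typescript
// Hypothetical helper, not part of @river/sdk.
// The default is the test node named in this guide.
const DEFAULT_RIVER_STREAM_SERVER_URL = 'https://node1-test.towns.com'

function resolveServerUrl(env: Record<string, string | undefined>): string {
    // Treat a missing or blank variable as "not set".
    const candidate = (env['RIVER_STREAM_SERVER_URL'] ?? '').trim()
    const url = candidate.length > 0 ? candidate : DEFAULT_RIVER_STREAM_SERVER_URL
    // Assumption: the node is reached over http or https.
    if (!/^https?:\/\//.test(url)) {
        throw new Error(`RIVER_STREAM_SERVER_URL must start with http:// or https://, got: ${url}`)
    }
    return url
}
```

In a Node.js script this could be called as resolveServerUrl(process.env), and the result passed to the client factory shown in the later steps.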

The example code below assumes ES module syntax.

1. Import package dependencies

import { ethers } from 'ethers'
import { makeOldTownsDelegateSig, bin_fromHexString } from '@river/waterproof'
import {
    ParsedStreamResponse,
    SignerContext,
    StreamRpcClient,
    makeEvent,
    makeStreamRpcClient,
    makeUserStreamId,
    unpackStream,
    userIdFromAddress,
    make_UserPayload_Inception,
} from '@river/sdk'

console.log('adding-events.ts')

2. Create a new random delegate wallet and signer context

async function createSignerContext(): Promise<SignerContext> {
    // primary wallet can be a random new wallet
    // if we are just registering a new wallet
    // and not performing any on-chain transactions.
    const primaryWallet = ethers.Wallet.createRandom()
    const delegateWallet = ethers.Wallet.createRandom()
    const creatorAddress = bin_fromHexString(await primaryWallet.getAddress())
    const delegateSig = await makeOldTownsDelegateSig(primaryWallet, delegateWallet.publicKey)
    const pk = delegateWallet.privateKey.slice(2)
    const context: SignerContext = {
        signerPrivateKey: () => pk,
        creatorAddress,
        delegateSig,
    }
    return context
}
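
The signer context above handles hex strings in two ways: the delegate private key has its leading "0x" removed with slice(2), and the primary wallet address is converted to bytes. The following sketch shows what those two operations amount to. stripHexPrefix and hexToBytes are hypothetical helpers written for illustration; bin_fromHexString is only assumed to behave similarly, and this is not its actual implementation.

```typescript
// Hypothetical helpers for illustration; not the @river/waterproof implementation.

// Removes a leading "0x" or "0X" if present, otherwise returns the input unchanged.
function stripHexPrefix(hex: string): string {
    return hex.startsWith('0x') || hex.startsWith('0X') ? hex.slice(2) : hex
}

// Converts a hex string (with or without prefix) into its raw bytes.
function hexToBytes(hex: string): Uint8Array {
    const digits = stripHexPrefix(hex)
    if (digits.length % 2 !== 0 || !/^[0-9a-fA-F]*$/.test(digits)) {
        throw new Error('invalid hex string')
    }
    const bytes = new Uint8Array(digits.length / 2)
    for (let i = 0; i < bytes.length; i++) {
        // Each byte is encoded as two hex digits.
        bytes[i] = parseInt(digits.slice(i * 2, i * 2 + 2), 16)
    }
    return bytes
}
```

An ethers private key is a "0x"-prefixed string of 64 hex digits, so stripping the prefix leaves the 64 digits that encode the 32-byte key.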

3. Create a client instance of the River StreamService

function createRpcClient(serverUrl: string): StreamRpcClient {
  const rpcClient = makeStreamRpcClient(serverUrl)
  return rpcClient
}

4. Initialize the UserStream with the wallet

async function initializeUserStream(rpcClient: StreamRpcClient, context: SignerContext) {
    try {
        const userId = userIdFromAddress(context.creatorAddress)
        const userStreamId = makeUserStreamId(userId)

        const createUserStream = async (userStreamId: string): Promise<ParsedStreamResponse> => {
            const userEvents = [
                await makeEvent(
                    context,
                    make_UserPayload_Inception({
                        streamId: userStreamId,
                    }),
                ),
            ]
            const response = await rpcClient.createStream({
                events: userEvents,
                streamId: userStreamId,
            })
            return unpackStream(response.stream)
        }
        const userStream = await createUserStream(userStreamId)
        console.log(`created user stream ${userStreamId}`)
    } catch (e) {
        console.log('error creating user stream')
        throw e
    }
}

In the above function, createUserStream creates the user stream by sending an inception event through the createStream RPC. The stream's streamId is derived from the user's userId, which is in turn derived from the creator address in the signer context.

All streams are created with an inception event that marks the genesis of the stream.
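
To make the role of the inception event concrete, here is a toy in-memory model of a stream store. Everything in it (ToyEvent, ToyStreamStore, and the validation rules) is hypothetical and exists only for illustration. In particular, the rule that a stream holds exactly one inception event is an assumption of this model; the validation a real node performs is not described in this guide.

```typescript
// Toy model for illustration only; not how a River node is implemented.
type ToyEvent =
    | { kind: 'inception'; streamId: string }
    | { kind: 'message'; body: string }

class ToyStreamStore {
    private readonly streams = new Map<string, ToyEvent[]>()

    // A stream can only be created with an inception event as its first event.
    createStream(streamId: string, events: ToyEvent[]): void {
        if (this.streams.has(streamId)) {
            throw new Error(`stream ${streamId} already exists`)
        }
        const first = events[0]
        if (first === undefined || first.kind !== 'inception' || first.streamId !== streamId) {
            throw new Error('first event must be an inception event for this stream')
        }
        this.streams.set(streamId, [...events])
    }

    // Later events are appended to an existing stream.
    addEvent(streamId: string, event: ToyEvent): void {
        const events = this.streams.get(streamId)
        if (events === undefined) {
            throw new Error(`stream ${streamId} does not exist`)
        }
        // Assumption of this toy model: only one inception event per stream.
        if (event.kind === 'inception') {
            throw new Error('a stream has exactly one inception event')
        }
        events.push(event)
    }

    getEvents(streamId: string): readonly ToyEvent[] {
        return this.streams.get(streamId) ?? []
    }
}
```

The model mirrors the order of operations in this guide: a stream is first created with its inception event, and only afterwards can further events be appended to it.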

5. Helper script that runs the above functions to create a new UserStream with its inception event

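// URL of the River Stream Node; this is the test node mentioned earlier in this guide.
const RIVER_STREAM_SERVER_URL = 'https://node1-test.towns.com'

async function main() {
    const context = await createSignerContext()
    const rpcClient = createRpcClient(RIVER_STREAM_SERVER_URL)
    await initializeUserStream(rpcClient, context)
}

// Log any failure instead of leaving an unhandled promise rejection.
main().catch((e) => {
    console.error(e)
})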