Adding events to a River Stream
Events can be added to a River Stream Node over RPC using the service interface that the protocol exposes in protocol.proto. This guide walks through a concrete example of making an addEvent request.
Requirements
- protobuf@3
- bufbuild/buf/buf
Dependencies
To run the recipe below, create a TypeScript project whose package.json lists the following dependencies. The river dependencies require building harmony-web first (see README.md).
- ethers
- @river/waterproof
- @river/sdk
The recipe assumes a client RPC service generated from protocol.proto and a user wallet that is valid for adding an event to its own UserStream.
Adding an event to UserStream
Users who have initialized their UserStream can add events to it. Adding other message events to a different stream follows the same pattern, because all events are added using the addEvent RPC method implemented by the node’s StreamService.
Create a TypeScript file and follow the steps below to add an event. You will also need to set RIVER_STREAM_SERVER_URL, for example to the test node at https://node1-test.towns.com.
The example code below assumes ES module syntax.
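The guide does not say how RIVER_STREAM_SERVER_URL should be supplied. One possible approach, sketched here under the assumption that it is read from an environment-style map, is to fall back to the test node mentioned above and reject values that are not http(s) URLs. The resolveServerUrl helper and the DEFAULT_SERVER_URL constant are hypothetical names introduced for illustration; they are not part of @river/sdk.

```typescript
// Hypothetical helper for illustration; not part of @river/sdk.
// Default: the test node named in this guide.
const DEFAULT_SERVER_URL = 'https://node1-test.towns.com'

// Resolve the stream server URL from an environment-style map.
// Assumption: the value lives under the key RIVER_STREAM_SERVER_URL.
function resolveServerUrl(env: Record<string, string | undefined>): string {
    const raw = env['RIVER_STREAM_SERVER_URL']
    // Treat a missing or blank value as "not set" and use the default.
    const url = raw !== undefined && raw.trim() !== '' ? raw.trim() : DEFAULT_SERVER_URL
    // Fail early on anything that is not an http(s) URL.
    if (!/^https?:\/\/\S+$/.test(url)) {
        throw new Error(`RIVER_STREAM_SERVER_URL is not an http(s) URL: ${url}`)
    }
    return url
}
```

In a Node.js script this could be called as resolveServerUrl(process.env) and the result assigned to RIVER_STREAM_SERVER_URL before the client is created.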
Import package dependencies
import { ethers } from 'ethers'
import { makeOldTownsDelegateSig, bin_fromHexString } from '@river/waterproof'
import {
    ParsedStreamResponse,
    SignerContext,
    StreamRpcClient,
    makeEvent,
    makeStreamRpcClient,
    makeUserStreamId,
    unpackStream,
    userIdFromAddress,
    make_UserPayload_Inception,
} from '@river/sdk'
console.log('adding-events.ts')
Create a new random delegate wallet and signer context
async function createSignerContext(): Promise<SignerContext> {
    // The primary wallet identifies the user; the delegate wallet signs events.
    const primaryWallet = ethers.Wallet.createRandom()
    const delegateWallet = ethers.Wallet.createRandom()
    const creatorAddress = bin_fromHexString(await primaryWallet.getAddress())
    // Signature from the primary wallet over the delegate wallet's public key.
    const delegateSig = await makeOldTownsDelegateSig(primaryWallet, delegateWallet.publicKey)
    // ethers returns a 0x-prefixed hex string; drop the prefix.
    const pk = delegateWallet.privateKey.slice(2)
    const context: SignerContext = {
        signerPrivateKey: () => pk,
        creatorAddress,
        delegateSig,
    }
    return context
}
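ethers exposes addresses and private keys as 0x-prefixed hex strings, which is why createSignerContext drops the first two characters of the private key and passes the address through bin_fromHexString. The sketch below shows what that kind of conversion might look like. stripHexPrefix and hexToBytes are hypothetical helpers written for this illustration; they are not the @river/waterproof implementation, and the real bin_fromHexString may behave differently on edge cases.

```typescript
// Hypothetical helpers for illustration; not the @river/waterproof implementation.

// Remove a leading "0x" (or "0X") if present.
function stripHexPrefix(hex: string): string {
    return hex.replace(/^0x/i, '')
}

// Convert a hex string, with or without a 0x prefix, into raw bytes.
function hexToBytes(hex: string): Uint8Array {
    const clean = stripHexPrefix(hex)
    if (clean.length % 2 !== 0 || !/^[0-9a-fA-F]*$/.test(clean)) {
        throw new Error('expected an even-length hex string')
    }
    const bytes = new Uint8Array(clean.length / 2)
    for (let i = 0; i < bytes.length; i++) {
        // Each byte is encoded as two hex characters.
        bytes[i] = parseInt(clean.slice(i * 2, i * 2 + 2), 16)
    }
    return bytes
}
```

Under these assumptions, a 20-byte Ethereum address written as 0x followed by 40 hex characters converts to a Uint8Array of length 20.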
Create client instance of river StreamService
function createRpcClient(serverUrl: string): StreamRpcClient {
    const rpcClient = makeStreamRpcClient(serverUrl)
    return rpcClient
}
Initialize UserStream with wallet
async function initializeUserStream(rpcClient: StreamRpcClient, context: SignerContext) {
    try {
        const userId = userIdFromAddress(context.creatorAddress)
        const userStreamId = makeUserStreamId(userId)
        const createUserStream = async (userStreamId: string): Promise<ParsedStreamResponse> => {
            const userEvents = [
                await makeEvent(
                    context,
                    make_UserPayload_Inception({
                        streamId: userStreamId,
                    }),
                ),
            ]
            const response = await rpcClient.createStream({
                events: userEvents,
                streamId: userStreamId,
            })
            return unpackStream(response.stream)
        }
        const userStream = await createUserStream(userStreamId)
        console.log(`created user stream ${userStreamId}`)
    } catch (e) {
        console.log('error creating user stream')
        throw e
    }
}
In the above function, createUserStream adds an inception event to the user stream identified by the user’s streamId, which is derived from the user’s userId.
Every stream is created with an inception event that marks the genesis of the stream.
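The role of the inception event can be pictured with a small in-memory model. This is only an illustration of the rule described above, that a stream begins with an inception event and later events are appended to an existing stream. The types and the InMemoryStreams class are hypothetical and are not the River protocol types or the node's actual validation logic.

```typescript
// Illustrative in-memory model only; these names are hypothetical and
// do not correspond to the River protocol types.
type EventKind = 'inception' | 'message'

interface StreamEvent {
    kind: EventKind
    streamId: string
}

class InMemoryStreams {
    private readonly streams = new Map<string, StreamEvent[]>()

    // Create a stream; the first event must be the inception (genesis) event.
    // Returns the number of events in the new stream.
    createStream(streamId: string, events: StreamEvent[]): number {
        const first = events[0]
        if (first === undefined || first.kind !== 'inception') {
            throw new Error('the first event of a new stream must be an inception event')
        }
        if (first.streamId !== streamId) {
            throw new Error('inception event streamId does not match the stream being created')
        }
        this.streams.set(streamId, events.slice())
        return events.length
    }

    // Append an event to a stream that already exists.
    // Returns the stream length after the append.
    addEvent(streamId: string, event: StreamEvent): number {
        const stream = this.streams.get(streamId)
        if (stream === undefined) {
            throw new Error(`stream ${streamId} has not been created yet`)
        }
        stream.push(event)
        return stream.length
    }
}
```

In this model, calling addEvent before createStream fails, which mirrors why the guide initializes the UserStream before any other event is added to it.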
Helper script that runs all of the above functions to add the inception event to a new UserStream
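// RIVER_STREAM_SERVER_URL must be defined before this runs (see the setup note earlier in this guide).
async function main() {
    const context = await createSignerContext()
    const rpcClient = createRpcClient(RIVER_STREAM_SERVER_URL)
    await initializeUserStream(rpcClient, context)
}
// Report failures and exit with a non-zero status instead of leaving the promise unhandled.
main().catch((e) => {
    console.error(e)
    process.exit(1)
})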