Adding Events to River Stream
Adding events to a River Stream
Events can be added to a River Stream Node over rpc using the service interface exposed by the protocol in protocol.proto
. The following guide will illustrate a concrete example of making an addEvent
request.
Requirements
- protobuf@3
- bufbuild/buf/buf
Dependencies
To run the below recipe, you’ll want to create a Typescript project with a package.json
file with the below dependencies. The river dependencies require building harmony-web
(see README.md
)
- ethers
- @river/waterproof
- @river/sdk
UserStream
.Adding an event to UserStream
Users that have initialized their UserStream can add events to those streams. Adding other message
events to a different stream follows from this guide given all events are added using the addEvent
RPC method implemented by the node’s StreamService
.
Create a typescript file and follow the below steps to add an event. You will need to set RIVER_STREAM_SERVER_URL
as well, which you can set to the test node at https://node1-test.towns.com
.
Import package dependencies
import { ethers } from 'ethers'
import { makeOldTownsDelegateSig, bin_fromHexString } from '@river/waterproof'
import {
ParsedStreamResponse,
SignerContext,
StreamRpcClient,
makeEvent,
makeStreamRpcClient,
makeUserStreamId,
unpackStream,
userIdFromAddress,
make_UserPayload_Inception,
} from '@river/sdk'
console.log('adding-events.ts')
Create a new random delegate wallet and signer context
async function createSignerContext(): Promise<SignerContext> {
// primary wallet can be a random new wallet
// if we are just registering a new wallet
// and not performing any on-chain transactions.
const primaryWallet = ethers.Wallet.createRandom()
const delegateWallet = ethers.Wallet.createRandom()
const creatorAddress = bin_fromHexString(await primaryWallet.getAddress())
const delegateSig = await makeOldTownsDelegateSig(primaryWallet, delegateWallet.publicKey)
const pk = delegateWallet.privateKey.slice(2)
const context: SignerContext = {
signerPrivateKey: () => pk,
creatorAddress,
delegateSig,
}
return context
}
Create client instance of river StreamService
function createRpcClient(serverUrl: string): StreamRpcClient {
const rpcClient = makeStreamRpcClient(serverUrl)
return rpcClient
}
Initialize UserStream with wallet
async function initializeUserStream(rpcClient: StreamRpcClient, context: SignerContext) {
try {
const userId = userIdFromAddress(context.creatorAddress)
const userStreamId = makeUserStreamId(userId)
const createUserStream = async (userStreamId: string): Promise<ParsedStreamResponse> => {
const userEvents = [
await makeEvent(
context,
make_UserPayload_Inception({
streamId: userStreamId,
}),
),
]
const response = await rpcClient.createStream({
events: userEvents,
streamId: userStreamId,
})
return unpackStream(response.stream)
}
const userStream = await createUserStream(userStreamId)
console.log(`created user stream ${userStreamId}`)
} catch (e) {
console.log('error creating user stream')
throw e
}
}
In the above function, createUserStream
adds an inception event to the user stream associated with the user’s streamId, which is derived from the user’s userId.
Helper script to run all above functions to addEvent to new UserStream
async function main() {
const context = await createSignerContext()
const rpcClient = createRpcClient(RIVER_STREAM_SERVER_URL)
await initializeUserStream(rpcClient, context)
}
main()
Was this page helpful?