Streams API for events with self-hosted server

No, the Moralis stream is already listening to all the contract events (see the snippet below).
I am talking about the new tables activeItem, itemBought, itemCanceled, and itemListed, which are
generated from the cloud functions listed here.

I have moved them into the Parse Server file:

Note that these custom tables typically set key fields such as buyer, nftAddress, tokenId, and price, as shown below:

[quote=“Tristan3000, post:13, topic:20758”]

itemBought.set('buyer', buyer);
itemBought.set('nftAddress', nftAddress);
itemBought.set('tokenId', tokenId);
itemBought.set('price', ethers.utils.formatUnits(price, 6));
itemBought.save();

[/quote]

So you are using the Streams API here?

In this case you use the ABI to decode the event in order to extract the fields, and then you insert the data into MongoDB.
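
As a rough illustration of the decode step described above, the sketch below pulls the fields of an ItemListed log out by hand. It assumes the event layout used in this thread (seller, nftAddress and tokenId indexed, so they arrive in topics[1..3], with the non-indexed price in data). The names decodeItemListed, topicToAddress and hexToDecimalString and the sample log are hypothetical, and a real server would normally let an ABI-aware library or the Moralis plugin do this work.

```javascript
// Hypothetical helper: decode a raw ItemListed log without an ABI library.
// Assumes topics[0] is the event signature hash and that seller, nftAddress
// and tokenId are indexed (topics[1..3]) while price sits in `data`.
function topicToAddress(topic) {
    // An indexed address is left-padded to 32 bytes; keep the last 20 bytes.
    return "0x" + topic.slice(-40).toLowerCase()
}

function hexToDecimalString(hex) {
    // uint256 values can exceed Number.MAX_SAFE_INTEGER, so go through BigInt.
    return BigInt(hex).toString()
}

function decodeItemListed(log) {
    return {
        seller: topicToAddress(log.topics[1]),
        nftAddress: topicToAddress(log.topics[2]),
        tokenId: hexToDecimalString(log.topics[3]),
        price: hexToDecimalString(log.data),
    }
}

// Made-up sample log, for illustration only.
const sampleLog = {
    topics: [
        "0x" + "00".repeat(32), // placeholder for the signature hash
        "0x" + "0".repeat(24) + "11".repeat(20), // seller
        "0x" + "0".repeat(24) + "22".repeat(20), // nftAddress
        "0x" + "0".repeat(63) + "7", // tokenId = 7
    ],
    data: "0x" + "0".repeat(61) + "3e8", // price = 1000
}

console.log(decodeItemListed(sampleLog))
```

The decoded object has the same field names that the cloud functions in this thread read with request.object.get(...), so it could be saved as-is.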

What step are you at with those events from the Streams API?

There is also a plugin for Parse Server and the Streams API.

The Streams API will only work with a supported chain; it will not work with a local dev chain (just to clarify).

Here is more info about the plugin for Parse Server

Yep, I am using the Streams API of course, and it works fine thanks to the Moralis plugin and tutorial.
Yes, decoding the events is indeed what the cloud functions listed above do.
The question now is how to make these cloud functions work when moving from Moralis v1 to Moralis v2.

Which cloud functions are you referring to?

These ones in updateActiveItems.js

Maybe you can be more specific? Like a simple example.

If you go back to the code pasted earlier in the thread from updateActiveItems.js, you can see the cloud functions. Here is an example, pasted below, for the “ItemListed” and “activeItem” tables.
Can you please advise which elements are missing from the stack to get these cloud functions to work with Moralis v2?

Moralis.Cloud.afterSave("ItemListed", async (request) => {
    const confirmed = request.object.get("confirmed")
    const logger = Moralis.Cloud.getLogger()
    logger.info("Looking for confirmed TX...")
    if (confirmed) {
        logger.info("Found item!")
        const ActiveItem = Moralis.Object.extend("ActiveItem")

        // In case of listing update, search for already listed ActiveItem and delete
        const query = new Moralis.Query(ActiveItem)
        query.equalTo("nftAddress", request.object.get("nftAddress"))
        query.equalTo("tokenId", request.object.get("tokenId"))
        query.equalTo("marketplaceAddress", request.object.get("address"))
        query.equalTo("seller", request.object.get("seller"))
        logger.info(`Marketplace | Query: ${query}`)
        const alreadyListedItem = await query.first()
        console.log(`alreadyListedItem ${JSON.stringify(alreadyListedItem)}`)
        if (alreadyListedItem) {
            logger.info(`Deleting ${alreadyListedItem.id}`)
            await alreadyListedItem.destroy()
            logger.info(
                `Deleted item with tokenId ${request.object.get(
                    "tokenId"
                )} at address ${request.object.get(
                    "address"
                )} since the listing is being updated. `
            )
        }

        // Add new ActiveItem
        const activeItem = new ActiveItem()
        activeItem.set("marketplaceAddress", request.object.get("address"))
        activeItem.set("nftAddress", request.object.get("nftAddress"))
        activeItem.set("price", request.object.get("price"))
        activeItem.set("tokenId", request.object.get("tokenId"))
        activeItem.set("seller", request.object.get("seller"))
        logger.info(
            `Adding Address: ${request.object.get("address")} TokenId: ${request.object.get(
                "tokenId"
            )}`
        )
        logger.info("Saving...")
        await activeItem.save()
    }
})

This is an afterSave hook. What doesn’t work in this function with Moralis v2?

You should use Parse.Cloud.afterSave instead of Moralis.Cloud.afterSave. In general, wherever the code uses Moralis. you can use Parse. instead, for example Parse.Object instead of Moralis.Object.

You can also use console.log instead of logger.info.

Yes, as mentioned above I am already doing this in the parse-server-migration/src/cloud/main.ts

See the example below pasted from above:

Parse.Cloud.afterSave('ItemListed', async (request: any) => {
  // Every event gets triggered twice, once on unconfirmed, again on confirmed
  const confirmed = request.object.get('confirmed');
  const logger = Parse.Cloud.getLogger();
  logger.info('Looking for confirmed TX...');
  if (confirmed) {
    logger.info('Found item');
    const ActiveItem = Parse.Object.extend('ActiveItem');

    const query = new Parse.Query(ActiveItem);
    query.equalTo('nftAddress', request.object.get('nftAddress'));
    query.equalTo('tokenId', request.object.get('tokenId'));
    query.equalTo('marketplaceAddress', request.object.get('address'));
    query.equalTo('seller', request.object.get('seller'));
    const alreadyListedItem = await query.first();
    if (alreadyListedItem) {
      logger.info(`Deleting already listed ${alreadyListedItem.id}`);
      await alreadyListedItem.destroy();
      logger.info(
        `Deleted item with tokenId ${request.object.get('tokenId')} at address ${request.object.get(
          'address',
        )} since it's already been listed`,
      );
    }

    const activeItem = new ActiveItem();
    activeItem.set('marketplaceAddress', request.object.get('address'));
    activeItem.set('nftAddress', request.object.get('nftAddress'));
    activeItem.set('price', request.object.get('price'));
    activeItem.set('tokenId', request.object.get('tokenId'));
    activeItem.set('seller', request.object.get('seller'));
    logger.info(`Adding address: ${request.object.get('address')}. TokenId: ${request.object.get('tokenId')}`);
    logger.info('Saving');
    await activeItem.save();
  }
});

I am not able to follow what the issue is.

I think we are not far from getting this sorted…
I’ll post if I find a solution.
A YouTube tutorial on using cloud functions to create new tables on a self-hosted Moralis v2 server would be great.

maybe this helps

I guess what @Tristan3000 was trying to do is save each event in a separate collection. I’m following the same tutorial as him (the FreeCodeCamp course), and the issue now is that the Moralis stream saves all the events in the same collection.

Here is how it currently saves all the events in NftMarketplaceEvent

I will try to create multiple streams programmatically, one for each event, but is that the only way, @cryptokid @alex?

I think you could either change the code so that it saves to different tables, or do some post-processing after the events have been saved in the same table.
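
A minimal sketch of the first suggestion, saving each event in its own table. It assumes the webhook body carries the stream's tag, a confirmed flag and a logs array, and that each stream is tagged with its event name. The names TABLE_BY_TAG and splitByTable are hypothetical, and nothing here is part of the Moralis plugin's API.

```javascript
// Hypothetical mapping from stream tag to the collection we want to fill.
const TABLE_BY_TAG = new Map([
    ["ItemListed", "ItemListed"],
    ["ItemBought", "ItemBought"],
    ["ListingUpdated", "ListingUpdated"],
    ["ListingCancelled", "ListingCancelled"],
])

// Assumption: `body` looks like { tag, confirmed, logs: [...] }.
// Returns one { table, confirmed, log } record per log, or [] for unknown tags.
function splitByTable(body) {
    const table = TABLE_BY_TAG.get(body.tag)
    if (!table) return []
    return (body.logs || []).map((log) => ({
        table,
        confirmed: Boolean(body.confirmed),
        log,
    }))
}

// Made-up webhook body, for illustration only.
const records = splitByTable({
    tag: "ItemBought",
    confirmed: true,
    logs: [{ transactionHash: "0xabc" }],
})
console.log(records)
```

Each record could then be written to the collection named in record.table, which keeps the existing afterSave hooks on “ItemListed” and the other tables usable.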

Some food for thought that might help someone use self-hosting to continue the FCC course.

One thing about self-hosting: it doesn’t work with a local dev chain, so you will first have to deploy your contracts to a testnet. I used Goerli.

Once your self-hosted Moralis server is working with Parse, you can create the Moralis streams programmatically by customizing addEvents.js as in the code below.

PS: you will need to use ngrok for the webhook and set some variables in your .env environment file.
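
The script below reads NEXT_PUBLIC_API_KEY and NEXT_PUBLIC_WEB_HOOK_URL from the environment (CHAIN_ID falls back to 5). A small guard like this sketch can catch a missing value before any stream is created; the helper name findMissingEnv is hypothetical.

```javascript
// Hypothetical helper: list the required variables that are unset or blank.
function findMissingEnv(env, required) {
    return required.filter((name) => !env[name] || env[name].trim() === "")
}

// Variable names taken from the stream-creation script in this post.
const REQUIRED = ["NEXT_PUBLIC_API_KEY", "NEXT_PUBLIC_WEB_HOOK_URL"]

const missing = findMissingEnv(process.env, REQUIRED)
if (missing.length > 0) {
    console.warn(`Missing environment variables: ${missing.join(", ")}`)
}
```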

const Moralis = require("moralis").default

const { EvmChain } = require("@moralisweb3/common-evm-utils")
require("dotenv").config()

const contractAddresses = require("./constants/networkMapping.json")
let chainId = process.env.CHAIN_ID || 5
const address = contractAddresses[chainId]["NftMarketplace"][0]
const itemListedAbi = [
    {
        anonymous: false,
        inputs: [
            {
                indexed: true,
                internalType: "address",
                name: "seller",
                type: "address",
            },
            {
                indexed: true,
                internalType: "address",
                name: "nftAddress",
                type: "address",
            },
            {
                indexed: true,
                internalType: "uint256",
                name: "tokenId",
                type: "uint256",
            },
            {
                indexed: false,
                internalType: "uint256",
                name: "price",
                type: "uint256",
            },
        ],
        name: "ItemListed",
        type: "event",
    },
] // valid abi of the event

const itemBoughtAbi = [
    {
        anonymous: false,
        inputs: [
            {
                indexed: true,
                internalType: "address",
                name: "buyer",
                type: "address",
            },
            {
                indexed: true,
                internalType: "address",
                name: "nftAddress",
                type: "address",
            },
            {
                indexed: true,
                internalType: "uint256",
                name: "tokenId",
                type: "uint256",
            },
            {
                indexed: false,
                internalType: "uint256",
                name: "price",
                type: "uint256",
            },
        ],
        name: "ItemBought",
        type: "event",
    },
]

const listingCancelledAbi = [
    {
        anonymous: false,
        inputs: [
            {
                indexed: true,
                internalType: "address",
                name: "nftAddress",
                type: "address",
            },
            {
                indexed: true,
                internalType: "uint256",
                name: "tokenId",
                type: "uint256",
            },
            {
                indexed: true,
                internalType: "address",
                name: "spender",
                type: "address",
            },
        ],
        name: "ListingCancelled",
        type: "event",
    },
]

const listingUpdatedAbi = [
    {
        anonymous: false,
        inputs: [
            {
                indexed: true,
                internalType: "address",
                name: "nftAddress",
                type: "address",
            },
            {
                indexed: true,
                internalType: "uint256",
                name: "tokenId",
                type: "uint256",
            },
            {
                indexed: true,
                internalType: "address",
                name: "spender",
                type: "address",
            },
            {
                indexed: false,
                internalType: "uint256",
                name: "oldPrice",
                type: "uint256",
            },
            {
                indexed: false,
                internalType: "uint256",
                name: "updatedPrice",
                type: "uint256",
            },
        ],
        name: "ListingUpdated",
        type: "event",
    },
]

async function main() {
    await Moralis.start({
        apiKey: process.env.NEXT_PUBLIC_API_KEY,
    })
    //ItemListed
    await startStream(
        "Monitor ItemListed",
        "ItemListed",
        "ItemListed(address,address,uint256,uint256)",
        itemListedAbi
    )

    //ItemBought
    await startStream(
        "Monitor ItemBought",
        "ItemBought",
        "ItemBought(address,address,uint256,uint256)",
        itemBoughtAbi
    )

    //ListingUpdated
    await startStream(
        "Monitor ListingUpdated",
        "ListingUpdated",
        "ListingUpdated(address,uint256,address,uint256,uint256)",
        listingUpdatedAbi
    )

    //ListingCancelled
    await startStream(
        "Monitor ListingCancelled",
        "ListingCancelled",
        "ListingCancelled(address,uint256,address)",
        listingCancelledAbi
    )
}

async function startStream(_desc, _tag, _topic, _abi) {
    const options = {
        chains: [EvmChain.GOERLI], // list of blockchains to monitor
        description: _desc, // your description
        tag: _tag, // give it a tag
        includeContractLogs: true,
        webhookUrl: process.env.NEXT_PUBLIC_WEB_HOOK_URL, // webhook url to receive events,
        abi: _abi,
        topic0: _topic,
        advancedOptions: [
            {
                topic0: _topic,
            },
        ],
    }

    const newStream = await Moralis.Streams.add(options)
    const { id } = newStream.toJSON() // { id: 'YOUR_STREAM_ID', ...newStream }

    // Now attach the marketplace contract address to the stream

    await Moralis.Streams.addAddress({ address, id })
    console.log(`Stream created for ${_desc}`)
}

To test this, you can run the mint-and-list.js script we made for the NftMarketplace smart contract (or simply execute whatever code fires the event you are trying to capture), and you should see the ItemListed event in your MongoDB, as in the screenshot here
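
The topic strings passed to startStream above are typed by hand, so they can drift out of sync with the ABI. As a sketch, they could instead be derived from each ABI entry. The helper name eventSignature is hypothetical, and it only handles elementary types such as address and uint256 (tuple inputs would need extra handling).

```javascript
// Build the "Name(type1,type2,...)" signature string from one event ABI entry.
function eventSignature(abiEntry) {
    if (abiEntry.type !== "event") {
        throw new Error(`Expected an event ABI entry, got "${abiEntry.type}"`)
    }
    const types = abiEntry.inputs.map((input) => input.type)
    return `${abiEntry.name}(${types.join(",")})`
}

// Trimmed copy of the ListingCancelled entry from the script above.
const listingCancelled = {
    type: "event",
    name: "ListingCancelled",
    inputs: [
        { name: "nftAddress", type: "address", indexed: true },
        { name: "tokenId", type: "uint256", indexed: true },
        { name: "spender", type: "address", indexed: true },
    ],
}

console.log(eventSignature(listingCancelled))
// prints: ListingCancelled(address,uint256,address)
```

With this, a call such as startStream("Monitor ListingCancelled", "ListingCancelled", eventSignature(listingCancelledAbi[0]), listingCancelledAbi) would keep the topic and the ABI in step.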

Hi, I can’t run Moralis Streams with the self-hosted server migration. Can you teach me or give me a code example, please?