diff --git a/.env.example b/.env.example index 397a0336..743de715 100644 --- a/.env.example +++ b/.env.example @@ -42,7 +42,8 @@ DATABASE_URL=postgres://postgres:postgres@localhost:5432/grants_stack_indexer # METIS_ANDROMEDA_RPC_URL #COINGECKO_API_KEY= -#IPFS_GATEWAY= +#IPFS_GATEWAYS=["https://ipfs.io"] +#WHITELISTED_ADDRESSES=["0x123..","0x456.."] # optional, enable the Postgraphile Pro plugin: https://www.npmjs.com/package/@graphile/pro #GRAPHILE_LICENSE diff --git a/docs/reindexing.md b/docs/reindexing.md index 00f6fc67..8ea7689e 100644 --- a/docs/reindexing.md +++ b/docs/reindexing.md @@ -12,9 +12,15 @@ When deploying changes to the indexer, it's important to clarify the results you - The indexer will create a new schema in Postgres named `chain_data_${version}`. If this schema does not exist, it will be created, all necessary tables will be set up, and indexing will start from scratch. - If the schema already exists, the indexer will resume indexing from the last indexed block unless the `--drop-db` flag is specified via the CLI. This will drop the existing database and start fresh. -### Using `--drop-db` in Development +### Dropping Schemas in Development -- During development, you can use the `--drop-db` flag to ensure the indexer always deletes the existing schema and migrates from scratch. This can be useful for testing schema changes and event handler modifications without retaining old data. +- During development, you can use the `--drop-db` flag to ensure the indexer always deletes all existing schemas and migrates from scratch. This can be useful for testing schema changes and event handler modifications without retaining old data. + +- During development, you can use the `--drop-chain-db` flag to ensure the indexer always deletes the chain data schema and migrates from scratch. + +- During development, you can use the `--drop-ipfs-db` flag to ensure the indexer always deletes the IPFS data schema and migrates from scratch.
+ +- During development, you can use the `--drop-price-db` flag to ensure the indexer always deletes price schema and migrates from scratch. ### Important Notes diff --git a/indexer-compose.yml b/indexer-compose.yml index d2cf9758..a2a33f3b 100644 --- a/indexer-compose.yml +++ b/indexer-compose.yml @@ -20,7 +20,7 @@ services: ENABLE_RESOURCE_MONITOR: ${ENABLE_RESOURCE_MONITOR} ESTIMATES_LINEARQF_WORKER_POOL_SIZE: ${ESTIMATES_LINEARQF_WORKER_POOL_SIZE} PINO_PRETTY: ${PINO_PRETTY} - IPFS_GATEWAY: ${IPFS_GATEWAY} + IPFS_GATEWAYS: ${IPFS_GATEWAYS} COINGECKO_API_KEY: ${COINGECKO_API_KEY} GRAPHILE_LICENSE: ${GRAPHILE_LICENSE} SEPOLIA_RPC_URL: ${SEPOLIA_RPC_URL} @@ -62,7 +62,7 @@ services: ENABLE_RESOURCE_MONITOR: ${ENABLE_RESOURCE_MONITOR} ESTIMATES_LINEARQF_WORKER_POOL_SIZE: ${ESTIMATES_LINEARQF_WORKER_POOL_SIZE} PINO_PRETTY: ${PINO_PRETTY} - IPFS_GATEWAY: ${IPFS_GATEWAY} + IPFS_GATEWAYS: ${IPFS_GATEWAYS} COINGECKO_API_KEY: ${COINGECKO_API_KEY} GRAPHILE_LICENSE: ${GRAPHILE_LICENSE} SEPOLIA_RPC_URL: ${SEPOLIA_RPC_URL} diff --git a/src/config.ts b/src/config.ts index 2fae6422..8a114905 100644 --- a/src/config.ts +++ b/src/config.ts @@ -20,7 +20,9 @@ type CoingeckoSupportedChainId = | 42220 | 1088; -const CHAIN_DATA_VERSION = "81"; +const CHAIN_DATA_VERSION = "83"; +const IPFS_DATA_VERSION = "1"; +const PRICE_DATA_VERSION = "1"; export type Token = { code: string; @@ -1818,7 +1820,7 @@ export type Config = { httpServerWaitForSync: boolean; httpServerEnabled: boolean; indexerEnabled: boolean; - ipfsGateway: string; + ipfsGateways: string[]; coingeckoApiKey: string | null; coingeckoApiUrl: string; chains: Chain[]; @@ -1829,11 +1831,18 @@ export type Config = { readOnlyDatabaseUrl: string; dataVersion: string; databaseSchemaName: string; + ipfsDataVersion: string; + ipfsDatabaseSchemaName: string; + priceDataVersion: string; + priceDatabaseSchemaName: string; hostname: string; pinoPretty: boolean; deploymentEnvironment: "local" | "development" | "staging" | "production"; 
enableResourceMonitor: boolean; dropDb: boolean; + dropChainDb: boolean; + dropIpfsDb: boolean; + dropPriceDb: boolean; removeCache: boolean; estimatesLinearQfWorkerPoolSize: number | null; }; @@ -1847,9 +1856,18 @@ export function getConfig(): Config { "from-block": { type: "string", }, + "drop-chain-db": { + type: "boolean", + }, + "drop-ipfs-db": { + type: "boolean", + }, "drop-db": { type: "boolean", }, + "drop-price-db": { + type: "boolean", + }, "rm-cache": { type: "boolean", }, @@ -1981,10 +1999,11 @@ export function getConfig(): Config { const runOnce = z.boolean().default(false).parse(args["run-once"]); - const ipfsGateway = z + const ipfsGateways = z .string() - .default("https://ipfs.io") - .parse(process.env.IPFS_GATEWAY); + .array() + .default(["https://ipfs.io"]) + .parse(JSON.parse(process.env.IPFS_GATEWAYS!)); const sentryDsn = z .union([z.string(), z.null()]) @@ -2001,7 +2020,16 @@ export function getConfig(): Config { const dataVersion = CHAIN_DATA_VERSION; const databaseSchemaName = `chain_data_${dataVersion}`; + const ipfsDataVersion = IPFS_DATA_VERSION; + const ipfsDatabaseSchemaName = `ipfs_data_${ipfsDataVersion}`; + + const priceDataVersion = PRICE_DATA_VERSION; + const priceDatabaseSchemaName = `price_data_${priceDataVersion}`; + const dropDb = z.boolean().default(false).parse(args["drop-db"]); + const dropChainDb = z.boolean().default(false).parse(args["drop-chain-db"]); + const dropIpfsDb = z.boolean().default(false).parse(args["drop-ipfs-db"]); + const dropPriceDb = z.boolean().default(false).parse(args["drop-price-db"]); const removeCache = z.boolean().default(false).parse(args["rm-cache"]); @@ -2041,7 +2069,7 @@ export function getConfig(): Config { cacheDir, logLevel, runOnce, - ipfsGateway, + ipfsGateways, passportScorerId, apiHttpPort, pinoPretty, @@ -2050,9 +2078,16 @@ export function getConfig(): Config { databaseUrl, readOnlyDatabaseUrl, dropDb, + dropChainDb, + dropIpfsDb, + dropPriceDb, removeCache, dataVersion, 
databaseSchemaName, + ipfsDataVersion, + ipfsDatabaseSchemaName, + priceDataVersion, + priceDatabaseSchemaName, httpServerWaitForSync, httpServerEnabled, indexerEnabled, diff --git a/src/database/changeset.ts b/src/database/changeset.ts index 5eb7f12e..c53d2a14 100644 --- a/src/database/changeset.ts +++ b/src/database/changeset.ts @@ -16,6 +16,7 @@ import { NewPrice, NewLegacyProject, NewApplicationPayout, + NewIpfsData, } from "./schema.js"; export type DataChange = @@ -140,4 +141,8 @@ export type DataChange = | { type: "InsertApplicationPayout"; payout: NewApplicationPayout; + } + | { + type: "InsertIpfsData"; + ipfs: NewIpfsData; }; diff --git a/src/database/index.ts b/src/database/index.ts index 2b749ff2..e767b354 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -14,8 +14,9 @@ import { NewDonation, LegacyProjectTable, ApplicationPayout, + IpfsDataTable, } from "./schema.js"; -import { migrate } from "./migrate.js"; +import { migrate, migrateDataFetcher, migratePriceFetcher } from "./migrate.js"; import { encodeJsonWithBigInts } from "../utils/index.js"; import type { DataChange } from "./changeset.js"; import { Logger } from "pino"; @@ -37,6 +38,7 @@ interface Tables { prices: PriceTable; legacyProjects: LegacyProjectTable; applicationsPayouts: ApplicationPayout; + ipfsData: IpfsDataTable; } type KyselyDb = Kysely; @@ -53,13 +55,17 @@ export class Database { #statsTimeout: ReturnType | null = null; #logger: Logger; - readonly databaseSchemaName: string; + readonly chainDataSchemaName: string; + readonly ipfsDataSchemaName: string; + readonly priceDataSchemaName: string; constructor(options: { statsUpdaterEnabled: boolean; logger: Logger; connectionPool: Pool; - schemaName: string; + chainDataSchemaName: string; + ipfsDataSchemaName: string; + priceDataSchemaName: string; }) { const dialect = new PostgresDialect({ pool: options.connectionPool, @@ -72,10 +78,12 @@ export class Database { plugins: [new CamelCasePlugin()], }); - this.#db = 
this.#db.withSchema(options.schemaName); + // Initialize schema names + this.chainDataSchemaName = options.chainDataSchemaName; + this.ipfsDataSchemaName = options.ipfsDataSchemaName; + this.priceDataSchemaName = options.priceDataSchemaName; this.#logger = options.logger; - this.databaseSchemaName = options.schemaName; this.scheduleDonationQueueFlush(); @@ -88,7 +96,7 @@ export class Database { const client = await this.#connectionPool.connect(); // generate lock id based on schema - const lockId = this.databaseSchemaName.split("").reduce((acc, char) => { + const lockId = this.chainDataSchemaName.split("").reduce((acc, char) => { return acc + char.charCodeAt(0); }, 0); @@ -132,12 +140,12 @@ export class Database { } private async updateStats() { - const donationsTableRef = `"${this.databaseSchemaName}"."donations"`; + const donationsTableRef = `"${this.chainDataSchemaName}"."donations"`; await sql .raw( ` - UPDATE "${this.databaseSchemaName}"."rounds" AS r + UPDATE "${this.chainDataSchemaName}"."rounds" AS r SET total_amount_donated_in_usd = d.total_amount, total_donations_count = d.donation_count, @@ -160,7 +168,7 @@ export class Database { await sql .raw( ` - UPDATE "${this.databaseSchemaName}"."applications" AS a + UPDATE "${this.chainDataSchemaName}"."applications" AS a SET total_amount_donated_in_usd = d.total_amount, total_donations_count = d.donation_count, @@ -223,38 +231,86 @@ export class Database { } } - async dropSchemaIfExists() { + async dropChainDataSchemaIfExists() { await this.#db.schema - .dropSchema(this.databaseSchemaName) + .withSchema(this.chainDataSchemaName) + .dropSchema(this.chainDataSchemaName) .ifExists() .cascade() .execute(); } - async createSchemaIfNotExists(logger: Logger) { + async dropIpfsDataSchemaIfExists() { + await this.#db.schema + .withSchema(this.ipfsDataSchemaName) + .dropSchema(this.ipfsDataSchemaName) + .ifExists() + .cascade() + .execute(); + } + + async dropPriceDataSchemaIfExists() { + await this.#db.schema + 
.withSchema(this.priceDataSchemaName) + .dropSchema(this.priceDataSchemaName) + .ifExists() + .cascade() + .execute(); + } + + async dropAllSchemaIfExists() { + await this.dropChainDataSchemaIfExists(); + await this.dropIpfsDataSchemaIfExists(); + await this.dropPriceDataSchemaIfExists(); + } + + async createSchemaIfNotExists( + schemaName: string, + migrateFn: (tx: any, schemaName: string) => Promise, + logger: Logger + ) { const exists = await sql<{ exists: boolean }>` - SELECT EXISTS ( - SELECT 1 FROM information_schema.schemata - WHERE schema_name = ${this.databaseSchemaName} - )`.execute(this.#db); + SELECT EXISTS ( + SELECT 1 FROM information_schema.schemata + WHERE schema_name = ${schemaName} + )`.execute(this.#db.withSchema(schemaName)); if (exists.rows.length > 0 && exists.rows[0].exists) { logger.info({ - msg: `schema "${this.databaseSchemaName}" exists, skipping creation`, + msg: `schema "${schemaName}" exists, skipping creation`, }); - return; } logger.info({ - msg: `schema "${this.databaseSchemaName}" does not exist, creating schema`, + msg: `schema "${schemaName}" does not exist, creating schema`, }); - await this.#db.transaction().execute(async (tx) => { - await tx.schema.createSchema(this.databaseSchemaName).execute(); + await this.#db + .withSchema(schemaName) + .transaction() + .execute(async (tx) => { + await tx.schema.createSchema(schemaName).execute(); + await migrateFn(tx, schemaName); + }); + } - await migrate(tx, this.databaseSchemaName); - }); + async createAllSchemas(logger: Logger) { + await this.createSchemaIfNotExists( + this.chainDataSchemaName, + migrate, + logger + ); + await this.createSchemaIfNotExists( + this.ipfsDataSchemaName, + migrateDataFetcher, + logger + ); + await this.createSchemaIfNotExists( + this.priceDataSchemaName, + migratePriceFetcher, + logger + ); } async applyChanges(changes: DataChange[]): Promise { @@ -267,6 +323,7 @@ export class Database { switch (change.type) { case "InsertPendingProjectRole": { await 
this.#db + .withSchema(this.chainDataSchemaName) .insertInto("pendingProjectRoles") .values(change.pendingProjectRole) .execute(); @@ -275,6 +332,7 @@ export class Database { case "DeletePendingProjectRoles": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("pendingProjectRoles") .where("id", "in", change.ids) .execute(); @@ -283,6 +341,7 @@ export class Database { case "InsertPendingRoundRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("pendingRoundRoles") .values(change.pendingRoundRole) .execute(); @@ -291,6 +350,7 @@ export class Database { case "DeletePendingRoundRoles": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("pendingRoundRoles") .where("id", "in", change.ids) .execute(); @@ -298,12 +358,17 @@ export class Database { } case "InsertProject": { - await this.#db.insertInto("projects").values(change.project).execute(); + await this.#db + .withSchema(this.chainDataSchemaName) + .insertInto("projects") + .values(change.project) + .execute(); break; } case "UpdateProject": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("projects") .set(change.project) .where("id", "=", change.projectId) @@ -314,6 +379,7 @@ export class Database { case "InsertProjectRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("projectRoles") .values(change.projectRole) .execute(); @@ -322,6 +388,7 @@ export class Database { case "DeleteAllProjectRolesByRole": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("projectRoles") .where("chainId", "=", change.projectRole.chainId) .where("projectId", "=", change.projectRole.projectId) @@ -332,6 +399,7 @@ export class Database { case "DeleteAllProjectRolesByRoleAndAddress": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("projectRoles") .where("chainId", "=", change.projectRole.chainId) .where("projectId", "=", change.projectRole.projectId) @@ -342,12 +410,17 @@ export class Database { } 
case "InsertRound": { - await this.#db.insertInto("rounds").values(change.round).execute(); + await this.#db + .withSchema(this.chainDataSchemaName) + .insertInto("rounds") + .values(change.round) + .execute(); break; } case "UpdateRound": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set(change.round) .where("chainId", "=", change.chainId) @@ -358,6 +431,7 @@ export class Database { case "IncrementRoundFundedAmount": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ fundedAmount: eb("fundedAmount", "+", change.fundedAmount), @@ -375,6 +449,7 @@ export class Database { case "UpdateRoundByStrategyAddress": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set(change.round) .where("chainId", "=", change.chainId) @@ -385,6 +460,7 @@ export class Database { case "InsertRoundRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("roundRoles") .values(change.roundRole) .execute(); @@ -393,6 +469,7 @@ export class Database { case "DeleteAllRoundRolesByRoleAndAddress": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("roundRoles") .where("chainId", "=", change.roundRole.chainId) .where("roundId", "=", change.roundRole.roundId) @@ -411,7 +488,11 @@ export class Database { }; } - await this.#db.insertInto("applications").values(application).execute(); + await this.#db + .withSchema(this.chainDataSchemaName) + .insertInto("applications") + .values(application) + .execute(); break; } @@ -425,6 +506,7 @@ export class Database { } await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("applications") .set(application) .where("chainId", "=", change.chainId) @@ -441,6 +523,7 @@ export class Database { case "InsertManyDonations": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("donations") .values(change.donations) .onConflict((c) => c.column("id").doNothing()) @@ -449,12 +532,17 @@ export class 
Database { } case "InsertManyPrices": { - await this.#db.insertInto("prices").values(change.prices).execute(); + await this.#db + .withSchema(this.priceDataSchemaName) + .insertInto("prices") + .values(change.prices) + .execute(); break; } case "IncrementRoundDonationStats": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ totalAmountDonatedInUsd: eb( @@ -472,6 +560,7 @@ export class Database { case "IncrementRoundTotalDistributed": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ totalDistributed: eb("totalDistributed", "+", change.amount), @@ -484,6 +573,7 @@ export class Database { case "IncrementApplicationDonationStats": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("applications") .set((eb) => ({ totalAmountDonatedInUsd: eb( @@ -502,6 +592,7 @@ export class Database { case "NewLegacyProject": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("legacyProjects") .values(change.legacyProject) .execute(); @@ -510,12 +601,22 @@ export class Database { case "InsertApplicationPayout": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("applicationsPayouts") .values(change.payout) .execute(); break; } + case "InsertIpfsData": { + await this.#db + .withSchema(this.ipfsDataSchemaName) + .insertInto("ipfsData") + .values(change.ipfs) + .execute(); + break; + } + default: throw new Error(`Unknown changeset type`); } @@ -523,6 +624,7 @@ export class Database { async getPendingProjectRolesByRole(chainId: ChainId, role: string) { const pendingProjectRole = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("pendingProjectRoles") .where("chainId", "=", chainId) .where("role", "=", role) @@ -534,6 +636,7 @@ export class Database { async getPendingRoundRolesByRole(chainId: ChainId, role: string) { const pendingRoundRole = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("pendingRoundRoles") 
.where("chainId", "=", chainId) .where("role", "=", role) @@ -545,6 +648,7 @@ export class Database { async getProjectById(chainId: ChainId, projectId: string) { const project = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .where("id", "=", projectId) @@ -556,6 +660,7 @@ export class Database { async getProjectByAnchor(chainId: ChainId, anchorAddress: Address) { const project = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .where("anchorAddress", "=", anchorAddress) @@ -567,6 +672,7 @@ export class Database { async getRoundById(chainId: ChainId, roundId: string) { const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("id", "=", roundId) @@ -578,6 +684,7 @@ export class Database { async getRoundByStrategyAddress(chainId: ChainId, strategyAddress: Address) { const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("strategyAddress", "=", strategyAddress) @@ -593,6 +700,7 @@ export class Database { roleValue: string ) { const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where(`${roleName}Role`, "=", roleValue) @@ -615,6 +723,7 @@ export class Database { } const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("id", "=", roundId) @@ -631,6 +740,7 @@ export class Database { async getAllChainRounds(chainId: ChainId) { const rounds = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .selectAll() @@ -641,6 +751,7 @@ export class Database { async getAllRoundApplications(chainId: ChainId, roundId: string) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") 
.where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -650,6 +761,7 @@ export class Database { async getAllRoundDonations(chainId: ChainId, roundId: string) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("donations") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -663,6 +775,7 @@ export class Database { applicationId: string ) { const application = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -679,6 +792,7 @@ export class Database { projectId: string ) { const application = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -695,6 +809,7 @@ export class Database { anchorAddress: Address ) { const application = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -707,6 +822,7 @@ export class Database { async getLatestPriceTimestampForChain(chainId: ChainId) { const latestPriceTimestamp = await this.#db + .withSchema(this.priceDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .orderBy("timestamp", "desc") @@ -723,6 +839,7 @@ export class Database { blockNumber: bigint | "latest" ) { let priceQuery = this.#db + .withSchema(this.priceDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .where("tokenAddress", "=", tokenAddress) @@ -741,6 +858,7 @@ export class Database { async getAllChainPrices(chainId: ChainId) { return await this.#db + .withSchema(this.priceDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .orderBy("blockNumber", "asc") @@ -750,6 +868,7 @@ export class Database { async getAllChainProjects(chainId: ChainId) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .selectAll() 
@@ -761,6 +880,7 @@ export class Database { donorAddress: Address ) { const donations = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("donations") .where("donations.donorAddress", "=", donorAddress) .where("donations.chainId", "=", chainId) @@ -784,6 +904,7 @@ export class Database { async getV2ProjectIdByV1ProjectId(v1ProjectId: string) { const result = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("legacyProjects") .where("v1ProjectId", "=", v1ProjectId) .select("v2ProjectId") @@ -791,4 +912,102 @@ export class Database { return result ?? null; } + + async deleteChainData(chainId: ChainId) { + this.#logger.info("Deleting chain data for chainId:", chainId); + + await this.#db.transaction().execute(async (trx) => { + this.#logger.info("Deleting pending round roles"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("pendingRoundRoles") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting round roles"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("roundRoles") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting pending project roles"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("pendingProjectRoles") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting project roles"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("projectRoles") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting applications"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("applications") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting applications donations"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("donations") + .where("chainId", "=", chainId) + .execute(); + + // this.#logger.info("Deleting donation prices"); + // await trx + // .withSchema(this.priceDataSchemaName) + // 
.deleteFrom("prices") + // .where("chainId", "=", chainId) + // .execute(); + + this.#logger.info("Deleting applications"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("applications") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting rounds"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("rounds") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting projects"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("projects") + .where("chainId", "=", chainId) + .execute(); + }); + + this.#logger.info("Updating subscriptions indexed_to_block"); + const sqlQuery = ` + UPDATE ${this.chainDataSchemaName}.subscriptions + SET indexed_to_block = 0::bigint + WHERE chain_id = ${chainId} + `; + + await sql.raw(sqlQuery).execute(this.#db); + + this.#logger.info("Deleted chain data for chainId:", chainId); + } + + async getDataByCid(cId: string) { + const metadata = await this.#db + .withSchema(this.ipfsDataSchemaName) + .selectFrom("ipfsData") + .where("cid", "=", cId) + .selectAll() + .executeTakeFirst(); + + return metadata ?? 
null; + } } diff --git a/src/database/migrate.ts b/src/database/migrate.ts index c473d710..db6e2b88 100644 --- a/src/database/migrate.ts +++ b/src/database/migrate.ts @@ -297,22 +297,6 @@ export async function migrate(db: Kysely, schemaName: string) { .columns(["chainId", "roundId", "applicationId"]) .execute(); - await schema - .createTable("prices") - .addColumn("id", "serial", (cb) => cb.primaryKey()) - .addColumn("chainId", CHAIN_ID_TYPE) - .addColumn("tokenAddress", ADDRESS_TYPE) - .addColumn("priceInUSD", "real") - .addColumn("timestamp", "timestamptz") - .addColumn("blockNumber", BIGINT_TYPE) - .execute(); - - await db.schema - .createIndex("idx_prices_chain_token_block") - .on("prices") - .expression(sql`chain_id, token_address, block_number DESC`) - .execute(); - await schema .createTable("legacy_projects") .addColumn("id", "serial", (col) => col.primaryKey()) @@ -392,3 +376,42 @@ export async function migrate(db: Kysely, schemaName: string) { $$ language sql stable; `.execute(db); } + +export async function migrateDataFetcher(db: Kysely, schemaName: string) { + const schema = db.withSchema(schemaName).schema; + + await schema + .createTable("ipfs_data") + .addColumn("cid", "text") + .addColumn("data", "jsonb") + .addUniqueConstraint("unique_cid", ["cid"]) + .execute(); +} + +export async function migratePriceFetcher( + db: Kysely, + schemaName: string +) { + const schema = db.withSchema(schemaName).schema; + + await schema + .createTable("prices") + .addColumn("id", "serial", (cb) => cb.primaryKey()) + .addColumn("chainId", CHAIN_ID_TYPE) + .addColumn("tokenAddress", ADDRESS_TYPE) + .addColumn("priceInUSD", "real") + .addColumn("timestamp", "timestamptz") + .addColumn("blockNumber", BIGINT_TYPE) + .addUniqueConstraint("unique_chainId_tokenAddress_blockNumber", [ + "chainId", + "tokenAddress", + "blockNumber", + ]) + .execute(); + + await db.schema + .createIndex("idx_prices_chain_token_block") + .on("prices") + .expression(sql`chain_id, token_address, 
block_number DESC`) + .execute(); +} diff --git a/src/database/schema.ts b/src/database/schema.ts index 06b9a906..8e50c133 100644 --- a/src/database/schema.ts +++ b/src/database/schema.ts @@ -125,6 +125,11 @@ export type ProjectTable = { projectType: ProjectType; }; +export type IpfsDataTable = { + cid: string; + data: unknown; +}; + export type Project = Selectable; export type NewProject = Insertable; export type PartialProject = Updateable; @@ -253,3 +258,8 @@ export type ApplicationPayout = { }; export type NewApplicationPayout = Insertable; + +export type NewIpfsData = { + cid: string; + data: unknown; +}; diff --git a/src/http/api/v1/status.ts b/src/http/api/v1/status.ts index e4ad3c08..f29c33a9 100644 --- a/src/http/api/v1/status.ts +++ b/src/http/api/v1/status.ts @@ -9,7 +9,9 @@ export const createHandler = (config: HttpApiConfig): express.Router => { res.json({ hostname: config.hostname, buildTag: config.buildTag, - databaseSchema: config.db.databaseSchemaName, + chainDataSchemaName: config.db.chainDataSchemaName, + ipfsDataSchema: config.db.ipfsDataSchemaName, + priceDataSchema: config.db.priceDataSchemaName, }); }); diff --git a/src/http/app.ts b/src/http/app.ts index d5ef33ea..22a36bd1 100644 --- a/src/http/app.ts +++ b/src/http/app.ts @@ -13,6 +13,8 @@ import { PassportProvider } from "../passport/index.js"; import { DataProvider } from "../calculator/dataProvider/index.js"; import { Chain } from "../config.js"; import { Database } from "../database/index.js"; +import { Indexer } from "chainsauce"; +import { recoverMessageAddress } from "viem"; type AsyncRequestHandler = ( req: express.Request, @@ -38,6 +40,7 @@ export interface HttpApiConfig { | { type: "in-thread" } | { type: "worker-pool"; workerPoolSize: number }; }; + indexedChains?: Indexer[] | null; } interface HttpApi { @@ -100,6 +103,109 @@ export const createHttpApi = (config: HttpApiConfig): HttpApi => { res.send(config.dataVersion); }); + app.get("/config", (_req, res) => { + 
res.send(config); + }); + + app.post("/index", (req, res) => { + try { + const { chainId, address, timestamp, signature } = req.body as { + chainId: string; + address: string; + timestamp: number; + signature: `0x${string}`; + }; + + const reindex = async () => { + if (!chainId || !config.indexedChains) { + return res.status(400).send("chainId is required"); + } + + try { + const isAuthenticated = await recoverEthereumAddress({ + address, + timestamp, + signature, + }); + + config.logger.info( + `Reindexing chain ${chainId} requested by ${address} at ${timestamp}` + ); + + if (isAuthenticated) { + await config.db.deleteChainData(Number(chainId)); + + const filteredIndexedChains = config.indexedChains.filter( + (chain) => + (chain as { context: { chainId: number } }).context.chainId === + Number(chainId) + ); + + if (filteredIndexedChains.length === 0) { + config.logger.error(`Chain ${chainId} not found`); + return res.status(400).send("chain not found"); + } + + const filteredChains = config.chains.filter( + (chain) => chain.id === Number(chainId) + ); + + if (filteredChains.length === 0) { + config.logger.error(`Chain ${chainId} not found`); + return res.status(400).send("chain not found"); + } + + const chain = filteredChains[0]; + const indexedChain = filteredIndexedChains[0]; + + chain.subscriptions.forEach((subscription) => { + indexedChain.unsubscribeFromContract({ + address: subscription.address, + }); + + const contractName = subscription.contractName; + const subscriptionFromBlock = + subscription.fromBlock === undefined + ? 
undefined + : BigInt(subscription.fromBlock); + + indexedChain.subscribeToContract({ + contract: contractName, + address: subscription.address, + fromBlock: subscriptionFromBlock || BigInt(0), + }); + }); + } else { + config.logger.error( + `Reindexing chain ${chainId} requested by ${address} at ${timestamp} failed authentication` + ); + return res.status(401).send("Authentication failed"); + } + } catch { + config.logger.error( + `Reindexing chain ${chainId} requested by ${address} at ${timestamp} failed with error` + ); + return res.status(500).send("An error occurred"); + } + }; + + reindex() + .then(() => { + config.logger.info(`Reindexing of chain ${chainId} finished`); + res.send("Reindexing finished"); + }) + .catch(() => { + config.logger.error( + `Reindexing of chain ${chainId} failed with error` + ); + res.status(500).send("An error occurred"); + }); + } catch { + config.logger.error(`Reindexing failed with error`); + res.status(500).send("An error occurred"); + } + }); + app.use("/api/v1", api); if (config.graphqlHandler !== undefined) { @@ -149,3 +255,58 @@ function staticJsonDataHandler(dataProvider: DataProvider) { } }; } + +const VALIDITY_PERIOD = 1 * 60 * 1000; // 1 minute validity period + +const recoverEthereumAddress = async ({ + address, + timestamp, + signature, +}: { + address: string; + timestamp: number; + signature: `0x${string}`; +}) => { + if (!address || !timestamp || !signature) { + return false; + } + const whitelistedAddresses: string[] = process.env.WHITELISTED_ADDRESSES + ? 
(JSON.parse(process.env.WHITELISTED_ADDRESSES) as string[]) + : []; + + if (!whitelistedAddresses) { + return false; + } + + // Check timestamp validity + const currentTime = Date.now(); + if (currentTime - timestamp > VALIDITY_PERIOD) { + return false; + } + + // Construct the expected message to be signed + const expectedMessage = `Authenticate with timestamp: ${timestamp}`; + try { + // Recover address from signature and expected message + const recoveredAddress = await recoverMessageAddress({ + message: expectedMessage, + signature, + }); + + const whitelistedAddressesLowercase = whitelistedAddresses.map((addr) => + addr.toLowerCase() + ); + + if ( + recoveredAddress.toLowerCase() === address.toLowerCase() && + whitelistedAddressesLowercase.includes(address.toLowerCase()) + ) { + return true; + } else { + return false; + } + } catch (error) { + console.error("Error verifying signature:", error); + return false; + } +}; diff --git a/src/index.ts b/src/index.ts index 5062a1aa..23136f74 100644 --- a/src/index.ts +++ b/src/index.ts @@ -153,7 +153,9 @@ async function main(): Promise { logger: baseLogger.child({ subsystem: "Database" }), statsUpdaterEnabled: config.indexerEnabled, connectionPool: databaseConnectionPool, - schemaName: config.databaseSchemaName, + chainDataSchemaName: config.databaseSchemaName, + ipfsDataSchemaName: config.ipfsDatabaseSchemaName, + priceDataSchemaName: config.priceDatabaseSchemaName, }); baseLogger.info({ @@ -242,15 +244,22 @@ async function main(): Promise { const lock = await db.acquireWriteLock(); if (lock !== null) { - baseLogger.info("acquired write lock"); - if (isFirstRun) { if (config.dropDb) { - baseLogger.info("dropping schema"); - await db.dropSchemaIfExists(); + baseLogger.info("dropping all schemas"); + await db.dropAllSchemaIfExists(); + } else if (config.dropChainDb) { + baseLogger.info("resetting chain data schema"); + await db.dropChainDataSchemaIfExists(); + } else if (config.dropIpfsDb) { + baseLogger.info("resetting 
ipfs data schema"); + await db.dropIpfsDataSchemaIfExists(); + } else if (config.dropPriceDb) { + baseLogger.info("resetting price data schema"); + await db.dropPriceDataSchemaIfExists(); } - await db.createSchemaIfNotExists(baseLogger); + await db.createAllSchemas(baseLogger); await subscriptionStore.init(); } @@ -326,7 +335,11 @@ async function main(): Promise { const graphqlHandler = postgraphile( readOnlyDatabaseConnectionPool, - config.databaseSchemaName, + [ + config.databaseSchemaName, + config.ipfsDatabaseSchemaName, + config.priceDatabaseSchemaName, + ], { watchPg: false, graphqlRoute: "/graphql", @@ -403,6 +416,7 @@ async function main(): Promise { workerPoolSize: config.estimatesLinearQfWorkerPoolSize, }, }, + indexedChains: await indexChainsPromise, }); await httpApi.start(); @@ -467,29 +481,86 @@ async function catchupAndWatchChain( return undefined; } - const url = `${config.ipfsGateway}/ipfs/${cid}`; - - // chainLogger.trace(`Fetching ${url}`); + // Check if data is already in the IPFS database + const ipfsData = await db.getDataByCid(cid); + if (ipfsData) { + // chainLogger.info(`Found IPFS data in database for CID: ${cid}`); + return Promise.resolve(ipfsData.data as string as T); + } - const res = await fetch(url, { - timeout: 2000, - onRetry(cause) { - chainLogger.debug({ - msg: "Retrying IPFS request", - url: url, - err: cause, + // Fetch from a single IPFS gateway + const fetchFromGateway = async (url: string): Promise => { + try { + const res = await fetch(url, { + timeout: 2000, + onRetry(cause) { + chainLogger.debug({ + msg: "Retrying IPFS request", + url: url, + err: cause, + }); + }, + retry: { retries: 3, minTimeout: 2000, maxTimeout: 60 * 10000 }, + // IPFS data is immutable, we can rely entirely on the cache when present + cache: "force-cache", + cachePath: + config.cacheDir !== null + ? 
path.join(config.cacheDir, "ipfs") + : undefined, }); - }, - retry: { retries: 3, minTimeout: 2000, maxTimeout: 60 * 10000 }, - // IPFS data is immutable, we can rely entirely on the cache when present - cache: "force-cache", - cachePath: - config.cacheDir !== null - ? path.join(config.cacheDir, "ipfs") - : undefined, - }); - return (await res.json()) as T; + if (res.ok) { + return (await res.json()) as T; // Return the fetched data + } else { + chainLogger.warn( + `Failed to fetch from ${url}, status: ${res.status} ${res.statusText}` + ); + } + } catch (err) { + chainLogger.error( + `Error fetching from gateway ${url}: ${String(err)}` + ); + } + }; + + // Iterate through each gateway and attempt to fetch data + for (const gateway of config.ipfsGateways) { + const url = `${gateway}/ipfs/${cid}`; + // chainLogger.info(`Trying IPFS gateway: ${gateway} for CID: ${cid}`); + + const result = await fetchFromGateway(url); + if (result !== undefined) { + // chainLogger.info( + // `Fetch successful from gateway: ${gateway} for CID: ${cid}` + // ); + + // Save to IpfsData table + try { + await db.applyChange({ + type: "InsertIpfsData", + ipfs: { + cid, + data: result, // TODO: check is JSON.parse is needed + }, + }); + } catch (err) { + chainLogger.error( + `Error saving IPFS data to database: ${String(err)}` + ); + } + + return result; // Return the result if fetched successfully + } else { + chainLogger.warn( + `IPFS fetch failed for gateway ${gateway} for CID ${cid}` + ); + } + } + + chainLogger.error( + `Failed to fetch IPFS data for CID ${cid} from all gateways.` + ); + return undefined; // Return undefined if all gateways fail }; chainLogger.info("DEBUG: catching up with blockchain events"); diff --git a/src/prices/provider.ts b/src/prices/provider.ts index 01f75656..8279432f 100644 --- a/src/prices/provider.ts +++ b/src/prices/provider.ts @@ -211,10 +211,27 @@ export function createPriceProvider( }); } - await db.applyChange({ - type: "InsertManyPrices", - prices: 
[newPrice], - }); + // Check if the price is already in the database + const existingPrice = await db.getTokenPriceByBlockNumber( + chainId, + newPrice.tokenAddress, + blockNumber + ); + + if (!existingPrice) { + try { + await db.applyChange({ + type: "InsertManyPrices", + prices: [newPrice], + }); + } catch (e) { + logger.error({ + msg: "Failed to insert price", + error: e, + price: newPrice, + }); + } + } return { ...newPrice, tokenDecimals: token.decimals }; }