Skip to content

Commit

Permalink
fix(backfill): fix async handling in firestore to typesense backfill
Browse files Browse the repository at this point in the history
  • Loading branch information
tharropoulos committed Sep 16, 2024
1 parent 4dfb0df commit d9c0a68
Showing 1 changed file with 44 additions and 48 deletions.
92 changes: 44 additions & 48 deletions functions/src/backfill.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,83 +10,79 @@ admin.initializeApp({

/**
 * Decides whether a write to the backfill trigger document should start a backfill.
 *
 * A run is allowed only when the written document has `trigger` set to `true`
 * (boolean or the string "true"). If the document also carries a
 * `firestore_collections` array, the configured collection path must be listed
 * in it; when that key is absent or not an array, no collection filter applies.
 *
 * @param {{after: {get: function(string): *}}} snapshot - Change object passed to the
 *   `onWrite` handler; only `snapshot.after.get(field)` is used.
 * @returns {boolean} `true` when the backfill should proceed, `false` when it is skipped
 *   (the reason is logged at error level).
 */
const validateBackfillRun = (snapshot) => {
  const triggerDocument = config.typesenseBackfillTriggerDocumentInFirestore;

  if (![true, "true"].includes(snapshot.after.get("trigger"))) {
    functions.logger.error("Skipping backfill. `trigger: true` key " + `was not found in Firestore document ${triggerDocument}.`);
    return false;
  }

  // Check if there's a collection specific sync setup
  const collectionsToSync = snapshot.after.get("firestore_collections");
  if (Array.isArray(collectionsToSync) && !collectionsToSync.includes(config.firestoreCollectionPath)) {
    functions.logger.error(
      "Skipping backfill. The `firestore_collections` key in " + `${triggerDocument} did not contain collection ${config.firestoreCollectionPath}.`,
    );
    return false;
  }

  return true;
};

module.exports = functions.firestore.document(config.typesenseBackfillTriggerDocumentInFirestore)
.onWrite(async (snapshot, context) => {
functions.logger.info("Backfilling " +
module.exports = functions.firestore.document(config.typesenseBackfillTriggerDocumentInFirestore).onWrite(async (snapshot, context) => {
functions.logger.info(
"Backfilling " +
`${config.firestoreCollectionFields.join(",")} fields in Firestore documents ` +
`from ${config.firestoreCollectionPath} ` +
`into Typesense Collection ${config.typesenseCollectionName} ` +
`on ${config.typesenseHosts.join(",")}`);
`on ${config.typesenseHosts.join(",")}`,
);

if (!validateBackfillRun(snapshot)) {
return false;
}
if (!validateBackfillRun(snapshot)) {
return false;
}

const typesense = createTypesenseClient();
const typesense = createTypesenseClient();

const querySnapshot = admin.firestore().collection(config.firestoreCollectionPath);
const querySnapshot = admin.firestore().collection(config.firestoreCollectionPath);

let lastDoc = null;
let lastDoc = null;

do {
const queryForThisBatch = lastDoc ? querySnapshot.startAfter(lastDoc) : querySnapshot;
const thisBatch = await queryForThisBatch.limit(config.typesenseBackfillBatchSize).get();
if (thisBatch.empty) {
break;
}
const currentDocumentsBatch = await Promise.all(thisBatch.docs.map(async (doc) => {
do {
const queryForThisBatch = lastDoc ? querySnapshot.startAfter(lastDoc) : querySnapshot;
const thisBatch = await queryForThisBatch.limit(config.typesenseBackfillBatchSize).get();
if (thisBatch.empty) {
break;
}
const currentDocumentsBatch = await Promise.all(
thisBatch.docs.map(async (doc) => {
return await utils.typesenseDocumentFromSnapshot(doc);
}));
}),
);

lastDoc = thisBatch.docs.at(-1) ?? null;
try {
await typesense
.collections(encodeURIComponent(config.typesenseCollectionName))
.documents()
.import(currentDocumentsBatch);
functions.logger.info(`Imported ${currentDocumentsBatch.length} documents into Typesense`);
} catch (error) {
functions.logger.error(`Import error in a batch of documents from ${currentDocumentsBatch[0].id} to ${lastDoc.id}`, error);
if ("importResults" in error) {
logImportErrors(error.importResults);
}
}
lastDoc = thisBatch.docs.at(-1) ?? null;
try {
await typesense.collections(encodeURIComponent(config.typesenseCollectionName)).documents().import(currentDocumentsBatch);
functions.logger.info(`Imported ${currentDocumentsBatch.length} documents into Typesense`);
} catch (error) {
functions.logger.error(`Import error in a batch of documents from ${currentDocumentsBatch[0].id} to ${lastDoc.id}`, error);
if ("importResults" in error) {
logImportErrors(error.importResults);
}
}

if (currentDocumentsBatch.length < config.typesenseBackfillBatchSize) {
break;
}
// Recurse on the next process tick, to avoid
// issues with the event loop on firebase functions related to resource release
await new Promise((resolve) => process.nextTick(resolve));
} while (lastDoc);
if (currentDocumentsBatch.length < config.typesenseBackfillBatchSize) {
break;
}
// Recurse on the next process tick, to avoid
// issues with the event loop on firebase functions related to resource release
await new Promise((resolve) => process.nextTick(resolve));
} while (lastDoc);

functions.logger.info("Done backfilling to Typesense from Firestore");
});
functions.logger.info("Done backfilling to Typesense from Firestore");
});

/**
 * Logs one error entry for every failed item in a Typesense import result list.
 *
 * Items whose `success` flag is truthy are skipped. Each failed item is logged
 * exactly once, with its `error` text in the message and the full result object
 * attached as structured data.
 *
 * @param {Array<{success: boolean, error?: string}>} importResults - Per-document
 *   results taken from the import error.
 * @returns {void}
 */
function logImportErrors(importResults) {
  for (const result of importResults) {
    if (result.success) continue;

    functions.logger.error(`Error importing document with error: ${result.error}`, result);
  }
}

0 comments on commit d9c0a68

Please sign in to comment.