Issue with ack messages #2008

Open
prgwar opened this issue Jan 22, 2025 · 5 comments
Assignees
Labels
api: pubsub Issues related to the googleapis/nodejs-pubsub API.

Comments

@prgwar

prgwar commented Jan 22, 2025

I have been using Pub/Sub for a very long time, and I use the Node.js client to manage it.

Today, out of the blue, messages started being delivered multiple times. When I checked the Pub/Sub console, the unacked message count kept increasing. I am acking the messages, but Pub/Sub keeps redelivering them. Kindly help me resolve the issue.

I am sharing the code used to manage Pub/Sub:


export enum PubSubTopics {
  TEST_PUB_SUB_Q = "test-pub-sub-q",
}

export const initPubSub = async () => {

  logger.info("Initializing PubSub");
  
  await initializePubSub();
  initQMap();
  
  const [topicsMeta] = await pubSub!.getTopics();
  const existingTopics = topicsMeta.map((value) => value.name.split("/")[3]);
  
  const [subscriptionsMeta] = await pubSub!.getSubscriptions();
  const existingSubscriptions = subscriptionsMeta.map(
    (value) => value.name.split("/")[3],
  );
  
  for (const topic of Object.values(PubSubTopics)) {
    if (!existingTopics.includes(topic)) {
      logger.info(`Creating topic ${topic}`);
      try {
        await pubSub!.createTopic(topic);
      } catch (e) {
        logger.error(
          `Error while creating topic ${topic}`,
          e,
          Channels.BackendAlerts,
        );
      }
    }
    
    if (!existingSubscriptions.includes(topic)) {
      try {
        await pubSub!.createSubscription(topic, topic);
      } catch (e) {
        logger.error(
          `Error while creating subscription ${topic}`,
          e,
          Channels.BackendAlerts,
        );
      }
    }
    
    const subscription = pubSub!.subscription(topic);
    subQMap.set(topic, subscription);
    const processor = qMap.get(topic);

    if (processor) {
      pubSub!.subscription(topic).on("message", async (message: Message) => {
        try {
          const messageTime = message.publishTime.toISOString();

          // Prevent old messages from being processed
          if (moment(moment()).diff(moment(messageTime), "minutes") <= 1) {
            console.log("******received-message******", subscription.name);
            
            await qMap.get(topic)!.subscribe(message);
            await pubsubLogColl.updateOne(
              { _id: message.id },
              { 
                $set: { 
                  processed: true, 
                  processedAt: moment().toDate() 
                } 
              },
            );
          }
        } catch (e) {
          console.log("Error processing message:", e);
        } finally {
          message.ack();
        }
      });
    }
  }
}
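
A side note on the age check in the handler above: moment's diff truncates to whole units by default, so `diff(..., "minutes") <= 1` lets through messages up to just under two minutes old. If a strict one-minute cutoff is what was intended, a millisecond comparison makes that explicit. The sketch below is only an illustration; `isFresh`, `MAX_AGE_MS`, and the one-minute value are hypothetical names and assumptions, not part of the original code.

```typescript
// Assumed cutoff: one minute, expressed in milliseconds.
const MAX_AGE_MS = 60_000;

// Hypothetical helper (not from the original post): returns true when the
// message was published no more than `maxAgeMs` before `now`.
// A publish time slightly in the future (clock skew) also counts as fresh.
function isFresh(
  publishTime: Date,
  now: Date = new Date(),
  maxAgeMs: number = MAX_AGE_MS,
): boolean {
  const ageMs = now.getTime() - publishTime.getTime();
  return ageMs <= maxAgeMs;
}
```

Since `message.publishTime` is a Date subclass in the Node.js client, it could be passed to such a helper directly, as in `isFresh(message.publishTime)`.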
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/nodejs-pubsub API. label Jan 22, 2025
@prgwar
Author

prgwar commented Jan 22, 2025

Image

@prgwar
Author

prgwar commented Jan 22, 2025

Image

@prgwar
Author

prgwar commented Jan 23, 2025

I even tried the following, but the messages are still not acked:

const subClient = new v1.SubscriberClient({});
const ackRequest = {
  subscription: `projects/${GOOGLE_PROJECT_ID}/subscriptions/${topic}`,
  ackIds: [message.ackId],
};
if (subClient) {
  await subClient.acknowledge(ackRequest);
  await pubsubLogColl.updateMany(
    { ackId: { $in: Array.from(ackIds) } },
    { $set: { manualAck: true } },
  );
}

@prgwar
Author

prgwar commented Jan 23, 2025

I think the above approach is working; I will monitor and update. I am not sure why the other approach of calling message.ack() is not working.

Image

@hongalex hongalex self-assigned this Jan 28, 2025
@hongalex
Member

What version of the client library are you using?

Using const subClient = new v1.SubscriberClient({}); means acking messages through the lower-level generated client library. We generally prefer that users rely on the higher-level client library rather than trying to manage acks themselves.

As for why the higher level library is having issues with acking, it's possible you aren't handling the messages properly. I looked over your code, but there are some abstractions that make it difficult to understand. Particularly, I'm not sure what await qMap.get(topic)!.subscribe(message); is handling inside your message handler.
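
To make the handling question concrete, here is a minimal sketch of a handler that keeps the ack decision in one place and logs which branch ran. It is not a diagnosis of this issue and not the library's own API: `AckableMessage`, `handleMessage`, `Outcome`, and the one-minute default are hypothetical, and the `processor` callback stands in for whatever `qMap.get(topic)!.subscribe(message)` does. Note that it differs from the posted handler, which acks in `finally` regardless of outcome: here a failed message is nacked, which asks Pub/Sub to redeliver it.

```typescript
// Structural subset of the client library's Message type; only the members
// used below are declared so the sketch stays self-contained.
interface AckableMessage {
  id: string;
  publishTime: Date;
  ack(): void;
  nack(): void;
}

type Outcome = "processed" | "skipped-stale" | "failed";

// Hypothetical wrapper: decides between ack and nack for a single message.
async function handleMessage(
  message: AckableMessage,
  processor: (m: AckableMessage) => Promise<void>,
  maxAgeMs: number = 60_000, // assumed cutoff for "old" messages
  now: () => number = Date.now,
): Promise<Outcome> {
  // Stale messages are acked without processing so they are not redelivered.
  if (now() - message.publishTime.getTime() > maxAgeMs) {
    message.ack();
    return "skipped-stale";
  }
  try {
    await processor(message);
    message.ack(); // ack only after processing has finished successfully
    return "processed";
  } catch (err) {
    console.error(`Processing failed for message ${message.id}`, err);
    message.nack(); // request redelivery instead of silently dropping
    return "failed";
  }
}
```

With the high-level client, such a function could be registered as `subscription.on("message", (m) => handleMessage(m, processor))`, alongside a `subscription.on("error", ...)` listener so that stream-level failures show up in the logs.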

2 participants