
Long tasks lead to duplicate messages #593

@jtressle


Hi,

I have a long-running process (up to 80 minutes) running in a Docker container on Kubernetes. The container runs Ubuntu 20.04 with Python 3.8.10. It runs a Python worker script, which in turn launches a subprocess. The subprocess is multi-threaded and can saturate all available CPU threads during some CPU-intensive phases.
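Because the subprocess can occupy every CPU thread, one possible mitigation is to start it with a lower scheduling priority, so that the worker process (including the Pub/Sub client's lease-management threads) still gets scheduled. This is a minimal sketch, assuming CPU starvation is really the cause (it is a suspicion, not a confirmed diagnosis) and that the POSIX `nice` utility is available in the image. `run_niced` is a hypothetical helper that could stand in for `subprocess.run` inside `run_command`. It changes priority only within the container and does not lift a Kubernetes CPU limit.

```python
import shlex
import subprocess


def run_niced(cmd: str, niceness: int = 10, **run_kwargs) -> subprocess.CompletedProcess:
    """Run cmd under the POSIX `nice` utility with a lower CPU priority (hypothetical helper).

    Extra keyword arguments are passed straight through to subprocess.run.
    """
    # `nice -n N` adds N to the child's niceness (higher = lower priority, capped at 19),
    # so the scheduler favours the worker process when all cores are busy.
    cmd_list = ["nice", "-n", str(niceness)] + shlex.split(cmd)
    return subprocess.run(cmd_list, check=True, **run_kwargs)
```

Prefixing the command with `nice` avoids `preexec_fn`, which the `subprocess` documentation warns is not safe in a multi-threaded program such as this subscriber.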

I'm getting a lot of duplicate messages (roughly 5 to 10 duplicates). This is reproducible, and I suspect it is caused by the heavy CPU usage. What is the correct way to handle this? Thanks in advance.
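Pub/Sub delivery is at-least-once by default, and a redelivered message keeps the same `message_id`, so one way to tolerate duplicates regardless of their cause is to make the callback idempotent. Below is a minimal sketch, assuming an in-process SQLite table is an acceptable stand-in for a real store; with several pods, a database shared by all of them would be needed instead. `ProcessedMessageStore` and `handle_once` are hypothetical names, not part of `google-cloud-pubsub`.

```python
import sqlite3
import threading


class ProcessedMessageStore:
    """Remembers which Pub/Sub message IDs have been claimed (hypothetical helper)."""

    def __init__(self, path: str = ":memory:") -> None:
        # check_same_thread=False because subscriber callbacks run on worker threads.
        self._conn = sqlite3.connect(path, check_same_thread=False)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS processed (message_id TEXT PRIMARY KEY)"
        )
        self._lock = threading.Lock()

    def claim(self, message_id: str) -> bool:
        """Return True the first time message_id is claimed, False afterwards."""
        with self._lock, self._conn:  # the connection context commits the insert
            cur = self._conn.execute(
                "INSERT OR IGNORE INTO processed (message_id) VALUES (?)", (message_id,)
            )
            return cur.rowcount == 1

    def release(self, message_id: str) -> None:
        """Forget a claim so that a later redelivery can retry the work."""
        with self._lock, self._conn:
            self._conn.execute("DELETE FROM processed WHERE message_id = ?", (message_id,))


def handle_once(store: ProcessedMessageStore, message_id: str, task) -> bool:
    """Run task only for the first delivery of message_id; return True if it ran."""
    if not store.claim(message_id):
        return False  # duplicate delivery: skip the expensive work
    try:
        task()
    except Exception:
        store.release(message_id)  # allow a redelivery to try again
        raise
    return True
```

In the callback this could look like `handle_once(store, message.message_id, lambda: run_command('Long Task', cmd))` followed by `message.ack()`. The claim is taken before the work starts so that duplicates arriving while the first copy is still running are skipped; the trade-off is that a process crash between `claim` and `release` leaves the message marked as handled.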

My installed package versions (from pip) are:

```text
google-api-core               1.30.0
google-api-python-client      2.38.0
google-auth                   1.30.2
google-auth-httplib2          0.1.0
google-auth-oauthlib          0.4.4
google-cloud-core             1.7.0
google-cloud-firestore        2.3.4
google-cloud-pubsub           2.9.0
```

My worker code is similar to this:

```python
import argparse
import shlex
import subprocess

from google.cloud import pubsub_v1


def run_command(cmd_title, cmd):
    """Run a command string as a subprocess and report failures."""
    cmd_list = shlex.split(cmd)
    try:
        output = subprocess.run(cmd_list, check=True)
        print(output)
    except subprocess.CalledProcessError as e:
        # Output is not captured, so e.output is None; report the exit code instead.
        print('Error running {} ({}): exit code {}'.format(cmd_title, cmd, e.returncode))


def worker(project_id, subscription):
    """Receive messages from a pull subscription."""
    # Create the subscriber.
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription)
    flow_control = pubsub_v1.types.FlowControl(
        max_messages=1,                       # at most one outstanding message at a time
        max_duration_per_lease_extension=60,  # each lease extension asks for at most 60 s
        max_lease_duration=7200,              # keep extending a lease for up to 2 hours
    )

    def callback(message):
        print('Received message {} of message ID {}'.format(message, message.message_id))

        cmd = '......'
        # This blocks the callback thread for the whole run (up to ~80 minutes).
        run_command('Long Task', cmd)

        # Acknowledge the message.
        message.ack()
        print('Acknowledged message of message ID {}\n'.format(message.message_id))

    future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)
    print('Listening for messages on {}..\n'.format(subscription_path))

    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            future.result()
        except KeyboardInterrupt:
            future.cancel()  # Trigger the shutdown.
            future.result()  # Block until the shutdown is complete.
        except Exception as e:
            print('Exiting subscriber: {}'.format(e))
            future.cancel()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Worker process')
    parser.add_argument('-p', '--project_id', default='project_id')
    parser.add_argument('-s', '--subscription', default='subscription')
    args = parser.parse_args()

    # Run the worker.
    worker(args.project_id, args.subscription)
```
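If the streaming-pull lease handling turns out to be the problem, an alternative is to pull synchronously and extend the ack deadline manually while the long task runs. This is a minimal sketch, assuming google-cloud-pubsub 2.x, whose `SubscriberClient` exposes `pull`, `modify_ack_deadline` and `acknowledge` taking a `request` dict. `process_one_message` and its parameters are hypothetical; the client is passed in so the logic can be exercised with a stub. It assumes `extend_every_seconds` is shorter than the subscription's ack deadline, and it does not make delivery exactly-once.

```python
import threading
from typing import Callable, List


def process_one_message(
    client,
    subscription_path: str,
    run_task: Callable[[bytes], None],
    ack_deadline_seconds: int = 60,
    extend_every_seconds: float = 30.0,
) -> bool:
    """Pull one message, extend its lease while run_task runs, then ack it.

    Returns True if a message was processed and acknowledged, False otherwise.
    `client` is assumed to behave like pubsub_v1.SubscriberClient.
    """
    response = client.pull(request={"subscription": subscription_path, "max_messages": 1})
    if not response.received_messages:
        return False  # nothing to do right now

    received = response.received_messages[0]
    errors: List[BaseException] = []

    def target() -> None:
        # Run the long task, recording any failure instead of losing it in the thread.
        try:
            run_task(received.message.data)
        except Exception as exc:
            errors.append(exc)

    task_thread = threading.Thread(target=target, daemon=True)
    task_thread.start()
    task_thread.join(timeout=extend_every_seconds)

    while task_thread.is_alive():
        # Still running: push the ack deadline out again (Pub/Sub caps it at 600 s).
        client.modify_ack_deadline(
            request={
                "subscription": subscription_path,
                "ack_ids": [received.ack_id],
                "ack_deadline_seconds": ack_deadline_seconds,
            }
        )
        task_thread.join(timeout=extend_every_seconds)

    if errors:
        # A deadline of 0 makes the message available for redelivery right away.
        client.modify_ack_deadline(
            request={
                "subscription": subscription_path,
                "ack_ids": [received.ack_id],
                "ack_deadline_seconds": 0,
            }
        )
        return False

    client.acknowledge(request={"subscription": subscription_path, "ack_ids": [received.ack_id]})
    return True
```

With a real client this would be called in a loop, e.g. with `pubsub_v1.SubscriberClient()` and the `subscription_path` built as in the worker above. The lease is extended from the calling thread while the task runs in its own thread, so the extension does not depend on the callback scheduler of the streaming client.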

Thanks in advance,

Labels: api: pubsub (issues related to the googleapis/python-pubsub API), type: question (request for information or clarification, not an issue)
