Closed
Labels
api: pubsub (Issues related to the googleapis/python-pubsub API.)
type: question (Request for information or clarification. Not an issue.)
Description
Hi,
I have a long-running process (up to 80 minutes) that runs in a Docker container on Kubernetes. The container runs Ubuntu 20.04 with Python 3.8.10. It runs a Python worker script, which launches a subprocess. The subprocess is multi-threaded and can use all available threads during CPU-intensive tasks.
I'm receiving a lot of duplicate messages (about 5 to 10 duplicates). This is repeatable and is probably caused by the intense CPU usage. What is the correct way to handle this? Thanks in advance.
My pip versions are:
google-api-core 1.30.0
google-api-python-client 2.38.0
google-auth 1.30.2
google-auth-httplib2 0.1.0
google-auth-oauthlib 0.4.4
google-cloud-core 1.7.0
google-cloud-firestore 2.3.4
google-cloud-pubsub 2.9.0
My worker code is similar to this:
import shutil
import argparse
import time
import json
import subprocess, shlex
from pathlib import Path
import concurrent.futures
from google.cloud import pubsub_v1

def run_command(cmd_title, cmd):
    """Run command string as subprocess"""
    try:
        cmd_list = shlex.split(cmd)
        output = subprocess.run(cmd_list, check=True)
        print(output)
    except subprocess.CalledProcessError as e:
        print('Error running command ', str(cmd))
        print(e.output)
        pass
    return

def worker(
        project_id,
        subscription):
    ''' receives messages from a pull subscription'''
    # create subscriber
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription)
    flow_control = pubsub_v1.types.FlowControl(
        max_messages=1,
        max_duration_per_lease_extension=60,
        max_lease_duration=7200)

    def callback(message):
        print('Received message {} of message ID {}'.format(message, message.message_id))
        cmd = '......'
        run_command('Long Task', cmd)
        # ack message
        message.ack()
        print('Acknowledged message of message ID {}\n'.format(message.message_id))

    future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)
    print('Listening for messages on {}..\n'.format(subscription_path))
    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            future.result()
        except:
            print('Exiting subscriber')
            future.cancel()

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Worker process')
    parser.add_argument('-p', '--project_id', default='project_id')
    parser.add_argument('-s', '--subscription', default='subscription')
    args = parser.parse_args()
    # run worker
    worker(args.project_id, args.subscription)
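
Since Pub/Sub delivery is at-least-once, one possible mitigation is to make the callback tolerate redelivery by remembering which message IDs have already been claimed. The following is only a minimal sketch, not a confirmed fix for the duplicates described above. `SeenMessages` and `make_idempotent_callback` are hypothetical helper names, not part of google-cloud-pubsub, and the in-memory set only covers a single worker process. Several pods would presumably need a shared store instead.

```python
import threading


class SeenMessages:
    """Thread-safe, in-memory record of message IDs that have been claimed.

    Hypothetical helper: it only deduplicates within one process.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._seen = set()

    def claim(self, message_id):
        """Return True the first time an ID is claimed, False for repeats."""
        with self._lock:
            if message_id in self._seen:
                return False
            self._seen.add(message_id)
            return True

    def release(self, message_id):
        """Forget an ID so that a later redelivery can be processed again."""
        with self._lock:
            self._seen.discard(message_id)


def make_idempotent_callback(task, seen):
    """Wrap `task` so a redelivered message does not rerun the long job."""

    def callback(message):
        if not seen.claim(message.message_id):
            # Duplicate delivery of an ID already claimed: skip the task.
            message.ack()
            return
        try:
            task(message)
        except Exception:
            # Processing failed: allow a retry and ask for redelivery.
            seen.release(message.message_id)
            message.nack()
            raise
        message.ack()

    return callback
```

In the worker above, this could be wired in as `callback = make_idempotent_callback(long_task, SeenMessages())`, where `long_task` stands in for the body that calls `run_command`. Note the trade-off: acknowledging a duplicate while the first delivery is still running may acknowledge the message before the task has finished, so a crash in that window could lose the message.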
Thanks in advance,