mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	zmirror: Add backoff-retry code for the Zephyr=>Humbug initialization.
(imported from commit bde30d87d6dae75bbfdcd49b30a1f08b3c585ec7)
This commit is contained in:
		@@ -34,6 +34,37 @@ import signal
 | 
				
			|||||||
import logging
 | 
					import logging
 | 
				
			||||||
import hashlib
 | 
					import hashlib
 | 
				
			||||||
import tempfile
 | 
					import tempfile
 | 
				
			||||||
 | 
					import random
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class CountingBackoff(object):
 | 
				
			||||||
 | 
					    def __init__(self, maximum_retries=10):
 | 
				
			||||||
 | 
					        self.number_of_retries = 0
 | 
				
			||||||
 | 
					        self.maximum_retries = maximum_retries
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def keep_going(self):
 | 
				
			||||||
 | 
					        return self.number_of_retries < self.maximum_retries
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def succeed(self):
 | 
				
			||||||
 | 
					        self.number_of_retries = 0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def fail(self):
 | 
				
			||||||
 | 
					        self.number_of_retries = min(self.number_of_retries + 1,
 | 
				
			||||||
 | 
					                                     self.maximum_retries)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class RandomExponentialBackoff(CountingBackoff):
 | 
				
			||||||
 | 
					    def fail(self):
 | 
				
			||||||
 | 
					        self.number_of_retries = min(self.number_of_retries + 1,
 | 
				
			||||||
 | 
					                                     self.maximum_retries)
 | 
				
			||||||
 | 
					        # Exponential growth with ratio sqrt(2); compute random delay
 | 
				
			||||||
 | 
					        # between x and 2x where x is growing exponentially
 | 
				
			||||||
 | 
					        delay_scale = int(2 ** (self.number_of_retries / 2.0 - 1)) + 1
 | 
				
			||||||
 | 
					        delay = delay_scale + random.randint(1, delay_scale)
 | 
				
			||||||
 | 
					        message = "Sleeping for %ss [max %s] before retrying." % (delay, delay_scale * 2)
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            logger.warning(message)
 | 
				
			||||||
 | 
					        except NameError:
 | 
				
			||||||
 | 
					            print message
 | 
				
			||||||
 | 
					        time.sleep(delay)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
DEFAULT_SITE = "https://humbughq.com"
 | 
					DEFAULT_SITE = "https://humbughq.com"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -199,18 +230,21 @@ def update_subscriptions():
 | 
				
			|||||||
    if len(classes_to_subscribe) > 0:
 | 
					    if len(classes_to_subscribe) > 0:
 | 
				
			||||||
        zephyr_bulk_subscribe(list(classes_to_subscribe))
 | 
					        zephyr_bulk_subscribe(list(classes_to_subscribe))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def maybe_kill_child():
 | 
				
			||||||
 | 
					    try:
 | 
				
			||||||
 | 
					        if child_pid is not None:
 | 
				
			||||||
 | 
					            os.kill(child_pid, signal.SIGTERM)
 | 
				
			||||||
 | 
					    except OSError:
 | 
				
			||||||
 | 
					        # We don't care if the child process no longer exists, so just log the error
 | 
				
			||||||
 | 
					        logger.exception("")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def maybe_restart_mirroring_script():
 | 
					def maybe_restart_mirroring_script():
 | 
				
			||||||
    if os.stat(os.path.join(options.root_path, "stamps", "restart_stamp")).st_mtime > start_time or \
 | 
					    if os.stat(os.path.join(options.root_path, "stamps", "restart_stamp")).st_mtime > start_time or \
 | 
				
			||||||
            ((options.user == "tabbott" or options.user == "tabbott/extra") and
 | 
					            ((options.user == "tabbott" or options.user == "tabbott/extra") and
 | 
				
			||||||
             os.stat(os.path.join(options.root_path, "stamps", "tabbott_stamp")).st_mtime > start_time):
 | 
					             os.stat(os.path.join(options.root_path, "stamps", "tabbott_stamp")).st_mtime > start_time):
 | 
				
			||||||
        logger.warning("")
 | 
					        logger.warning("")
 | 
				
			||||||
        logger.warning("zephyr mirroring script has been updated; restarting...")
 | 
					        logger.warning("zephyr mirroring script has been updated; restarting...")
 | 
				
			||||||
        try:
 | 
					        maybe_kill_child()
 | 
				
			||||||
            if child_pid is not None:
 | 
					 | 
				
			||||||
                os.kill(child_pid, signal.SIGTERM)
 | 
					 | 
				
			||||||
        except OSError:
 | 
					 | 
				
			||||||
            # We don't care if the child process no longer exists, so just log the error
 | 
					 | 
				
			||||||
            logger.exception("")
 | 
					 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            zephyr._z.cancelSubs()
 | 
					            zephyr._z.cancelSubs()
 | 
				
			||||||
        except IOError:
 | 
					        except IOError:
 | 
				
			||||||
@@ -358,18 +392,43 @@ def decode_unicode_byte_strings(zeph):
 | 
				
			|||||||
            zeph[field] = decoded
 | 
					            zeph[field] = decoded
 | 
				
			||||||
    return zeph
 | 
					    return zeph
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def quit_failed_initialization(message):
 | 
				
			||||||
 | 
					    logger.error(message)
 | 
				
			||||||
 | 
					    maybe_kill_child()
 | 
				
			||||||
 | 
					    sys.exit(1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def zephyr_init_autoretry():
 | 
				
			||||||
 | 
					    backoff = RandomExponentialBackoff()
 | 
				
			||||||
 | 
					    while backoff.keep_going():
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            # zephyr.init() tries to clear old subscriptions, and thus
 | 
				
			||||||
 | 
					            # sometimes gets a SERVNAK from the server
 | 
				
			||||||
 | 
					            zephyr.init()
 | 
				
			||||||
 | 
					            backoff.succeed()
 | 
				
			||||||
 | 
					            return
 | 
				
			||||||
 | 
					        except IOError:
 | 
				
			||||||
 | 
					            logger.exception("Error initializing Zephyr library (retrying).  Traceback:")
 | 
				
			||||||
 | 
					            backoff.fail()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    quit_failed_initialization("Could not initialize Zephyr library, quitting!")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def zephyr_subscribe_autoretry(sub):
 | 
					def zephyr_subscribe_autoretry(sub):
 | 
				
			||||||
    while True:
 | 
					    backoff = RandomExponentialBackoff()
 | 
				
			||||||
 | 
					    while backoff.keep_going():
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            zephyr.Subscriptions().add(sub)
 | 
					            zephyr.Subscriptions().add(sub)
 | 
				
			||||||
 | 
					            backoff.succeed()
 | 
				
			||||||
            return
 | 
					            return
 | 
				
			||||||
        except IOError:
 | 
					        except IOError:
 | 
				
			||||||
            # Probably a SERVNAK from the zephyr server, but log the
 | 
					            # Probably a SERVNAK from the zephyr server, but log the
 | 
				
			||||||
            # traceback just in case it's something else
 | 
					            # traceback just in case it's something else
 | 
				
			||||||
            logger.exception("Error subscribing to personals (retrying).  Traceback:")
 | 
					            logger.exception("Error subscribing to personals (retrying).  Traceback:")
 | 
				
			||||||
            time.sleep(1)
 | 
					            backoff.fail()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    quit_failed_initialization("Could not subscribe to personals, quitting!")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def zephyr_to_humbug(options):
 | 
					def zephyr_to_humbug(options):
 | 
				
			||||||
 | 
					    zephyr_init_autoretry()
 | 
				
			||||||
    if options.forward_class_messages:
 | 
					    if options.forward_class_messages:
 | 
				
			||||||
        update_subscriptions()
 | 
					        update_subscriptions()
 | 
				
			||||||
    if options.forward_personals:
 | 
					    if options.forward_personals:
 | 
				
			||||||
@@ -404,7 +463,7 @@ def zephyr_to_humbug(options):
 | 
				
			|||||||
                    logger.exception("Could not send saved zephyr:")
 | 
					                    logger.exception("Could not send saved zephyr:")
 | 
				
			||||||
                    time.sleep(2)
 | 
					                    time.sleep(2)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    logger.info("Starting receive loop.")
 | 
					    logger.info("Successfully initialized; Starting receive loop.")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if options.log_path is not None:
 | 
					    if options.log_path is not None:
 | 
				
			||||||
        with open(options.log_path, 'a') as log:
 | 
					        with open(options.log_path, 'a') as log:
 | 
				
			||||||
@@ -898,15 +957,6 @@ or specify the --api-key-file option.""" % (options.api_key_file,))))
 | 
				
			|||||||
    CURRENT_STATE = States.ZephyrToHumbug
 | 
					    CURRENT_STATE = States.ZephyrToHumbug
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    import zephyr
 | 
					    import zephyr
 | 
				
			||||||
    while True:
 | 
					 | 
				
			||||||
        try:
 | 
					 | 
				
			||||||
            # zephyr.init() tries to clear old subscriptions, and thus
 | 
					 | 
				
			||||||
            # sometimes gets a SERVNAK from the server
 | 
					 | 
				
			||||||
            zephyr.init()
 | 
					 | 
				
			||||||
            break
 | 
					 | 
				
			||||||
        except IOError:
 | 
					 | 
				
			||||||
            logger.exception("")
 | 
					 | 
				
			||||||
            time.sleep(1)
 | 
					 | 
				
			||||||
    logger_name = "zephyr=>humbug"
 | 
					    logger_name = "zephyr=>humbug"
 | 
				
			||||||
    if options.shard is not None:
 | 
					    if options.shard is not None:
 | 
				
			||||||
        logger_name += "(%s)" % (options.shard,)
 | 
					        logger_name += "(%s)" % (options.shard,)
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user