Gracefully shut down exporter after it exports the remaining metrics (#2936)
* [BugFix]shut down exporter after it exports the remaining metrics * Remove testing log * Use CompletableResultCode to manage metrics forceflush * fix bad format * Adjust the logic to never return null from forceFlush() * Add doRun() method for IntervalMetricReader.Exporter and return CompletableResultCode * refine code to remove duplicate code
This commit is contained in:
		
							parent
							
								
									23ce8fe392
								
							
						
					
					
						commit
						906c0e82df
					
				| 
						 | 
				
			
			@ -33,19 +33,30 @@ public final class IntervalMetricReader {
 | 
			
		|||
  private final ScheduledExecutorService scheduler;
 | 
			
		||||
 | 
			
		||||
  /** Stops the scheduled task and calls export one more time. */
 | 
			
		||||
  public void shutdown() {
 | 
			
		||||
  public CompletableResultCode shutdown() {
 | 
			
		||||
    final CompletableResultCode result = new CompletableResultCode();
 | 
			
		||||
    scheduler.shutdown();
 | 
			
		||||
    try {
 | 
			
		||||
      scheduler.awaitTermination(5, TimeUnit.SECONDS);
 | 
			
		||||
      exporter.run();
 | 
			
		||||
      final CompletableResultCode flushResult = exporter.doRun();
 | 
			
		||||
      flushResult.join(5, TimeUnit.SECONDS);
 | 
			
		||||
    } catch (InterruptedException e) {
 | 
			
		||||
      // force a shutdown if the export hasn't finished.
 | 
			
		||||
      scheduler.shutdownNow();
 | 
			
		||||
      // reset the interrupted status
 | 
			
		||||
      Thread.currentThread().interrupt();
 | 
			
		||||
    } finally {
 | 
			
		||||
      exporter.shutdown();
 | 
			
		||||
      final CompletableResultCode shutdownResult = exporter.shutdown();
 | 
			
		||||
      shutdownResult.whenComplete(
 | 
			
		||||
          () -> {
 | 
			
		||||
            if (!shutdownResult.isSuccess()) {
 | 
			
		||||
              result.fail();
 | 
			
		||||
            } else {
 | 
			
		||||
              result.succeed();
 | 
			
		||||
            }
 | 
			
		||||
          });
 | 
			
		||||
    }
 | 
			
		||||
    return result;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
| 
						 | 
				
			
			@ -81,6 +92,12 @@ public final class IntervalMetricReader {
 | 
			
		|||
    @Override
 | 
			
		||||
    @SuppressWarnings("BooleanParameter")
 | 
			
		||||
    public void run() {
 | 
			
		||||
      // Ignore the CompletableResultCode from doRun() in order to keep run() asynchronous
 | 
			
		||||
      doRun();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    CompletableResultCode doRun() {
 | 
			
		||||
      final CompletableResultCode flushResult = new CompletableResultCode();
 | 
			
		||||
      if (exportAvailable.compareAndSet(true, false)) {
 | 
			
		||||
        try {
 | 
			
		||||
          List<MetricData> metricsList = new ArrayList<>();
 | 
			
		||||
| 
						 | 
				
			
			@ -94,18 +111,22 @@ public final class IntervalMetricReader {
 | 
			
		|||
                if (!result.isSuccess()) {
 | 
			
		||||
                  logger.log(Level.FINE, "Exporter failed");
 | 
			
		||||
                }
 | 
			
		||||
                flushResult.succeed();
 | 
			
		||||
                exportAvailable.set(true);
 | 
			
		||||
              });
 | 
			
		||||
        } catch (RuntimeException e) {
 | 
			
		||||
          logger.log(Level.WARNING, "Exporter threw an Exception", e);
 | 
			
		||||
          flushResult.fail();
 | 
			
		||||
        }
 | 
			
		||||
      } else {
 | 
			
		||||
        logger.log(Level.FINE, "Exporter busy. Dropping metrics.");
 | 
			
		||||
        flushResult.fail();
 | 
			
		||||
      }
 | 
			
		||||
      return flushResult;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void shutdown() {
 | 
			
		||||
      internalState.getMetricExporter().shutdown();
 | 
			
		||||
    CompletableResultCode shutdown() {
 | 
			
		||||
      return internalState.getMetricExporter().shutdown();
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue