001/* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.base.Preconditions.checkState; 020import static com.google.common.util.concurrent.Service.State.FAILED; 021import static com.google.common.util.concurrent.Service.State.NEW; 022import static com.google.common.util.concurrent.Service.State.RUNNING; 023import static com.google.common.util.concurrent.Service.State.STARTING; 024import static com.google.common.util.concurrent.Service.State.STOPPING; 025import static com.google.common.util.concurrent.Service.State.TERMINATED; 026import static java.util.Objects.requireNonNull; 027 028import com.google.common.annotations.Beta; 029import com.google.common.annotations.GwtIncompatible; 030import com.google.common.util.concurrent.Monitor.Guard; 031import com.google.common.util.concurrent.Service.State; // javadoc needs this 032import com.google.errorprone.annotations.CanIgnoreReturnValue; 033import com.google.errorprone.annotations.ForOverride; 034import com.google.errorprone.annotations.concurrent.GuardedBy; 035import com.google.j2objc.annotations.WeakOuter; 036import java.time.Duration; 037import java.util.concurrent.Executor; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.TimeoutException; 040import javax.annotation.CheckForNull; 041 042/** 043 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 044 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 045 * callbacks. Its subclasses must manage threads manually; consider {@link 046 * AbstractExecutionThreadService} if you need only a single execution thread. 047 * 048 * @author Jesse Wilson 049 * @author Luke Sandberg 050 * @since 1.0 051 */ 052@GwtIncompatible 053@ElementTypesAreNonnullByDefault 054public abstract class AbstractService implements Service { 055 private static final ListenerCallQueue.Event<Listener> STARTING_EVENT = 056 new ListenerCallQueue.Event<Listener>() { 057 @Override 058 public void call(Listener listener) { 059 listener.starting(); 060 } 061 062 @Override 063 public String toString() { 064 return "starting()"; 065 } 066 }; 067 private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT = 068 new ListenerCallQueue.Event<Listener>() { 069 @Override 070 public void call(Listener listener) { 071 listener.running(); 072 } 073 074 @Override 075 public String toString() { 076 return "running()"; 077 } 078 }; 079 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT = 080 stoppingEvent(STARTING); 081 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT = 082 stoppingEvent(RUNNING); 083 084 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT = 085 terminatedEvent(NEW); 086 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT = 087 terminatedEvent(STARTING); 088 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT = 089 terminatedEvent(RUNNING); 090 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT = 091 terminatedEvent(STOPPING); 092 093 private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) { 094 return new ListenerCallQueue.Event<Listener>() { 095 @Override 096 public void call(Listener listener) { 097 listener.terminated(from); 098 } 099 100 @Override 101 public String toString() { 102 return "terminated({from = " + from + "})"; 103 } 104 }; 105 } 106 107 private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) { 108 return new ListenerCallQueue.Event<Listener>() { 109 @Override 110 public void call(Listener listener) { 111 listener.stopping(from); 112 } 113 114 @Override 115 public String toString() { 116 return "stopping({from = " + from + "})"; 117 } 118 }; 119 } 120 121 private final Monitor monitor = new Monitor(); 122 123 private final Guard isStartable = new IsStartableGuard(); 124 125 @WeakOuter 126 private final class IsStartableGuard extends Guard { 127 IsStartableGuard() { 128 super(AbstractService.this.monitor); 129 } 130 131 @Override 132 public boolean isSatisfied() { 133 return state() == NEW; 134 } 135 } 136 137 private final Guard isStoppable = new IsStoppableGuard(); 138 139 @WeakOuter 140 private final class IsStoppableGuard extends Guard { 141 IsStoppableGuard() { 142 super(AbstractService.this.monitor); 143 } 144 145 @Override 146 public boolean isSatisfied() { 147 return state().compareTo(RUNNING) <= 0; 148 } 149 } 150 151 private final Guard hasReachedRunning = new HasReachedRunningGuard(); 152 153 @WeakOuter 154 private final class HasReachedRunningGuard extends Guard { 155 HasReachedRunningGuard() { 156 super(AbstractService.this.monitor); 157 } 158 159 @Override 160 public boolean isSatisfied() { 161 return state().compareTo(RUNNING) >= 0; 162 } 163 } 164 165 private final Guard isStopped = new IsStoppedGuard(); 166 167 @WeakOuter 168 private final class IsStoppedGuard extends Guard { 169 IsStoppedGuard() { 170 super(AbstractService.this.monitor); 171 } 172 173 @Override 174 public boolean isSatisfied() { 175 return state().compareTo(TERMINATED) >= 0; 176 } 177 } 178 179 /** The listeners to notify during a state transition. */ 180 private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>(); 181 182 /** 183 * The current state of the service. This should be written with the lock held but can be read 184 * without it because it is an immutable object in a volatile field. This is desirable so that 185 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 186 * without grabbing the lock. 187 * 188 * <p>To update this field correctly the lock must be held to guarantee that the state is 189 * consistent. 190 */ 191 private volatile StateSnapshot snapshot = new StateSnapshot(NEW); 192 193 /** Constructor for use by subclasses. */ 194 protected AbstractService() {} 195 196 /** 197 * This method is called by {@link #startAsync} to initiate service startup. The invocation of 198 * this method should cause a call to {@link #notifyStarted()}, either during this method's run, 199 * or after it has returned. If startup fails, the invocation should cause a call to {@link 200 * #notifyFailed(Throwable)} instead. 201 * 202 * <p>This method should return promptly; prefer to do work on a different thread where it is 203 * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 204 * called multiple times. 205 */ 206 @ForOverride 207 protected abstract void doStart(); 208 209 /** 210 * This method should be used to initiate service shutdown. The invocation of this method should 211 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 212 * returned. If shutdown fails, the invocation should cause a call to {@link 213 * #notifyFailed(Throwable)} instead. 214 * 215 * <p>This method should return promptly; prefer to do work on a different thread where it is 216 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 217 * called multiple times. 218 * 219 * <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not 220 * invoked immediately. Instead, it will be deferred until after the service is {@link 221 * State#RUNNING}. Services that need to cancel startup work can override {@link #doCancelStart}. 222 */ 223 @ForOverride 224 protected abstract void doStop(); 225 226 /** 227 * This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link 228 * #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the 229 * method to cancel pending work and then call {@link #notifyStopped} to stop the service. 230 * 231 * <p>This method should return promptly; prefer to do work on a different thread where it is 232 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 233 * called multiple times. 234 * 235 * <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which is the 236 * external state observable by the caller of {@link #stopAsync}. 237 * 238 * @since 27.0 239 */ 240 @Beta 241 @ForOverride 242 protected void doCancelStart() {} 243 244 @CanIgnoreReturnValue 245 @Override 246 public final Service startAsync() { 247 if (monitor.enterIf(isStartable)) { 248 try { 249 snapshot = new StateSnapshot(STARTING); 250 enqueueStartingEvent(); 251 doStart(); 252 } catch (Throwable startupFailure) { 253 notifyFailed(startupFailure); 254 } finally { 255 monitor.leave(); 256 dispatchListenerEvents(); 257 } 258 } else { 259 throw new IllegalStateException("Service " + this + " has already been started"); 260 } 261 return this; 262 } 263 264 @CanIgnoreReturnValue 265 @Override 266 public final Service stopAsync() { 267 if (monitor.enterIf(isStoppable)) { 268 try { 269 State previous = state(); 270 switch (previous) { 271 case NEW: 272 snapshot = new StateSnapshot(TERMINATED); 273 enqueueTerminatedEvent(NEW); 274 break; 275 case STARTING: 276 snapshot = new StateSnapshot(STARTING, true, null); 277 enqueueStoppingEvent(STARTING); 278 doCancelStart(); 279 break; 280 case RUNNING: 281 snapshot = new StateSnapshot(STOPPING); 282 enqueueStoppingEvent(RUNNING); 283 doStop(); 284 break; 285 case STOPPING: 286 case TERMINATED: 287 case FAILED: 288 // These cases are impossible due to the if statement above. 289 throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous); 290 } 291 } catch (Throwable shutdownFailure) { 292 notifyFailed(shutdownFailure); 293 } finally { 294 monitor.leave(); 295 dispatchListenerEvents(); 296 } 297 } 298 return this; 299 } 300 301 @Override 302 public final void awaitRunning() { 303 monitor.enterWhenUninterruptibly(hasReachedRunning); 304 try { 305 checkCurrentState(RUNNING); 306 } finally { 307 monitor.leave(); 308 } 309 } 310 311 /** @since 28.0 */ 312 @Override 313 public final void awaitRunning(Duration timeout) throws TimeoutException { 314 Service.super.awaitRunning(timeout); 315 } 316 317 @Override 318 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 319 if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) { 320 try { 321 checkCurrentState(RUNNING); 322 } finally { 323 monitor.leave(); 324 } 325 } else { 326 // It is possible due to races the we are currently in the expected state even though we 327 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 328 // even check the guard. I don't think we care too much about this use case but it could lead 329 // to a confusing error message. 330 throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state."); 331 } 332 } 333 334 @Override 335 public final void awaitTerminated() { 336 monitor.enterWhenUninterruptibly(isStopped); 337 try { 338 checkCurrentState(TERMINATED); 339 } finally { 340 monitor.leave(); 341 } 342 } 343 344 /** @since 28.0 */ 345 @Override 346 public final void awaitTerminated(Duration timeout) throws TimeoutException { 347 Service.super.awaitTerminated(timeout); 348 } 349 350 @Override 351 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 352 if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) { 353 try { 354 checkCurrentState(TERMINATED); 355 } finally { 356 monitor.leave(); 357 } 358 } else { 359 // It is possible due to races the we are currently in the expected state even though we 360 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 361 // even check the guard. I don't think we care too much about this use case but it could lead 362 // to a confusing error message. 363 throw new TimeoutException( 364 "Timed out waiting for " 365 + this 366 + " to reach a terminal state. " 367 + "Current state: " 368 + state()); 369 } 370 } 371 372 /** Checks that the current state is equal to the expected state. */ 373 @GuardedBy("monitor") 374 private void checkCurrentState(State expected) { 375 State actual = state(); 376 if (actual != expected) { 377 if (actual == FAILED) { 378 // Handle this specially so that we can include the failureCause, if there is one. 379 throw new IllegalStateException( 380 "Expected the service " + this + " to be " + expected + ", but the service has FAILED", 381 failureCause()); 382 } 383 throw new IllegalStateException( 384 "Expected the service " + this + " to be " + expected + ", but was " + actual); 385 } 386 } 387 388 /** 389 * Implementing classes should invoke this method once their service has started. It will cause 390 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 391 * 392 * @throws IllegalStateException if the service is not {@link State#STARTING}. 393 */ 394 protected final void notifyStarted() { 395 monitor.enter(); 396 try { 397 // We have to examine the internal state of the snapshot here to properly handle the stop 398 // while starting case. 399 if (snapshot.state != STARTING) { 400 IllegalStateException failure = 401 new IllegalStateException( 402 "Cannot notifyStarted() when the service is " + snapshot.state); 403 notifyFailed(failure); 404 throw failure; 405 } 406 407 if (snapshot.shutdownWhenStartupFinishes) { 408 snapshot = new StateSnapshot(STOPPING); 409 // We don't call listeners here because we already did that when we set the 410 // shutdownWhenStartupFinishes flag. 411 doStop(); 412 } else { 413 snapshot = new StateSnapshot(RUNNING); 414 enqueueRunningEvent(); 415 } 416 } finally { 417 monitor.leave(); 418 dispatchListenerEvents(); 419 } 420 } 421 422 /** 423 * Implementing classes should invoke this method once their service has stopped. It will cause 424 * the service to transition from {@link State#STARTING} or {@link State#STOPPING} to {@link 425 * State#TERMINATED}. 426 * 427 * @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link 428 * State#STARTING}, or {@link State#RUNNING}. 429 */ 430 protected final void notifyStopped() { 431 monitor.enter(); 432 try { 433 State previous = state(); 434 switch (previous) { 435 case NEW: 436 case TERMINATED: 437 case FAILED: 438 throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous); 439 case RUNNING: 440 case STARTING: 441 case STOPPING: 442 snapshot = new StateSnapshot(TERMINATED); 443 enqueueTerminatedEvent(previous); 444 break; 445 } 446 } finally { 447 monitor.leave(); 448 dispatchListenerEvents(); 449 } 450 } 451 452 /** 453 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 454 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 455 * or otherwise cannot be started nor stopped. 456 */ 457 protected final void notifyFailed(Throwable cause) { 458 checkNotNull(cause); 459 460 monitor.enter(); 461 try { 462 State previous = state(); 463 switch (previous) { 464 case NEW: 465 case TERMINATED: 466 throw new IllegalStateException("Failed while in state:" + previous, cause); 467 case RUNNING: 468 case STARTING: 469 case STOPPING: 470 snapshot = new StateSnapshot(FAILED, false, cause); 471 enqueueFailedEvent(previous, cause); 472 break; 473 case FAILED: 474 // Do nothing 475 break; 476 } 477 } finally { 478 monitor.leave(); 479 dispatchListenerEvents(); 480 } 481 } 482 483 @Override 484 public final boolean isRunning() { 485 return state() == RUNNING; 486 } 487 488 @Override 489 public final State state() { 490 return snapshot.externalState(); 491 } 492 493 /** @since 14.0 */ 494 @Override 495 public final Throwable failureCause() { 496 return snapshot.failureCause(); 497 } 498 499 /** @since 13.0 */ 500 @Override 501 public final void addListener(Listener listener, Executor executor) { 502 listeners.addListener(listener, executor); 503 } 504 505 @Override 506 public String toString() { 507 return getClass().getSimpleName() + " [" + state() + "]"; 508 } 509 510 /** 511 * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link 512 * #monitor}. 513 */ 514 private void dispatchListenerEvents() { 515 if (!monitor.isOccupiedByCurrentThread()) { 516 listeners.dispatch(); 517 } 518 } 519 520 private void enqueueStartingEvent() { 521 listeners.enqueue(STARTING_EVENT); 522 } 523 524 private void enqueueRunningEvent() { 525 listeners.enqueue(RUNNING_EVENT); 526 } 527 528 private void enqueueStoppingEvent(final State from) { 529 if (from == State.STARTING) { 530 listeners.enqueue(STOPPING_FROM_STARTING_EVENT); 531 } else if (from == State.RUNNING) { 532 listeners.enqueue(STOPPING_FROM_RUNNING_EVENT); 533 } else { 534 throw new AssertionError(); 535 } 536 } 537 538 private void enqueueTerminatedEvent(final State from) { 539 switch (from) { 540 case NEW: 541 listeners.enqueue(TERMINATED_FROM_NEW_EVENT); 542 break; 543 case STARTING: 544 listeners.enqueue(TERMINATED_FROM_STARTING_EVENT); 545 break; 546 case RUNNING: 547 listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT); 548 break; 549 case STOPPING: 550 listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT); 551 break; 552 case TERMINATED: 553 case FAILED: 554 throw new AssertionError(); 555 } 556 } 557 558 private void enqueueFailedEvent(final State from, final Throwable cause) { 559 // can't memoize this one due to the exception 560 listeners.enqueue( 561 new ListenerCallQueue.Event<Listener>() { 562 @Override 563 public void call(Listener listener) { 564 listener.failed(from, cause); 565 } 566 567 @Override 568 public String toString() { 569 return "failed({from = " + from + ", cause = " + cause + "})"; 570 } 571 }); 572 } 573 574 /** 575 * An immutable snapshot of the current state of the service. This class represents a consistent 576 * snapshot of the state and therefore it can be used to answer simple queries without needing to 577 * grab a lock. 578 */ 579 // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses). 580 private static final class StateSnapshot { 581 /** 582 * The internal state, which equals external state unless shutdownWhenStartupFinishes is true. 583 */ 584 final State state; 585 586 /** If true, the user requested a shutdown while the service was still starting up. */ 587 final boolean shutdownWhenStartupFinishes; 588 589 /** 590 * The exception that caused this service to fail. This will be {@code null} unless the service 591 * has failed. 592 */ 593 @CheckForNull final Throwable failure; 594 595 StateSnapshot(State internalState) { 596 this(internalState, false, null); 597 } 598 599 StateSnapshot( 600 State internalState, boolean shutdownWhenStartupFinishes, @CheckForNull Throwable failure) { 601 checkArgument( 602 !shutdownWhenStartupFinishes || internalState == STARTING, 603 "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 604 internalState); 605 checkArgument( 606 (failure != null) == (internalState == FAILED), 607 "A failure cause should be set if and only if the state is failed. Got %s and %s " 608 + "instead.", 609 internalState, 610 failure); 611 this.state = internalState; 612 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 613 this.failure = failure; 614 } 615 616 /** @see Service#state() */ 617 State externalState() { 618 if (shutdownWhenStartupFinishes && state == STARTING) { 619 return STOPPING; 620 } else { 621 return state; 622 } 623 } 624 625 /** @see Service#failureCause() */ 626 Throwable failureCause() { 627 checkState( 628 state == FAILED, 629 "failureCause() is only valid if the service has failed, service is %s", 630 state); 631 // requireNonNull is safe because the constructor requires a non-null cause with state=FAILED. 632 return requireNonNull(failure); 633 } 634 } 635}