Skip to content

Commit 0bc2f01

Browse files
amanatgithubAman Pandey
andauthored
Graceful shutdown FileCopyThread in FCH (#3083)
* Graceful shutdown FileCopyThread in FCH * Graceful shutdown FileCopyThread in FCH - Patch(2) * Graceful shutdown FileCopyThread in FCH - Patch(3) --------- Co-authored-by: Aman Pandey <[email protected]>
1 parent e30f2fa commit 0bc2f01

File tree

4 files changed

+20
-3
lines changed

4 files changed

+20
-3
lines changed

ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/DiskAwareFileCopyThreadPoolManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,18 @@ boolean areThreadsRunning() {
182182
@Override
183183
public void shutdown() throws InterruptedException {
184184
isRunning = false;
185+
logger.info("Shutting down DiskAwareFileCopyThreadPoolManager");
186+
threadQueueLock.lock();
187+
runningThreads.forEach((diskId, fileCopyThreads) -> {
188+
fileCopyThreads.forEach(fileCopyThread -> {
189+
try {
190+
fileCopyThread.shutDown();
191+
} catch (Exception e) {
192+
logger.error("Error while shutting down thread {}", fileCopyThread.threadName, e);
193+
}
194+
});
195+
});
196+
threadQueueLock.unlock();
185197
shutdownLatch.await();
186198
}
187199
}

ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyBasedReplicationSchedulerImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class FileCopyBasedReplicationSchedulerImpl implements FileCopyBasedReplicationS
4545
private final FileCopyHandlerFactory fileCopyHandlerFactory;
4646
private final ClusterMap clusterMap;
4747
private final FileCopyBasedReplicationThreadPoolManager fileCopyBasedReplicationThreadPoolManager;
48+
private final Thread fileCopyBasedReplicationThreadPoolManagerThread;
4849
private final Map<ReplicaId, Long> replicaToStartTimeMap;
4950

5051
private final Map<ReplicaId, FileCopyStatusListener> replicaToStatusListenerMap;
@@ -77,6 +78,7 @@ public FileCopyBasedReplicationSchedulerImpl(@Nonnull FileCopyHandlerFactory fil
7778
this.clusterMap = clusterMap;
7879
this.fileCopyBasedReplicationThreadPoolManager = new DiskAwareFileCopyThreadPoolManager(dataNodeId.getDiskIds(),
7980
fileCopyBasedReplicationConfig.fileCopyNumberOfFileCopyThreads);
81+
this.fileCopyBasedReplicationThreadPoolManagerThread = new Thread(fileCopyBasedReplicationThreadPoolManager);
8082
this.replicaToStartTimeMap = new ConcurrentHashMap<>();
8183
this.inFlightReplicas = new LinkedList<>();
8284
this.prioritizationManager = prioritizationManager;
@@ -90,6 +92,7 @@ public void run(){
9092
isRunning = true;
9193
logger.info("FileCopyBasedReplicationSchedulerImpl Started");
9294
try {
95+
fileCopyBasedReplicationThreadPoolManagerThread.start();
9396
scheduleFileCopy();
9497
} catch (InterruptedException e) {
9598
logger.error("Failed to start FileCopy Scheduler", e);
@@ -128,8 +131,11 @@ List<ReplicaId> getNextReplicaToHydrate(DiskId diskId, int numberOfReplicasOnDis
128131

129132
@Override
130133
public void shutdown() throws InterruptedException {
134+
logger.info("Shutting down FileCopyBasedReplicationSchedulerImpl");
131135
isRunning = false;
132136
fileCopyBasedReplicationThreadPoolManager.shutdown();
137+
fileCopyBasedReplicationThreadPoolManagerThread.join();
138+
logger.info("FileCopyBasedReplicationSchedulerImpl shutdown");
133139
}
134140

135141
@Override

ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyBasedReplicationThreadPoolManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
* assignments of partitions for hydration to threads in the thread pool.
2626
*/
2727

28-
public interface FileCopyBasedReplicationThreadPoolManager {
28+
public interface FileCopyBasedReplicationThreadPoolManager extends Runnable {
2929

3030
/**
3131
* @return the number thread pool size.

ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/handler/StoreFileCopyHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,9 @@ void stop() {
118118
}
119119

120120
/**
121-
* Shutdown the file copy handler. Perform clean up steps in case of a graceful shutdown.
121+
* Shutdown the file copy handler.
122122
*/
123123
public void shutdown() {
124-
connectionPool.shutdown();
125124
isRunning = false;
126125
}
127126

0 commit comments

Comments
 (0)