From fea615f5ab72a91e62eca264a54b68e02ecba5dd Mon Sep 17 00:00:00 2001 From: Tigran Mkrtchyan Date: Fri, 29 Dec 2023 13:56:17 +0100 Subject: [PATCH] pool: re-introduce limit on number of concurrent p2p transfers Motivation: During massive data migration from multiple sources to a single pool, the latter suffers from I/O starvation, and thus all p2p transfers get stalled. The majority of such transfers get killed by jtm. The migration gets cancelled or suspended, and restarting doesn't help as the behavior repeats itself. Modification: Introduce a semaphore to control the concurrency. The default value is unlimited (Integer.MAX_VALUE). The pool's `pp set max active` command is re-introduced (undeprecated). Result: a never-ending data migration (cancelled after 3 weeks) completed within 16h. Acked-by: Lea Morschel Target: master Require-book: no Require-notes: yes --- .../java/org/dcache/pool/p2p/Companion.java | 26 ++++++++++++-- .../java/org/dcache/pool/p2p/P2PClient.java | 35 ++++++++++++++++--- .../org/dcache/pool/p2p/json/P2PData.java | 11 ++++++ .../org/dcache/util/AdjustableSemaphore.java | 4 +++ 4 files changed, 68 insertions(+), 8 deletions(-) diff --git a/modules/dcache/src/main/java/org/dcache/pool/p2p/Companion.java b/modules/dcache/src/main/java/org/dcache/pool/p2p/Companion.java index 7bd35e85174..ad341e947fc 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/p2p/Companion.java +++ b/modules/dcache/src/main/java/org/dcache/pool/p2p/Companion.java @@ -63,6 +63,7 @@ import org.dcache.pool.repository.Repository; import org.dcache.pool.repository.RepositoryChannel; import org.dcache.pool.repository.StickyRecord; +import org.dcache.util.AdjustableSemaphore; import org.dcache.util.Checksum; import org.dcache.util.FireAndForgetTask; import org.dcache.util.Version; @@ -155,6 +156,11 @@ class Companion { private SSLContext _sslContext; + /** + * Semaphore to limit the number of concurrent pool-to-pool transfers.
+ */ + private final AdjustableSemaphore _concurrency; + /** * Creates a new instance. * @@ -189,7 +195,8 @@ class Companion { CacheFileAvailable callback, boolean forceSourceMode, Long atime, - SSLContext sslContext) { + SSLContext sslContext, + AdjustableSemaphore concurrency) { _fsm = new CompanionContext(this); _executor = executor; @@ -222,6 +229,7 @@ class Companion { _stickyRecords = new ArrayList<>(stickyRecords); _id = _nextId.getAndIncrement(); + _concurrency = concurrency; synchronized (this) { _fsm.start(); @@ -564,8 +572,20 @@ void beginTransfer(final String uri) { new Thread("P2P Transfer - " + _pnfsId + " " + _sourcePoolName) { @Override public void run() { - cdc.restore(); - transfer(uri); + + boolean release = false; + try { + _concurrency.acquire(); + release = true; + cdc.restore(); + transfer(uri); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + if(release) { + _concurrency.release(); + } + } } }.start(); } diff --git a/modules/dcache/src/main/java/org/dcache/pool/p2p/P2PClient.java b/modules/dcache/src/main/java/org/dcache/pool/p2p/P2PClient.java index 881787509d7..abee74e28d5 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/p2p/P2PClient.java +++ b/modules/dcache/src/main/java/org/dcache/pool/p2p/P2PClient.java @@ -36,6 +36,7 @@ import org.dcache.pool.repository.ReplicaState; import org.dcache.pool.repository.Repository; import org.dcache.pool.repository.StickyRecord; +import org.dcache.util.AdjustableSemaphore; import org.dcache.vehicles.FileAttributes; enum TlsMode { @@ -62,6 +63,11 @@ public class P2PClient private Callable _sslContext; + /** + * Semaphore to limit the number of concurrent pool-to-pool transfers. 
+ */ + private final AdjustableSemaphore _concurrency = new AdjustableSemaphore(Integer.MAX_VALUE); + public synchronized void setExecutor(ScheduledExecutorService executor) { _executor = executor; } @@ -83,7 +89,7 @@ public synchronized void setPool(CellStub pool) { } public synchronized int getActiveJobs() { - return _companions.size(); + return _concurrency.getUsedPermits(); } public synchronized void setSslContext(Callable sslContext) { @@ -256,7 +262,8 @@ public synchronized int newCompanion(String sourcePoolName, targetState, stickyRecords, cb, forceSourceMode, atime, - context + context, + _concurrency ); int id = addCompanion(companion); @@ -296,6 +303,7 @@ public synchronized P2PData getDataObject() { P2PData info = new P2PData(); info.setLabel("Pool to Pool"); info.setPpInterface(_interface); + info.setMaxActive(_concurrency.getMaxPermits()); return info; } @@ -305,6 +313,9 @@ public synchronized void printSetup(PrintWriter pw) { if (_interface != null) { pw.println("pp interface " + _interface.getHostAddress()); } + if (_concurrency.getMaxPermits() != Integer.MAX_VALUE) { + pw.println("pp set max active " + _concurrency.getMaxPermits()); + } } @Command(name = "pp set pnfs timeout", @@ -321,15 +332,29 @@ public String call() { } } - @Command(name = "pp set max active") - @Deprecated + @AffectsSetup + @Command(name = "pp set max active", + hint = "set the maximum number of active pool-to-pool client transfers", + description = "Set the maximum number of active pool-to-pool " + + "(client) concurrent transfers allowed. Any further " + + "requests will be queued. The default is unlimited.") public class PpSetMaxActiveCommand implements Callable { - @Argument + @Argument(usage = "Specify the maximum number of active pool-to-pool " + + "client transfers. 
The negative value means unlimited.", metaVar = "max active") int maxActiveAllowed; @Override public String call() throws IllegalArgumentException { + if (maxActiveAllowed == 0) { + throw new IllegalArgumentException("Max active must be greater than 0"); + } + + if (maxActiveAllowed < 0) { + maxActiveAllowed = Integer.MAX_VALUE; + } + + _concurrency.setMaxPermits(maxActiveAllowed); return ""; } } diff --git a/modules/dcache/src/main/java/org/dcache/pool/p2p/json/P2PData.java b/modules/dcache/src/main/java/org/dcache/pool/p2p/json/P2PData.java index b008a08b179..333fb09cecc 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/p2p/json/P2PData.java +++ b/modules/dcache/src/main/java/org/dcache/pool/p2p/json/P2PData.java @@ -73,6 +73,8 @@ public class P2PData implements Serializable { private String label; private InetAddress ppInterface; + private int maxActive; + public String getLabel() { return label; } @@ -85,6 +87,11 @@ public void print(PrintWriter pw) { if (ppInterface != null) { pw.println(" Interface : " + ppInterface); } + if (maxActive == Integer.MAX_VALUE) { + pw.println(" MaxActive : unlimited"); + } else { + pw.println(" MaxActive : " + maxActive); + } } public void setLabel(String label) { @@ -94,4 +101,8 @@ public void setLabel(String label) { public void setPpInterface(InetAddress ppInterface) { this.ppInterface = ppInterface; } + + public void setMaxActive(int maxActive) { + this.maxActive = maxActive; + } } diff --git a/modules/dcache/src/main/java/org/dcache/util/AdjustableSemaphore.java b/modules/dcache/src/main/java/org/dcache/util/AdjustableSemaphore.java index 955c18ebd9d..485e9d11936 100644 --- a/modules/dcache/src/main/java/org/dcache/util/AdjustableSemaphore.java +++ b/modules/dcache/src/main/java/org/dcache/util/AdjustableSemaphore.java @@ -32,6 +32,10 @@ public AdjustableSemaphore() { // no op } + public AdjustableSemaphore(int maxPermits) { + setMaxPermits(maxPermits); + } + /* * Must be synchronized because the underlying int is 
not thread safe */