From 4b7dbd4f28f6acb09e066c565ab1310610e2f846 Mon Sep 17 00:00:00 2001 From: Tigran Mkrtchyan Date: Thu, 22 Sep 2022 11:57:50 +0200 Subject: [PATCH] pool: add network traffic flow markers Motivation: To analyzing the network usage by LHC experiments WLCG plans to use packet and flow markers. Thus it required that dCache for begin, end and optionally during transfer send markers. See: https://docs.google.com/document/d/1x9JsZ7iTj44Ta06IHdkwpv5Q2u4U2QGLWnUeN2Zf5ts https://docs.google.com/document/d/1HTaNwv7huRqdNUvgHJTjlow8MivJgoknRUKgADNlvgY https://docs.google.com/document/d/1aAnsujpZnxn3oIUL9JZxcw0ZpoJNVXkHp-Yo5oj-B8U Modification: Introduce transfer lifecycle that will send FlowMarkers at the transfer start and end. Result: PoC flow markers Acked-by: Albert Rossi Acked-by: Lea Morschel Target: master Require-book: yes Require-notes: yes --- .../dcache/chimera/nfsv41/mover/NfsMover.java | 2 +- modules/dcache/pom.xml | 6 + .../main/java/org/dcache/net/FlowMarker.java | 129 +++++++++ .../classic/DefaultPostTransferService.java | 14 + .../pool/movers/NettyTransferService.java | 13 + .../dcache/pool/movers/TransferLifeCycle.java | 259 ++++++++++++++++++ .../org/dcache/pool/classic/pool.xml | 10 + .../java/org/dcache/net/FlowMarkerTest.java | 125 +++++++++ .../org/dcache/net/firefly.schema.json | 148 ++++++++++ pom.xml | 5 + skel/share/defaults/pool.properties | 30 ++ 11 files changed, 740 insertions(+), 1 deletion(-) create mode 100644 modules/dcache/src/main/java/org/dcache/net/FlowMarker.java create mode 100644 modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java create mode 100644 modules/dcache/src/test/java/org/dcache/net/FlowMarkerTest.java create mode 100644 modules/dcache/src/test/resources/org/dcache/net/firefly.schema.json diff --git a/modules/dcache-nfs/src/main/java/org/dcache/chimera/nfsv41/mover/NfsMover.java b/modules/dcache-nfs/src/main/java/org/dcache/chimera/nfsv41/mover/NfsMover.java index ce6f7e5133e..1b0d7e608ba 100644 --- a/modules/dcache-nfs/src/main/java/org/dcache/chimera/nfsv41/mover/NfsMover.java +++ b/modules/dcache-nfs/src/main/java/org/dcache/chimera/nfsv41/mover/NfsMover.java @@ -131,9 +131,9 @@ void disable(Throwable error) { * Attach mover tho the client's NFSv41 session. * * @param session to attach to + * @return true if mover is new attached to the session. */ synchronized boolean attachSession(NFSv41Session session) { - if (_session == null) { _session = session; _session.getClient().attachState(_state); diff --git a/modules/dcache/pom.xml b/modules/dcache/pom.xml index 88e4b53bcb1..5957306e367 100644 --- a/modules/dcache/pom.xml +++ b/modules/dcache/pom.xml @@ -384,6 +384,12 @@ com.fasterxml.jackson.core jackson-core + + + com.github.erosb + everit-json-schema + test + diff --git a/modules/dcache/src/main/java/org/dcache/net/FlowMarker.java b/modules/dcache/src/main/java/org/dcache/net/FlowMarker.java new file mode 100644 index 00000000000..75003728025 --- /dev/null +++ b/modules/dcache/src/main/java/org/dcache/net/FlowMarker.java @@ -0,0 +1,129 @@ +/* dCache - http://www.dcache.org/ + * + * Copyright (C) 2022 - 2024 Deutsches Elektronen-Synchrotron + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.dcache.net; + +import com.google.common.net.InetAddresses; +import java.net.InetSocketAddress; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * See: https://docs.google.com/document/d/1x9JsZ7iTj44Ta06IHdkwpv5Q2u4U2QGLWnUeN2Zf5ts/edit# + */ +public class FlowMarker { + + private final static Logger LOGGER = LoggerFactory.getLogger(FlowMarker.class); + + public static class FlowMarkerBuilder { + + private final static int VERSION = 1; + + private final JSONObject payload = new JSONObject(); + private final JSONObject lifecycle = new JSONObject(); + private final JSONObject context = new JSONObject(); + private final JSONObject flow = new JSONObject(); + + public FlowMarkerBuilder withStartedAt(Instant startTime) { + lifecycle.put("start-time", DateTimeFormatter.ISO_INSTANT.format(startTime)); + return this; + } + + public FlowMarkerBuilder withFinishedAt(Instant startTime) { + lifecycle.put("end-time", DateTimeFormatter.ISO_INSTANT.format(startTime)); + return this; + } + + public FlowMarkerBuilder withExperimentId(int id) { + context.put("experiment-id", id); + return this; + } + + public FlowMarkerBuilder withActivityId(int id) { + context.put("activity-id", id); + return this; + } + + public FlowMarkerBuilder wittApplication(String app) { + context.put("application", app); + return this; + } + + public FlowMarkerBuilder withAFI(String afi) { + switch (afi) { + case "ipv6": + case "ipv4": + break; + default: + throw new IllegalArgumentException("AFI can be 'ipv4' or 'ipv6'"); + } + + flow.put("afi", afi); + return this; + } + + public FlowMarkerBuilder withSource(InetSocketAddress addr) { + flow.put("src-ip", InetAddresses.toAddrString(addr.getAddress())); + flow.put("src-port", addr.getPort()); + return this; + } + + public FlowMarkerBuilder withDestination(InetSocketAddress addr) { + flow.put("dst-ip", InetAddresses.toAddrString(addr.getAddress())); + flow.put("dst-port", addr.getPort()); + return this; + } + + public FlowMarkerBuilder withProtocol(String proto) { + switch (proto) { + case "tcp": + case "udp": + break; + default: + throw new IllegalArgumentException("Protocol can be 'tcp' or 'udp'"); + } + + flow.put("protocol", proto); + return this; + } + + public JSONObject build(String state) { + + switch (state) { + case "start": + case "end": + case "ongoing": + break; + default: + throw new IllegalArgumentException("State can be 'start', 'ongoing' or 'end'"); + } + + payload.put("version", VERSION); + payload.put("flow-lifecycle", lifecycle); + payload.put("context", context); + payload.put("flow-id", flow); + + lifecycle.put("state", state); + lifecycle.put("current-time", DateTimeFormatter.ISO_INSTANT.format(Instant.now())); + + return payload; + } + } +} diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java b/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java index c2283b485d2..1b492e5a8ae 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import diskCacheV111.util.CacheException; import diskCacheV111.vehicles.DoorTransferFinishedMessage; +import diskCacheV111.vehicles.IpProtocolInfo; import diskCacheV111.vehicles.MoverInfoMessage; import dmg.cells.nucleus.AbstractCellComponent; import dmg.cells.nucleus.CellInfoProvider; @@ -37,6 +38,7 @@ import java.util.function.Consumer; import org.dcache.cells.CellStub; import org.dcache.pool.movers.Mover; +import org.dcache.pool.movers.TransferLifeCycle; import org.dcache.pool.repository.ModifiableReplicaDescriptor; import org.dcache.pool.repository.ReplicaDescriptor; import org.dcache.pool.statistics.DirectedIoStatistics; @@ -71,6 +73,8 @@ public class DefaultPostTransferService extends AbstractCellComponent implements private Consumer _kafkaSender = (s) -> { }; + private TransferLifeCycle transferLifeCycle; + @Required public void setBillingStub(CellStub billing) { _billing = billing; @@ -92,6 +96,10 @@ public void setKafkaTemplate(KafkaTemplate kafkaTemplate) { _kafkaSender = kafkaTemplate::sendDefault; } + public void setTransferLifeCycle(TransferLifeCycle transferLifeCycle) { + this.transferLifeCycle = transferLifeCycle; + } + public void init() { _door = new CellStub(getCellEndpoint()); } @@ -229,6 +237,12 @@ public void sendFinished(Mover mover, MoverInfoMessage moverInfoMessage) { finished.setReply(mover.getErrorCode(), mover.getErrorMessage()); } + mover.getLocalEndpoint().ifPresent(e -> + transferLifeCycle.onEnd(((IpProtocolInfo) mover.getProtocolInfo()).getSocketAddress(), + e, + mover.getProtocolInfo(), + mover.getSubject())); + _door.notify(mover.getPathToDoor(), finished); } diff --git a/modules/dcache/src/main/java/org/dcache/pool/movers/NettyTransferService.java b/modules/dcache/src/main/java/org/dcache/pool/movers/NettyTransferService.java index 4f96ca81fce..200d3eda5d3 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/movers/NettyTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/pool/movers/NettyTransferService.java @@ -172,6 +172,8 @@ public abstract class NettyTransferService

private final List> shutdownFutures = new ArrayList<>(); + private TransferLifeCycle transferLifeCycle; + public NettyTransferService(String name) { this.name = name; } @@ -220,6 +222,14 @@ public void setPortRange(NettyPortRange portRange) { this.portRange = portRange; } + public void setTransferLifeCycle(TransferLifeCycle transferLifeCycle) { + this.transferLifeCycle = transferLifeCycle; + } + + public TransferLifeCycle getTransferLifeCycle() { + return transferLifeCycle; + } + public NettyPortRange getPortRange() { return portRange; } @@ -372,6 +382,9 @@ public void execute() InetSocketAddress localEndpoint = new InetSocketAddress(localIP, getServerAddress().getPort()); mover.setLocalEndpoint(localEndpoint); + transferLifeCycle.onStart(((IpProtocolInfo)mover.getProtocolInfo()).getSocketAddress(), + localEndpoint, mover.getProtocolInfo(), mover.getSubject()); + sendAddressToDoor(mover, localEndpoint); } diff --git a/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java b/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java new file mode 100644 index 00000000000..64fc9e6bb9d --- /dev/null +++ b/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java @@ -0,0 +1,259 @@ +/* dCache - http://www.dcache.org/ + * + * Copyright (C) 2022 - 2024 Deutsches Elektronen-Synchrotron + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.dcache.pool.movers; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.net.InetAddresses.forString; + +import com.google.common.base.Strings; +import com.google.common.net.HostAndPort; +import diskCacheV111.vehicles.ProtocolInfo; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.function.Function; +import java.util.function.Predicate; +import javax.annotation.Nonnull; +import javax.security.auth.Subject; +import org.dcache.auth.Subjects; +import org.dcache.net.FlowMarker.FlowMarkerBuilder; +import org.dcache.util.IPMatcher; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TransferLifeCycle { + + private final static Logger LOGGER = LoggerFactory.getLogger(TransferLifeCycle.class); + + /** + * The UDP firefly default port as described in + * document. + */ + private final static int UDP_PORT = 10514; + + // by default, send the firefly to the destination host. + private Function toFireflyDestination = a -> new InetSocketAddress( + a.getAddress(), UDP_PORT); + + // tests whatever the provided IP belongs to the sites internal network + private Predicate localSubnet = a -> false; + + private boolean enabled; + + /** + * Mark transfer start. + * @param src remote client endpoint + * @param dst local pool endpoint + * @param protocolInfo access protocol information + * @param subject associated with the transfer + */ + public void onStart(InetSocketAddress src, InetSocketAddress dst, ProtocolInfo protocolInfo, + Subject subject) { + + if (!enabled) { + return; + } + + if (isLocalTransfer(src)) { + return; + } + + if (!needMarker(protocolInfo)) { + return; + } + + var data = new FlowMarkerBuilder() + .withStartedAt(Instant.now()) + .withExperimentId(getExperimentId(subject)) + .withActivityId(getActivity(protocolInfo)) + .wittApplication(getApplication(protocolInfo)) + .withProtocol("tcp") + .withAFI(toAFI(dst)) + .withDestination(dst) + .withSource(src) + .build("start"); + + send(toFireflyDestination.apply(src), data); + } + + /** + * Mark transfer end. + * @param src remote client endpoint + * @param dst local pool endpoint + * @param protocolInfo access protocol information + * @param subject associated with the transfer + */ + public void onEnd(InetSocketAddress src, InetSocketAddress dst, ProtocolInfo protocolInfo, + Subject subject) { + + if (!enabled) { + return; + } + + if (isLocalTransfer(src)) { + return; + } + + if (!needMarker(protocolInfo)) { + return; + } + + var data = new FlowMarkerBuilder() + .withStartedAt(Instant.now()) + .withFinishedAt(Instant.now()) + .withExperimentId(getExperimentId(subject)) + .withActivityId(getActivity(protocolInfo)) + .wittApplication(getApplication(protocolInfo)) + .withProtocol("tcp") + .withAFI(toAFI(dst)) + .withDestination(dst) + .withSource(src) + .build("end"); + + send(toFireflyDestination.apply(src), data); + } + + public void setFireflyDestination(String addr) { + + if (!Strings.isNullOrEmpty(addr)) { + var destination = HostAndPort.fromString(addr); + var destinationAddr = new InetSocketAddress(destination.getHost(), + destination.getPortOrDefault(UDP_PORT)); + toFireflyDestination = a -> destinationAddr; + } + } + + public void setExcludes(String[] localSubnets) { + for (var s : localSubnets) { + localSubnet = localSubnet.or(new InetAdressPredicate(s)); + } + } + + public void setEnabled(boolean isEnabled) { + enabled = isEnabled; + } + + /** + * Send flow marker. + * + * @param dst Inet address where to flow markers should be sent. + * @param payload the marker + * @throws IllegalStateException if flow marker ist not build. + */ + private void send(InetSocketAddress dst, @Nonnull JSONObject payload) + throws IllegalStateException { + + byte[] data = payload.toString().getBytes(StandardCharsets.UTF_8); + DatagramPacket p = new DatagramPacket(data, data.length); + try (DatagramSocket socket = new DatagramSocket()) { + socket.connect(dst); + socket.send(p); + } catch (IOException e) { + LOGGER.warn("Failed to send flow marker to {}: {}", dst, e.getMessage()); + } + } + + private boolean needMarker(ProtocolInfo protocolInfo) { + + switch (protocolInfo.getProtocol().toLowerCase()) { + case "xrootd": + case "http": + return true; + default: + return false; + } + } + + private String getApplication(ProtocolInfo protocolInfo) { + // REVISIT: the application should come from protocol info + return protocolInfo.getProtocol().toLowerCase(); + } + + private int getExperimentId(Subject subject) { + + var vo = Subjects.getPrimaryFqan(subject); + if (vo == null) { + return 0; + } + + switch (vo.getGroup().toLowerCase()) { + case "atlas": + return 16; + case "cms": + return 23; + default: + return 0; + } + } + + private boolean isLocalTransfer(InetSocketAddress dst) { + InetAddress addr = dst.getAddress(); + return localSubnet.test(addr); + } + + private int getActivity(ProtocolInfo protocolInfo) { + // REVISIT: the activity should come from protocol info + return 14; // production + } + + private String toAFI(InetSocketAddress dst) { + + var addr = dst.getAddress(); + if (addr instanceof Inet6Address) { + return "ipv6"; + } else if (addr instanceof Inet4Address) { + return "ipv4"; + } else { + throw new IllegalArgumentException("Illegal address type: " + addr.getClass()); + } + } + + private class InetAdressPredicate implements Predicate { + + private final InetAddress net; + private final int mask; + + public InetAdressPredicate(String s) { + + String hostAndMask[] = s.split("/"); + checkArgument(hostAndMask.length < 3, "Invalid host specification: " + s); + + net = forString(hostAndMask[0]); + int fullMask = net.getAddress().length * Byte.SIZE; + if (hostAndMask.length == 1) { + mask = fullMask; + } else { + mask = Integer.parseInt(hostAndMask[1]); + } + checkArgument(mask <= fullMask, + "Netmask can't be bigger than full mask " + fullMask); + } + + @Override + public boolean test(InetAddress inetAddress) { + return IPMatcher.match(inetAddress, net, mask); + } + } +} diff --git a/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml b/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml index 0d340d0a910..379490850b3 100644 --- a/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml +++ b/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml @@ -345,6 +345,13 @@ + + Transfer lifecycle + + + + + Generic transfer service @@ -394,6 +401,7 @@ + @@ -463,6 +471,7 @@ + @@ -517,6 +526,7 @@ + diff --git a/modules/dcache/src/test/java/org/dcache/net/FlowMarkerTest.java b/modules/dcache/src/test/java/org/dcache/net/FlowMarkerTest.java new file mode 100644 index 00000000000..1d4ba81d8dd --- /dev/null +++ b/modules/dcache/src/test/java/org/dcache/net/FlowMarkerTest.java @@ -0,0 +1,125 @@ +package org.dcache.net; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.time.Instant; +import java.util.stream.Collectors; +import org.dcache.net.FlowMarker.FlowMarkerBuilder; +import org.everit.json.schema.Schema; +import org.everit.json.schema.ValidationException; +import org.everit.json.schema.loader.SchemaLoader; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class FlowMarkerTest { + + private static Schema SCHEMA; + + @BeforeClass + public static void init() throws IOException { + + try (InputStream is = Thread.currentThread().getContextClassLoader() + .getResourceAsStream("org/dcache/net/firefly.schema.json")) { + + JSONObject rawSchema = new JSONObject(new JSONTokener(is)); + SCHEMA = SchemaLoader.builder() + .schemaJson(rawSchema) + .draftV7Support() + .build().load().build(); // Wow! + } + } + + @Test(expected = IllegalArgumentException.class) + public void shouldFailWithIncorrectState() { + new FlowMarkerBuilder() + .build("foo"); + } + + + @Test(expected = IllegalArgumentException.class) + public void shouldFailWithIncorrectProtocol() { + new FlowMarkerBuilder() + .withProtocol("foo"); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldFailWithIncorrectAFI() { + new FlowMarkerBuilder() + .withAFI("foo"); + } + + @Test + public void shouldAcceptFullyBuildStartMarker() { + var data = new FlowMarkerBuilder() + .withStartedAt(Instant.now()) + .withExperimentId(2) + .withActivityId(1) + .wittApplication("curl") + .withProtocol("tcp") + .withAFI("ipv4") + .withDestination(new InetSocketAddress(InetAddress.getLoopbackAddress(), 1234)) + .withSource(new InetSocketAddress(InetAddress.getLoopbackAddress(), 5678)) + .build("start"); + + assertJson(data); + } + + @Test + public void shouldAcceptFullyBuildEndMarker() { + var data = new FlowMarkerBuilder() + .withStartedAt(Instant.now()) + .withFinishedAt(Instant.now()) + .withExperimentId(2) + .withActivityId(1) + .wittApplication("curl") + .withProtocol("tcp") + .withAFI("ipv4") + .withDestination(new InetSocketAddress(InetAddress.getLoopbackAddress(), 1234)) + .withSource(new InetSocketAddress(InetAddress.getLoopbackAddress(), 5678)) + .build("end"); + + assertJson(data); + } + + @Test + public void shouldAcceptFullyBuildOngoingMarker() { + var data = new FlowMarkerBuilder() + .withStartedAt(Instant.now()) + .withExperimentId(2) + .withActivityId(1) + .wittApplication("curl") + .withProtocol("tcp") + .withAFI("ipv4") + .withDestination(new InetSocketAddress(InetAddress.getLoopbackAddress(), 1234)) + .withSource(new InetSocketAddress(InetAddress.getLoopbackAddress(), 5678)) + .build("ongoing"); + + assertJson(data); + } + + @Test(expected = AssertionError.class) + public void shouldFailWithEmptyMarker() { + var data = new FlowMarkerBuilder() + .build("start"); + + assertJson(data); + } + + + private void assertJson(JSONObject json) { + try { + SCHEMA.validate(json); + } catch (ValidationException e) { + var msg = e.getCausingExceptions().stream() + .map(ValidationException::getMessage) + .collect(Collectors.joining(",")); + + Assert.fail(e.getMessage() + " : " + msg); + } + } +} \ No newline at end of file diff --git a/modules/dcache/src/test/resources/org/dcache/net/firefly.schema.json b/modules/dcache/src/test/resources/org/dcache/net/firefly.schema.json new file mode 100644 index 00000000000..4fac0265212 --- /dev/null +++ b/modules/dcache/src/test/resources/org/dcache/net/firefly.schema.json @@ -0,0 +1,148 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://scitags.org/schemas/v1.0.0/firefly.schema.json", + "title": "firefly message version 1", + "description": "A message containing metadata about a data transfer flow", + "type": "object", + + "properties": { + "version": { + "description": "The version number of the message format", + "type": "integer", + "minimum": 1, + "maximum": 1, + }, + "flow-lifecycle": { + "description": "Details about the start, end and current state of the flow", + "type": "object", + "properties": { + "state": { + "description": "The state of the flow at the moment the message was sent", + "type": "string", + "enum": [ "start", "end", "ongoing" ], + }, + "start-time": { + "description": "The UTC date/time that the flow started", + "type": "string", + "format": "date-time", + }, + "end-time": { + "description": "The UTC date/time that the flow ended", + "type": "string", + "format": "date-time", + }, + "current-time": { + "description": "The current UTC date/time that this message was launched", + "type": "string", + "format": "date-time", + }, + }, + "if": { "properties": { "state": { "const": "end" } } }, + "then": { + "required": [ "end-time" ] + }, + "required": [ "state", "start-time" ], + "additionalProperties": false, + }, + "flow-id": { + "description": "The IP 5-tuple of the original flow that this message is reporting about", + "type": "object", + "properties": { + "afi": { + "description": "The address family IPv4 or IPv6 of the original flow", + "type": "string", + "enum": [ "ipv4", "ipv6" ], + }, + "src-ip": { + "description": "The IPv4 or IPv6 source address of the original flow", + "type": "string", + }, + "dst-ip": { + "description": "The IPv4 or IPv6 destination address of the original flow", + "type": "string", + }, + "protocol": { + "description": "The protocol of the original flow being reported on", + "type": "string", + "enum": [ "tcp", "udp" ], + }, + "src-port": { + "description": "The layer 4 source port number of the original flow", + "type": "integer", + "minimum": 1, + "maximum": 65535, + }, + "dst-port": { + "description": "The layer 4 destination port number of the original flow", + "type": "integer", + "minimum": 1, + "maximum": 65535, + }, + }, + "oneOf": [ + { + "if": { "properties": { "afi": { "const": "ipv4" } } }, + "then": { + "properties": { + "src-ip": { "format": "ipv4" }, + "dst-ip": { "format": "ipv4" }, + }, + }, + "else": false, + }, + { + "if": { "properties": { "afi": { "const": "ipv6" } } }, + "then": { + "properties": { + "src-ip": { "format": "ipv6" }, + "dst-ip": { "format": "ipv6" }, + } + }, + "else": false, + }, + ], + "required": ["afi", "src-ip", "dst-ip", "protocol", "src-port", "dst-port"], + "additionalProperties": false, + }, + "usage": { + "description": "Bytes sent/received in the original flow (optional)", + "type": "object", + "properties": { + "received": { + "description": "Bytes received in the original flow", + "type": "integer", + }, + "sent": { + "description": "Bytes sent in the original flow", + "type": "integer", + }, + }, + "additionalProperties": false, + }, + "netlink": { + "description": "Netlink information related to the original flow (optional)", + "type": "object", + }, + "context": { + "description": "Additional contextual information about the original flow", + "type": "object", + "properties": { + "experiment-id": { + "description": "The experiment ID that the original flow is related to", + "type": "integer", + }, + "activity-id": { + "description": "The activity ID that the original flow is related to", + "type": "integer", + }, + "application": { + "description": "Name and version number of the application which is initiating the original flow", + "type": "string", + } + }, + "required": [ "experiment-id", "activity-id" ], + "additionalProperties": false, + }, + }, + "required": [ "version", "flow-lifecycle", "flow-id", "context" ], +} diff --git a/pom.xml b/pom.xml index df842c5f728..a210bf4d6ea 100644 --- a/pom.xml +++ b/pom.xml @@ -974,6 +974,11 @@ jmh-generator-annprocess ${jmh.version} + + com.github.erosb + everit-json-schema + 1.14.1 + diff --git a/skel/share/defaults/pool.properties b/skel/share/defaults/pool.properties index 63bd6c29b9a..0f8d4d5bdc0 100644 --- a/skel/share/defaults/pool.properties +++ b/skel/share/defaults/pool.properties @@ -911,3 +911,33 @@ pool.mover.https.hostcert.key=${dcache.authn.hostcert.key} # connections react to this property. # pool.mover.https.capath=${dcache.authn.capath} + + +# ---- Flow and Packet Marking +# +# To analyzing the network usage by LHC experiments WLCG uses packet and flow markers. +# +# dCache implements no called firefly that send special formatted UDM packets to the transfer +# destination. +# +# See: +# https://docs.google.com/document/d/1x9JsZ7iTj44Ta06IHdkwpv5Q2u4U2QGLWnUeN2Zf5ts +# https://docs.google.com/document/d/1HTaNwv7huRqdNUvgHJTjlow8MivJgoknRUKgADNlvgY +# https://docs.google.com/document/d/1aAnsujpZnxn3oIUL9JZxcw0ZpoJNVXkHp-Yo5oj-B8U +# +# Enable firefly markers +(one-of?true|false)pool.enable.firefly=false + +# +# The destination of transfer firefly. Change only for debug purposes. +# +# example: scitag.es.net:10514 +# +pool.firefly.destination= + +# +# A comma separated list of network destinations that should be ignored by transfer flow monitoring +# +# example: a.b.c.d/16, aa:bb:cc:dd:ee:ff/64, v.x.w.z +# +pool.firefly.excludes= \ No newline at end of file