From 12ce964c1df02b067887b40fa0281d920db3e843 Mon Sep 17 00:00:00 2001 From: cirnoooo123 <1601169949@qq.com> Date: Thu, 11 May 2023 11:14:22 +0800 Subject: [PATCH 1/2] implement secret sharing multiply --- .../openhufu/core/config/OpenHuFuConfig.java | 2 +- .../core/implementor/UserSideImplementor.java | 6 +- .../com/hufudb/openhufu/mpc/ProtocolType.java | 4 +- .../openhufu/mpc/multiply/MulCache.java | 54 +++++++ .../openhufu/mpc/multiply/SecretMultiply.java | 142 ++++++++++++++++++ .../secretMultiply/SecretMultiplyTest.java | 127 ++++++++++++++++ 6 files changed, 331 insertions(+), 4 deletions(-) create mode 100644 mpc/src/main/java/com/hufudb/openhufu/mpc/multiply/MulCache.java create mode 100644 mpc/src/main/java/com/hufudb/openhufu/mpc/multiply/SecretMultiply.java create mode 100644 mpc/src/test/java/com/hufudb/openhufu/mpc/secretMultiply/SecretMultiplyTest.java diff --git a/core/src/main/java/com/hufudb/openhufu/core/config/OpenHuFuConfig.java b/core/src/main/java/com/hufudb/openhufu/core/config/OpenHuFuConfig.java index d4b550c2..232cdd4f 100644 --- a/core/src/main/java/com/hufudb/openhufu/core/config/OpenHuFuConfig.java +++ b/core/src/main/java/com/hufudb/openhufu/core/config/OpenHuFuConfig.java @@ -1,7 +1,7 @@ package com.hufudb.openhufu.core.config; public class OpenHuFuConfig { - public static final int CLIENT_THREAD_NUM = 4; // client side thread pool size + public static final int CLIENT_THREAD_NUM = 20; // client side thread pool size public static final int SERVER_THREAD_NUM = 4; // server side thread pool size public static final long RPC_TIME_OUT = 60000; // time out when waiting for response in ms public static final int ZK_TIME_OUT = 6000; // time out of zk diff --git a/core/src/main/java/com/hufudb/openhufu/core/implementor/UserSideImplementor.java b/core/src/main/java/com/hufudb/openhufu/core/implementor/UserSideImplementor.java index ae001091..0ba1f314 100644 --- a/core/src/main/java/com/hufudb/openhufu/core/implementor/UserSideImplementor.java +++ b/core/src/main/java/com/hufudb/openhufu/core/implementor/UserSideImplementor.java @@ -112,7 +112,8 @@ private boolean isPrivacyRangeJoin(BinaryPlan plan) { if (plan.getJoinCond().getModifier().equals(Modifier.PUBLIC)) { return false; } - if (!plan.getJoinCond().getCondition().getIn(0).getModifier().equals(Modifier.PUBLIC)) { + if (!plan.getJoinCond().getCondition().getInList().isEmpty() + && !plan.getJoinCond().getCondition().getIn(0).getModifier().equals(Modifier.PUBLIC)) { throw new OpenHuFuException(ErrorCode.RANGE_JOIN_LEFT_TABLE_NOT_PUBLIC); } return plan.getJoinCond().getCondition().getStr().equals("dwithin"); @@ -122,7 +123,8 @@ private boolean isPrivacyKNNJoin(BinaryPlan plan) { if (plan.getJoinCond().getModifier().equals(Modifier.PUBLIC)) { return false; } - if (!plan.getJoinCond().getCondition().getIn(0).getModifier().equals(Modifier.PUBLIC)) { + if (!plan.getJoinCond().getCondition().getInList().isEmpty() + && !plan.getJoinCond().getCondition().getIn(0).getModifier().equals(Modifier.PUBLIC)) { throw new OpenHuFuException(ErrorCode.RANGE_JOIN_LEFT_TABLE_NOT_PUBLIC); } return plan.getJoinCond().getCondition().getStr().equals("knn"); diff --git a/mpc/src/main/java/com/hufudb/openhufu/mpc/ProtocolType.java b/mpc/src/main/java/com/hufudb/openhufu/mpc/ProtocolType.java index 263c6c8a..71128b26 100644 --- a/mpc/src/main/java/com/hufudb/openhufu/mpc/ProtocolType.java +++ b/mpc/src/main/java/com/hufudb/openhufu/mpc/ProtocolType.java @@ -17,7 +17,9 @@ public enum ProtocolType { SS("SS", 101, true), HASH_PSI("PSI", 200, true), ABY("ABY", 300, true), - SECRET_UNION("SECRET_UNION", 400, true); + SECRET_UNION("SECRET_UNION", 400, true), + SECRET_MULTIPLY("SECRET_MULTIPLY", 401, true); + private static final ImmutableMap MAP; static { diff --git a/mpc/src/main/java/com/hufudb/openhufu/mpc/multiply/MulCache.java b/mpc/src/main/java/com/hufudb/openhufu/mpc/multiply/MulCache.java new file mode 100644 index 00000000..55b752df --- /dev/null +++ b/mpc/src/main/java/com/hufudb/openhufu/mpc/multiply/MulCache.java @@ -0,0 +1,54 @@ +package com.hufudb.openhufu.mpc.multiply; + +import java.util.Random; + +public class MulCache { + private final int[][] ran; + private long[][] val; + private boolean isInit; + private final static int TIME_OUT = 60000; + + public MulCache(int n) { + ran = new int[n][2]; + val = new long[n][2]; + isInit = false; + Random random = new Random(System.currentTimeMillis()); + for (int i = 0; i < n; ++i) { + ran[i][0] = random.nextInt(128) + 1; + ran[i][1] = random.nextInt(128) + 1; + } + } + + public int getRan(int idx, boolean isFirst) { + return isFirst ? ran[idx][0] : ran[idx][1]; + } + + public synchronized void setVal(long[][] val) { + this.val = val; + isInit = true; + this.notifyAll(); + } + + public int ranSum(int idx) { + int sum = 0; + for (int i = 0; i < ran.length; ++i) { + if (i != idx) { + sum += ran[i][0] * ran[i][1]; + } + } + return sum; + } + + public synchronized long getVal(int idx, boolean isFirst) { + int i = 0; + while (!isInit && i < 20) { + try { + this.wait(TIME_OUT); + } catch (InterruptedException e) { + e.printStackTrace(); + } + ++i; + } + return isFirst ? val[idx][0] : val[idx][1]; + } +} diff --git a/mpc/src/main/java/com/hufudb/openhufu/mpc/multiply/SecretMultiply.java b/mpc/src/main/java/com/hufudb/openhufu/mpc/multiply/SecretMultiply.java new file mode 100644 index 00000000..660944af --- /dev/null +++ b/mpc/src/main/java/com/hufudb/openhufu/mpc/multiply/SecretMultiply.java @@ -0,0 +1,142 @@ +package com.hufudb.openhufu.mpc.multiply; + +import com.google.common.collect.ImmutableList; +import com.hufudb.openhufu.mpc.ProtocolException; +import com.hufudb.openhufu.mpc.ProtocolType; +import com.hufudb.openhufu.mpc.RpcProtocolExecutor; +import com.hufudb.openhufu.mpc.codec.OpenHuFuCodec; +import com.hufudb.openhufu.rpc.Rpc; +import com.hufudb.openhufu.rpc.utils.DataPacket; +import com.hufudb.openhufu.rpc.utils.DataPacketHeader; + +import java.util.List; + +//Dan Bogdanov, Sven Laur, and Jan Willemson. 2008. Sharemind: A Framework for Fast Privacy-Preserving Computations. In ESORICS. 192–206. + +public class SecretMultiply extends RpcProtocolExecutor { + private MulCache mc; + private List parties; + private long taskId; + private long localSum; + private boolean isLeader; + private int idx; + private long[][] vals; + public SecretMultiply(Rpc rpc) { + super(rpc, ProtocolType.SS); + } + + private void fillVals(long u, long v) { +// LOG.info("fillVals start in {}", ownId); + for (int i = 0; i < parties.size(); i++) { + if (i == idx) { + continue; + } + int t = i == (idx - 1 + parties.size()) % parties.size()? + (idx - 2 + parties.size()) % parties.size(): (idx - 1 + parties.size()) % parties.size(); + send(mc.getRan(i, true), 0 + generateStepID(t, i), parties.get(t)); + send(mc.getRan(i, false), 1 + generateStepID(i, t), parties.get(i)); + } + for (int i = 0; i < parties.size(); i++) { + if (i == idx) { + continue; + } + vals[i][1] = u + receive(0 + generateStepID(idx, i), parties.get(getThirdEndpoint(idx, i, parties.size()))); + vals[i][0] = v + receive(1 + generateStepID(idx, i), parties.get(getThirdEndpoint(i, idx, parties.size()))); + } + mc.setVal(vals); + } + private int getThirdEndpoint(int i, int j, int n) { + return (((i + 1) % n) == j) ? (i + 2) % n : (i + 1) % n; + } + + private int generateStepID(int i, int j) { + return (i * parties.size() + j) * 2; + } + + private void calShares(long u, long v) { + LOG.info("calShares start in {}", ownId); + for (int i = 0; i < parties.size(); i++) { + if (i == idx) { + continue; + } + send(mc.getVal(i, true), mc.getVal(i, false), 2, parties.get(i)); + } + for (int i = 0; i < parties.size(); i++) { + if (i == idx) { + continue; + } + List res = receive2(2, parties.get(i)); + localSum += -mc.getVal(i, false) * res.get(0) + u * res.get(0) + v * res.get(1); + } + } + + private long sumShares() { + LOG.info("sumShares start in {}", ownId); + long globalSum = 0; + if (isLeader) { + for (int i = 0; i < parties.size(); i++) { + if (i == idx) { + globalSum += localSum; + } + else { + globalSum += receive(3, parties.get(i)); + } + } + } + else { + send(localSum, 3, parties.get(0)); + } + return globalSum; + } + + + + private void send(long value, int stepID, int partyID) { + LOG.info("send to {}, {}", partyID, stepID); + DataPacketHeader header = new DataPacketHeader(taskId, getProtocolTypeId(), stepID, ownId, partyID); + rpc.send(DataPacket.fromByteArrayList(header, ImmutableList.of(OpenHuFuCodec.encodeLong(value)))); + } + + private void send(long value1, long value2, int stepID, int partyID) { + DataPacketHeader header = new DataPacketHeader(taskId, getProtocolTypeId(), stepID, ownId, partyID); + rpc.send(DataPacket.fromByteArrayList(header, + ImmutableList.of(OpenHuFuCodec.encodeLong(value1), OpenHuFuCodec.encodeLong(value2)))); + } + + private long receive(int stepID, int partyID) { + LOG.info("receive {}, {}", partyID, stepID); + final DataPacketHeader expect = new DataPacketHeader(taskId, getProtocolTypeId(), stepID, partyID, ownId); + DataPacket packet = rpc.receive(expect); + return OpenHuFuCodec.decodeLong(packet.getPayload().get(0)); +// return 0; + } + + private List receive2(int stepID, int partyID) { + final DataPacketHeader expect = new DataPacketHeader(taskId, getProtocolTypeId(), stepID, partyID, ownId); + DataPacket packet = rpc.receive(expect); + return ImmutableList.of(OpenHuFuCodec.decodeLong(packet.getPayload().get(0)), + OpenHuFuCodec.decodeLong(packet.getPayload().get(1))); + } + + /** + * @param args[0] input value1 + * @param args[1] input value2 + * @return result of ColumnType for the first party, 0 for other parties + */ + @Override + public Object run(long taskId, List parties, Object... args) throws ProtocolException { + long u = (long) args[0]; + long v = (long) args[1]; + this.mc = new MulCache(parties.size()); + this.parties = parties; + this.taskId = taskId; + this.localSum = 0; + this.isLeader = ownId == parties.get(0); + this.idx = parties.indexOf(ownId); + this.vals = new long[parties.size()][2]; + localSum += u * v + mc.ranSum(idx); + fillVals(u, v); + calShares(u, v); + return sumShares(); + } +} diff --git a/mpc/src/test/java/com/hufudb/openhufu/mpc/secretMultiply/SecretMultiplyTest.java b/mpc/src/test/java/com/hufudb/openhufu/mpc/secretMultiply/SecretMultiplyTest.java new file mode 100644 index 00000000..1c0c1322 --- /dev/null +++ b/mpc/src/test/java/com/hufudb/openhufu/mpc/secretMultiply/SecretMultiplyTest.java @@ -0,0 +1,127 @@ +package com.hufudb.openhufu.mpc.secretMultiply; + +import com.google.common.collect.ImmutableList; +import com.hufudb.openhufu.mpc.multiply.SecretMultiply; +import com.hufudb.openhufu.rpc.Party; +import com.hufudb.openhufu.rpc.grpc.OpenHuFuOwnerInfo; +import com.hufudb.openhufu.rpc.grpc.OpenHuFuRpc; +import com.hufudb.openhufu.rpc.grpc.OpenHuFuRpcManager; +import io.grpc.Channel; +import io.grpc.Server; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.testing.GrpcCleanupRule; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.locationtech.jts.geom.GeometryFactory; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; + +@RunWith(JUnit4.class) +public class SecretMultiplyTest { + public final static GeometryFactory geoFactory = new GeometryFactory(); + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + OpenHuFuRpcManager manager; + ExecutorService threadpool = Executors.newFixedThreadPool(5); + + @Before + public void setUp() throws IOException { + String ownerName0 = InProcessServerBuilder.generateName(); + String ownerName1 = InProcessServerBuilder.generateName(); + String ownerName2 = InProcessServerBuilder.generateName(); + String ownerName3 = InProcessServerBuilder.generateName(); + String ownerName4 = InProcessServerBuilder.generateName(); + Party owner0 = new OpenHuFuOwnerInfo(0, ownerName0); + Party owner1 = new OpenHuFuOwnerInfo(1, ownerName1); + Party owner2 = new OpenHuFuOwnerInfo(2, ownerName2); + Party owner3 = new OpenHuFuOwnerInfo(3, ownerName3); + Party owner4 = new OpenHuFuOwnerInfo(4, ownerName4); + List parties = ImmutableList.of(owner0, owner1, owner2, owner3, owner4); + List channels = Arrays.asList( + grpcCleanup.register(InProcessChannelBuilder.forName(ownerName0).directExecutor().build()), + grpcCleanup.register(InProcessChannelBuilder.forName(ownerName1).directExecutor().build()), + grpcCleanup.register(InProcessChannelBuilder.forName(ownerName2).directExecutor().build()), + grpcCleanup.register(InProcessChannelBuilder.forName(ownerName3).directExecutor().build()), + grpcCleanup.register(InProcessChannelBuilder.forName(ownerName4).directExecutor().build())); + manager = new OpenHuFuRpcManager(parties, channels); + OpenHuFuRpc rpc0 = (OpenHuFuRpc) manager.getRpc(0); + OpenHuFuRpc rpc1 = (OpenHuFuRpc) manager.getRpc(1); + OpenHuFuRpc rpc2 = (OpenHuFuRpc) manager.getRpc(2); + OpenHuFuRpc rpc3 = (OpenHuFuRpc) manager.getRpc(3); + OpenHuFuRpc rpc4 = (OpenHuFuRpc) manager.getRpc(4); + rpc0.connect(); + rpc1.connect(); + rpc2.connect(); + rpc3.connect(); + rpc4.connect(); + Server server0 = InProcessServerBuilder.forName(ownerName0).directExecutor() + .addService(rpc0.getgRpcService()).build().start(); + Server server1 = InProcessServerBuilder.forName(ownerName1).directExecutor() + .addService(rpc1.getgRpcService()).build().start(); + Server server2 = InProcessServerBuilder.forName(ownerName2).directExecutor() + .addService(rpc2.getgRpcService()).build().start(); + Server server3 = InProcessServerBuilder.forName(ownerName3).directExecutor() + .addService(rpc3.getgRpcService()).build().start(); + Server server4 = InProcessServerBuilder.forName(ownerName4).directExecutor() + .addService(rpc4.getgRpcService()).build().start(); + grpcCleanup.register(server0); + grpcCleanup.register(server1); + grpcCleanup.register(server2); + grpcCleanup.register(server3); + grpcCleanup.register(server4); + } + + void testMultiply(long taskId, List executors, List integers, long ans) + throws InterruptedException, ExecutionException { + List parties = executors.stream().map(e -> e.getOwnId()).collect(Collectors.toList()); + List> futures = new ArrayList<>(); + for (int i = 0; i < executors.size(); ++i) { + final SecretMultiply s = executors.get(i); + final int int1 = integers.get(2 * i); + final int int2 = integers.get(2 * i + 1); + futures.add(threadpool.submit(new Callable() { + @Override + public Object call() throws Exception { + return s.run(taskId, parties, (long) int1, (long) int2); + } + })); + } + long res = (long) futures.get(0).get(); + assertEquals(ans, res); + } + + @Test + public void testSecretMultiply() throws InterruptedException, ExecutionException { + Random random = new Random(); + OpenHuFuRpc rpc0 = (OpenHuFuRpc) manager.getRpc(0); + OpenHuFuRpc rpc1 = (OpenHuFuRpc) manager.getRpc(1); + OpenHuFuRpc rpc2 = (OpenHuFuRpc) manager.getRpc(2); + OpenHuFuRpc rpc3 = (OpenHuFuRpc) manager.getRpc(3); + OpenHuFuRpc rpc4 = (OpenHuFuRpc) manager.getRpc(4); + List rpcs = ImmutableList.of(rpc0, rpc1, rpc2, rpc3, rpc4); + List executors = + rpcs.stream().map(rpc -> new SecretMultiply(rpc)).collect(Collectors.toList()); + List integers = new ArrayList<>(); + long u = 0; + long v = 0; + for (int i = 0; i < 5; i++) { + int int1 = random.nextInt(128); + int int2 = random.nextInt(128); + integers.add(int1); + integers.add(int2); + u += int1; + v += int2; + } + testMultiply(0, executors, integers, u * v); + } +} From ee8b7946b5e7514628846dd7453ec9b0425ffc47 Mon Sep 17 00:00:00 2001 From: cirnoooo123 <1601169949@qq.com> Date: Tue, 7 May 2024 21:48:59 +0800 Subject: [PATCH 2/2] use secret union --- .../openhufu/core/sql/plan/PlanUtils.java | 5 +++++ .../openhufu/mpc/union/SecretUnion.java | 2 +- .../owner/config/ImplementorConfig.java | 5 ++++- .../implementor/OwnerImplementorFactory.java | 19 ++++++++++++++++ .../implementor/OwnerSideImplementor.java | 7 +++++- .../owner/implementor/union/OwnerUnion.java | 14 ++++++++++++ .../owner/implementor/union/SafeUnion.java | 22 +++++++++++++++++++ owner/src/main/resources/owner1.yml | 3 ++- .../com/hufudb/openhufu/plan/LeafPlan.java | 15 ++++++++++--- release/config/owner.yml | 3 ++- 10 files changed, 87 insertions(+), 8 deletions(-) create mode 100644 owner/src/main/java/com/hufudb/openhufu/owner/implementor/union/OwnerUnion.java create mode 100644 owner/src/main/java/com/hufudb/openhufu/owner/implementor/union/SafeUnion.java diff --git a/core/src/main/java/com/hufudb/openhufu/core/sql/plan/PlanUtils.java b/core/src/main/java/com/hufudb/openhufu/core/sql/plan/PlanUtils.java index 310156b8..d9ba043b 100644 --- a/core/src/main/java/com/hufudb/openhufu/core/sql/plan/PlanUtils.java +++ b/core/src/main/java/com/hufudb/openhufu/core/sql/plan/PlanUtils.java @@ -47,6 +47,11 @@ public static List> generateLeafOwnerPlans(Ope } List> tableClients = client.getTableClients(plan.getTableName()); List> ownerContext = new ArrayList<>(); + TaskInfo.Builder taskInfo = TaskInfo.newBuilder().setTaskId(client.getTaskId()); + for (Pair entry : tableClients) { + taskInfo.addParties(entry.getLeft().getParty().getPartyId()); + } + builder.setTaskInfo(taskInfo); for (Pair entry : tableClients) { builder.setTableName(entry.getRight()); ownerContext.add(MutablePair.of(entry.getLeft(), builder.build())); diff --git a/mpc/src/main/java/com/hufudb/openhufu/mpc/union/SecretUnion.java b/mpc/src/main/java/com/hufudb/openhufu/mpc/union/SecretUnion.java index 77f32813..cc03c17d 100644 --- a/mpc/src/main/java/com/hufudb/openhufu/mpc/union/SecretUnion.java +++ b/mpc/src/main/java/com/hufudb/openhufu/mpc/union/SecretUnion.java @@ -182,6 +182,6 @@ public Object run(long taskId, List parties, Object... args) throws Pro } else { followerProcedure(localDataSet); } - return null; + return EmptyDataSet.INSTANCE; } } diff --git a/owner/src/main/java/com/hufudb/openhufu/owner/config/ImplementorConfig.java b/owner/src/main/java/com/hufudb/openhufu/owner/config/ImplementorConfig.java index b2f03e03..3ac6dd56 100644 --- a/owner/src/main/java/com/hufudb/openhufu/owner/config/ImplementorConfig.java +++ b/owner/src/main/java/com/hufudb/openhufu/owner/config/ImplementorConfig.java @@ -23,7 +23,8 @@ public enum Implementor { AGG_MIN(IMPLEMENTOR_KEY.AGG_PREFIX + "min"), AGG_SUM(IMPLEMENTOR_KEY.AGG_PREFIX + "sum"), AGG_AVG(IMPLEMENTOR_KEY.AGG_PREFIX + "avg"), - JOIN(IMPLEMENTOR_KEY.PREFIX + "join"); + JOIN(IMPLEMENTOR_KEY.PREFIX + "join"), + UNION(IMPLEMENTOR_KEY.PREFIX + "union"); Implementor(String value) { this.value = value; @@ -54,6 +55,8 @@ public static void initImplementorConfig(String implementorPath) { getClazz(Implementor.AGG_AVG.value)); implementor2ClassMap.put(Implementor.JOIN, getClazz(Implementor.JOIN.value)); + implementor2ClassMap.put(Implementor.UNION, + getClazz(Implementor.UNION.value)); } private static void loadImplementorConfig(String implementorPath) { Yaml yaml = new Yaml(new SafeConstructor()); diff --git a/owner/src/main/java/com/hufudb/openhufu/owner/implementor/OwnerImplementorFactory.java b/owner/src/main/java/com/hufudb/openhufu/owner/implementor/OwnerImplementorFactory.java index 4cf33927..9a8eaeb2 100644 --- a/owner/src/main/java/com/hufudb/openhufu/owner/implementor/OwnerImplementorFactory.java +++ b/owner/src/main/java/com/hufudb/openhufu/owner/implementor/OwnerImplementorFactory.java @@ -7,6 +7,7 @@ import com.hufudb.openhufu.owner.config.ImplementorConfig.Implementor; import com.hufudb.openhufu.owner.implementor.aggregate.OwnerAggregateFunction; import com.hufudb.openhufu.owner.implementor.join.OwnerJoin; +import com.hufudb.openhufu.owner.implementor.union.OwnerUnion; import com.hufudb.openhufu.proto.OpenHuFuPlan; import com.hufudb.openhufu.rpc.Rpc; @@ -21,6 +22,8 @@ public class OwnerImplementorFactory { public static Map aggFuncType2ClassName; public static String joinClassName; + public static String unionClassName; + static { aggFuncType2ClassName = new HashMap<>(); aggFuncType2ClassName.put(AggFuncType.SUM, ImplementorConfig.getImplementorClassName( @@ -34,6 +37,7 @@ public class OwnerImplementorFactory { // aggFuncType2ClassName.put(AggFuncType.AVG, ImplementorConfig.getImplementorClassName( // Implementor.AGG_AVG)); joinClassName = ImplementorConfig.getImplementorClassName(Implementor.JOIN); + unionClassName = ImplementorConfig.getImplementorClassName(Implementor.UNION); } public static OwnerAggregateFunction getAggregationFunction(AggFuncType aggFuncType, @@ -72,6 +76,21 @@ public static OwnerJoin getJoin() { } } + public static OwnerUnion getUnion() { + try { + Class clazz = Class.forName(unionClassName); + Constructor constructor = + clazz.getDeclaredConstructor(); + return (OwnerUnion) constructor.newInstance(); + } catch(ClassNotFoundException e){ + throw new OpenHuFuException(e, ErrorCode.IMPLEMENTOR_CLASS_NOT_FOUND, unionClassName); + } catch(NoSuchMethodException e){ + throw new OpenHuFuException(e, ErrorCode.IMPLEMENTOR_CONSTRUCTOR_NOT_FOUND, unionClassName); + } catch(InstantiationException | IllegalAccessException | InvocationTargetException e){ + throw new OpenHuFuException(e, ErrorCode.IMPLEMENTOR_CREATE_FAILED, unionClassName); + } + } + public static void main (String[]args){ OwnerJoin ownerJoin = getJoin(); } diff --git a/owner/src/main/java/com/hufudb/openhufu/owner/implementor/OwnerSideImplementor.java b/owner/src/main/java/com/hufudb/openhufu/owner/implementor/OwnerSideImplementor.java index 0b6eec54..3e7dc1f2 100644 --- a/owner/src/main/java/com/hufudb/openhufu/owner/implementor/OwnerSideImplementor.java +++ b/owner/src/main/java/com/hufudb/openhufu/owner/implementor/OwnerSideImplementor.java @@ -9,6 +9,7 @@ import com.hufudb.openhufu.mpc.ProtocolException; import com.hufudb.openhufu.owner.adapter.Adapter; import com.hufudb.openhufu.owner.implementor.aggregate.OwnerAggregation; +import com.hufudb.openhufu.owner.implementor.union.OwnerUnion; import com.hufudb.openhufu.plan.BinaryPlan; import com.hufudb.openhufu.plan.LeafPlan; import com.hufudb.openhufu.plan.Plan; @@ -18,6 +19,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.hufudb.openhufu.owner.implementor.OwnerImplementorFactory.getUnion; + public class OwnerSideImplementor implements PlanImplementor { private static final Logger LOG = LoggerFactory.getLogger(OwnerSideImplementor.class); @@ -87,7 +90,9 @@ public DataSet unaryQuery(UnaryPlan unary) { @Override public DataSet leafQuery(LeafPlan leaf) { try { - return dataSourceAdapter.query(leaf); + DataSet localDataSet = dataSourceAdapter.query(leaf); + OwnerUnion union = getUnion(); + return union.union(localDataSet, rpc, leaf.getTaskInfo()); } catch (Exception e) { LOG.error("Error when execute query on Database", e); return EmptyDataSet.INSTANCE; diff --git a/owner/src/main/java/com/hufudb/openhufu/owner/implementor/union/OwnerUnion.java b/owner/src/main/java/com/hufudb/openhufu/owner/implementor/union/OwnerUnion.java new file mode 100644 index 00000000..9f6ebc1b --- /dev/null +++ b/owner/src/main/java/com/hufudb/openhufu/owner/implementor/union/OwnerUnion.java @@ -0,0 +1,14 @@ +package com.hufudb.openhufu.owner.implementor.union; + +import com.hufudb.openhufu.data.storage.DataSet; +import com.hufudb.openhufu.mpc.ProtocolException; +import com.hufudb.openhufu.proto.OpenHuFuPlan.JoinCondition; +import com.hufudb.openhufu.proto.OpenHuFuPlan.TaskInfo; +import com.hufudb.openhufu.rpc.Rpc; + + +public interface OwnerUnion { + + DataSet union(DataSet in, Rpc rpc, TaskInfo taskInfo) + throws ProtocolException; +} diff --git a/owner/src/main/java/com/hufudb/openhufu/owner/implementor/union/SafeUnion.java b/owner/src/main/java/com/hufudb/openhufu/owner/implementor/union/SafeUnion.java new file mode 100644 index 00000000..5e73adbb --- /dev/null +++ b/owner/src/main/java/com/hufudb/openhufu/owner/implementor/union/SafeUnion.java @@ -0,0 +1,22 @@ +package com.hufudb.openhufu.owner.implementor.union; + +import com.hufudb.openhufu.data.storage.DataSet; +import com.hufudb.openhufu.mpc.ProtocolException; +import com.hufudb.openhufu.mpc.union.SecretUnion; +import com.hufudb.openhufu.proto.OpenHuFuPlan; +import com.hufudb.openhufu.rpc.Rpc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class SafeUnion implements OwnerUnion { + + static final Logger LOG = LoggerFactory.getLogger(SafeUnion.class); + + @Override + public DataSet union(DataSet in, Rpc rpc, OpenHuFuPlan.TaskInfo taskInfo) throws ProtocolException { + LOG.info("using safe union."); + SecretUnion secretUnion = new SecretUnion(rpc); + return (DataSet) secretUnion.run(taskInfo.getTaskId(), taskInfo.getPartiesList(), in); + } +} diff --git a/owner/src/main/resources/owner1.yml b/owner/src/main/resources/owner1.yml index b894a345..9cd905d4 100644 --- a/owner/src/main/resources/owner1.yml +++ b/owner/src/main/resources/owner1.yml @@ -7,4 +7,5 @@ owner: min: null sum: com.hufudb.openhufu.owner.implementor.aggregate.sum.SecretSharingSum avg: null - join: com.hufudb.openhufu.owner.implementor.join.HashEqualJoin \ No newline at end of file + join: com.hufudb.openhufu.owner.implementor.join.HashEqualJoin + union: com.hufudb.openhufu.owner.implementor.union.SafeUnion \ No newline at end of file diff --git a/plan/src/main/java/com/hufudb/openhufu/plan/LeafPlan.java b/plan/src/main/java/com/hufudb/openhufu/plan/LeafPlan.java index 5fc94f0b..cd15ab01 100644 --- a/plan/src/main/java/com/hufudb/openhufu/plan/LeafPlan.java +++ b/plan/src/main/java/com/hufudb/openhufu/plan/LeafPlan.java @@ -11,6 +11,7 @@ import com.hufudb.openhufu.implementor.PlanImplementor; import com.hufudb.openhufu.proto.OpenHuFuData.ColumnType; import com.hufudb.openhufu.proto.OpenHuFuData.Modifier; +import com.hufudb.openhufu.proto.OpenHuFuPlan; import com.hufudb.openhufu.proto.OpenHuFuPlan.Collation; import com.hufudb.openhufu.proto.OpenHuFuPlan.Expression; import com.hufudb.openhufu.proto.OpenHuFuPlan.PlanType; @@ -29,7 +30,7 @@ public class LeafPlan extends BasePlan { List orders = ImmutableList.of(); int fetch; int offset; - + OpenHuFuPlan.TaskInfo taskInfo; public LeafPlan() { super(); } @@ -42,6 +43,7 @@ public QueryPlanProto toProto() { builder.addAllAggExp(aggExps); builder.addAllGroup(groups); builder.addAllOrder(orders); + builder.setTaskInfo(taskInfo); return builder.build(); } @@ -55,6 +57,7 @@ public static LeafPlan fromProto(QueryPlanProto proto) { plan.setOrders(proto.getOrderList()); plan.setFetch(proto.getFetch()); plan.setOffset(proto.getOffset()); + plan.taskInfo = proto.getTaskInfo(); return plan; } @@ -93,6 +96,11 @@ public List getOutModifiers() { return getOutExpressions().stream().map(exp -> exp.getModifier()).collect(Collectors.toList()); } + @Override + public OpenHuFuPlan.TaskInfo getTaskInfo() { + return taskInfo; + } + @Override public List getOutExpressions() { if (!aggExps.isEmpty()) { @@ -204,7 +212,8 @@ public String toString() { "\tgroups=" + groups + '$' + "\torders=" + orders + '$' + "\tfetch=" + fetch + '$' + - "\toffset=" + offset).replace('\n', '|').replace('$', '\n') + '\n' + - "}"; + "\toffset=" + offset + '$' + + "\ttaskInfo=" + taskInfo).replace('\n', '|').replace('$', '\n') + '\n' + + "}"; } } diff --git a/release/config/owner.yml b/release/config/owner.yml index 18e5f234..53fcc91c 100644 --- a/release/config/owner.yml +++ b/release/config/owner.yml @@ -6,4 +6,5 @@ owner: min: null sum: com.hufudb.openhufu.owner.implementor.aggregate.sum.SecretSharingSum avg: null - join: com.hufudb.openhufu.owner.implementor.join.HashEqualJoin \ No newline at end of file + join: com.hufudb.openhufu.owner.implementor.join.HashEqualJoin + union: com.hufudb.openhufu.owner.implementor.union.SafeUnion \ No newline at end of file