Skip to content

Commit

Permalink
CTE support for snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
codewithaloknath committed Apr 4, 2022
1 parent 7ad7194 commit 92a144d
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -748,12 +748,6 @@ else if (statement instanceof InsertCube) {
reasons.add("Only support CTAS (create table as select) and INSERT INTO (tables) statements");
}

// Doesn't work with the following features
if (SystemSessionProperties.isReuseTableScanEnabled(session)
|| SystemSessionProperties.isCTEReuseEnabled(session)) {
reasons.add("No support along with reuse_table_scan or cte_reuse_enabled features");
}

// All input tables must support snapshotting
for (TableHandle tableHandle : analysis.getTables()) {
if (!metadata.isSnapshotSupportedAsInput(session, tableHandle)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,22 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.log.Logger;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceOutput;
import io.airlift.slice.Slices;
import io.prestosql.spi.Page;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockEncodingSerde;
import io.prestosql.spi.plan.PlanNodeId;
import io.prestosql.spi.snapshot.BlockEncodingSerdeProvider;
import io.prestosql.spi.snapshot.Restorable;
import io.prestosql.spi.snapshot.RestorableConfig;
import it.unimi.dsi.fastutil.longs.LongArrayList;

import javax.annotation.concurrent.GuardedBy;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
Expand All @@ -37,11 +48,13 @@
import static com.google.common.base.Preconditions.checkArgument;
import static io.prestosql.operator.Operator.NOT_BLOCKED;

@RestorableConfig()
public class CommonTableExecutionContext
implements Restorable
{
private static final Logger LOG = Logger.get(CommonTableExecutionContext.class);
private final String name;
private final int queueCnt;
private int queueCnt;
private final PlanNodeId feederId;
private boolean isFeederInitialized;
private List<Integer> feeders = Collections.synchronizedList(new ArrayList<>());
Expand All @@ -52,9 +65,9 @@ public class CommonTableExecutionContext
private final Executor notificationExecutor;
@GuardedBy("this")
private SettableFuture<?> blockedFuture;
private final int taskCount;
private final int maxMainQueueSize;
private final int maxPrefetchQueueSize;
private int taskCount;
private int maxMainQueueSize;
private int maxPrefetchQueueSize;

public CommonTableExecutionContext(String name, Set<PlanNodeId> consumers, PlanNodeId feederId, Executor notificationExecutor,
int taskCount, int maxMainQueueSize, int maxPrefetchQueueSize)
Expand Down Expand Up @@ -264,4 +277,40 @@ public CTEDoneException()
super();
}
}

@Override
public Object capture(BlockEncodingSerdeProvider serdeProvider) {
BlockEncodingSerde blockSerde = serdeProvider.getBlockEncodingSerde();
CommonTableExecutionContextState myState = new CommonTableExecutionContextState();

myState.queueCnt = queueCnt;
myState.maxPrefetchQueueSize = maxPrefetchQueueSize;
myState.maxMainQueueSize = maxMainQueueSize;
myState.taskCount = taskCount;
return myState;
}

@Override
public void restore(Object state, BlockEncodingSerdeProvider serdeProvider)
{
BlockEncodingSerde blockSerde = serdeProvider.getBlockEncodingSerde();
CommonTableExecutionContextState myState = new CommonTableExecutionContextState();

this.queueCnt = myState.queueCnt;
this.maxPrefetchQueueSize = myState.maxPrefetchQueueSize;
this.maxMainQueueSize = myState.maxMainQueueSize;
this.taskCount = myState.taskCount;
}

private static class CommonTableExecutionContextState
implements Serializable
{

private int queueCnt;
private int maxPrefetchQueueSize;
private int maxMainQueueSize;
private int taskCount;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.prestosql.snapshot.SingleInputSnapshotState;
import io.prestosql.spi.Page;
import io.prestosql.spi.plan.PlanNodeId;
import io.prestosql.spi.snapshot.BlockEncodingSerdeProvider;
import io.prestosql.spi.snapshot.RestorableConfig;
import io.prestosql.spi.type.Type;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand All @@ -35,8 +38,7 @@
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

// TODO-cp-I2TJ3G: will add snapshot support later
@RestorableConfig(unsupported = true)
@RestorableConfig()
public class CommonTableExpressionOperator
implements Operator, Closeable
{
Expand All @@ -47,9 +49,10 @@ public class CommonTableExpressionOperator
private final PlanNodeId consumer;
private final CommonTableExecutionContext cteContext;
private final Function<Page, Page> pagePreprocessor;
private final int operatorInstaceId;
private int operatorInstaceId;
private boolean finish;
private boolean isFeeder;
private final SingleInputSnapshotState snapshotState;

public CommonTableExpressionOperator(
PlanNodeId self,
Expand All @@ -65,6 +68,7 @@ public CommonTableExpressionOperator(
this.cteContext = requireNonNull(cteContext, "CTE context is null");
this.operatorInstaceId = operatorInstaceId;
this.pagePreprocessor = pagePreprocessor;
this.snapshotState = operatorContext.isSnapshotEnabled() ? SingleInputSnapshotState.forOperator(this, operatorContext) : null;

synchronized (cteContext) {
if (cteContext.isFeeder(consumer)) {
Expand Down Expand Up @@ -176,6 +180,15 @@ public boolean needsInput()
@Override
public void addInput(Page page)
{

checkState(needsInput(), "Operator is already finishing");
requireNonNull(page, "page is null");

if (snapshotState != null) {
if (snapshotState.processPage(page)) {
return;
}
}
/* Got a new page... Place it in the Queue! */
Page addPage = pagePreprocessor.apply(page);
cteContext.addPage(addPage);
Expand All @@ -189,6 +202,13 @@ public void addInput(Page page)
@Override
public Page getOutput()
{
if (snapshotState != null) {
Page marker = snapshotState.nextMarker();
if (marker != null) {
return marker;
}
}

try {
Page page = cteContext.getPage(consumer);
if (page != null) {
Expand All @@ -210,8 +230,7 @@ public Page getOutput()
@Override
public Page pollMarker()
{
//TODO-cp-I2TJ3G: Operator currently not supported for Snapshot
return null;
return snapshotState.nextMarker();
}

/**
Expand Down Expand Up @@ -264,6 +283,9 @@ public void finish()
@Override
public boolean isFinished()
{
if (snapshotState != null && snapshotState.hasMarker()) {
return false;
}
return finish;
}

Expand All @@ -273,6 +295,46 @@ public boolean isFinished()
@Override
public void close() throws IOException
{
if (snapshotState != null) {
snapshotState.close();
}
LOG.debug("CTE(" + cteContext.getName() + ")[" + consumer + "-" + operatorInstaceId + "] Operator Closed");
}

@Override
public Object capture(BlockEncodingSerdeProvider serdeProvider)
{

CommonTableOperatorState myState = new CommonTableOperatorState();
myState.operatorContext = operatorContext.capture(serdeProvider);
myState.cteContext = cteContext.capture(serdeProvider);
myState.finish = finish;
myState.isFeeder = isFeeder;
myState.operatorInstaceId = operatorInstaceId;
return myState;

}

@Override
public void restore(Object state, BlockEncodingSerdeProvider serdeProvider)
{
CommonTableOperatorState myState = (CommonTableOperatorState) state;
this.operatorContext.restore(myState.operatorContext, serdeProvider);
this.cteContext.restore(myState.cteContext, serdeProvider);
finish = myState.finish;
isFeeder = myState.isFeeder;
operatorInstaceId = myState.operatorInstaceId;
}


private static class CommonTableOperatorState
implements Serializable
{

private Object operatorContext;
private Object cteContext;
private int operatorInstaceId;
private boolean finish;
private boolean isFeeder;
}
}

0 comments on commit 92a144d

Please sign in to comment.