Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New implementation of coordination service #236

Merged
merged 45 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
6da5614
add: scratch of new Coordination Service
piece-of-tart Oct 10, 2023
d13ac52
add: new file for CoordinationSemaphore
piece-of-tart Oct 11, 2023
e02134d
add: interface CoordinationStream
piece-of-tart Oct 11, 2023
0268197
add: logs in CoordinationSession
piece-of-tart Oct 11, 2023
92301d4
add: test for ephemeral semaphore
piece-of-tart Oct 12, 2023
f2c2f5e
add: describeSemaphore() without Watcher callback
piece-of-tart Oct 17, 2023
39dfa8a
add: connectionWasFailed field in DescribeSemaphoreChanged
piece-of-tart Oct 17, 2023
ce23b55
add: Leader election example
piece-of-tart Oct 18, 2023
e9af6a1
add: more logs
piece-of-tart Oct 24, 2023
57e1ad7
add: ExecutorService in response handler
piece-of-tart Oct 24, 2023
5df23f9
add: serviceDiscovery test example
piece-of-tart Oct 24, 2023
77dc0c9
First version of updated coordination api
alex268 Oct 31, 2023
656064b
add: Leader Election service
piece-of-tart Nov 6, 2023
919b2e4
add: Configuration scenario
piece-of-tart Nov 6, 2023
82cecaf
add: access to leader endpoint in tests
piece-of-tart Nov 9, 2023
6d54611
add: sync methods
piece-of-tart Nov 9, 2023
fa10ede
add: Leader Election methods - whenTakeLead && interruptLeadership
piece-of-tart Nov 14, 2023
d89bbab
add: documentation for LeaderElection
piece-of-tart Nov 14, 2023
7920e75
add: documentation for Service discovery
piece-of-tart Nov 14, 2023
84fa46d
add: Configuration scenario - documentation & resetObserver
piece-of-tart Nov 14, 2023
ace1c7b
add: sync version of interruptLeadership
piece-of-tart Nov 14, 2023
330325b
update: LeaderElection observers
piece-of-tart Nov 15, 2023
dca25eb
Rewrite api
alex268 Nov 9, 2023
ba582b8
Add RetryPolicy to CoordinationSessionSettings
alex268 Jan 24, 2024
06b677b
Added state to CoordinationSession
alex268 Jan 24, 2024
d7567c6
Update dump implementation
alex268 Jan 24, 2024
e50ed20
Update dump helpers
alex268 Jan 24, 2024
8892ac1
Added base implementation of LeaderElection
alex268 Jan 24, 2024
3352873
Updated tests
alex268 Jan 24, 2024
962ecb0
Update api of CoordinationSession
alex268 Jan 26, 2024
c0f2ce7
Implementation for nodes and basic session connect
alex268 Feb 15, 2024
5824c58
Added graceful shutdown for session
alex268 Feb 17, 2024
a4cdf79
Added implementation for creating, deleting and updating semaphores
alex268 Feb 17, 2024
ff3af33
Implementations for acquiring and releassing semaphores
alex268 Feb 17, 2024
de54051
Added implementation for describe and watch
alex268 Feb 19, 2024
b8faf90
Removed old reciptes
alex268 Feb 19, 2024
da0b503
Reorganaize names
alex268 Feb 19, 2024
2a2628a
Thread-safe state update
alex268 Feb 19, 2024
0a84921
Added retries of disconnected sessions
alex268 Feb 20, 2024
65c92f2
Added retries of idempotent messages
alex268 Feb 20, 2024
17f7883
Updates javadoc and method names
alex268 Feb 21, 2024
6cebeac
Added basic tests
alex268 Feb 21, 2024
4becd1b
Rename UNSTARTED to INITIAL
alex268 Feb 22, 2024
b133b7c
Added suffix Impl to all implementations
alex268 Feb 22, 2024
c066daf
Fixed typos
alex268 Feb 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions coordination/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,89 +4,124 @@

import javax.annotation.WillNotClose;

import tech.ydb.coordination.impl.CoordinationClientImpl;
import tech.ydb.coordination.rpc.grpc.GrpcCoordinationRpc;
import tech.ydb.coordination.description.NodeConfig;
import tech.ydb.coordination.impl.CoordinationServiceImpl;
import tech.ydb.coordination.settings.CoordinationNodeSettings;
import tech.ydb.coordination.settings.CoordinationSessionSettings;
import tech.ydb.coordination.settings.DescribeCoordinationNodeSettings;
import tech.ydb.coordination.settings.DropCoordinationNodeSettings;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcTransport;

/**
* @author Kirill Kurdyukov
* @author Alexandr Gorshein
*/
public interface CoordinationClient {

static CoordinationClient newClient(@WillNotClose GrpcTransport transport) {
return new CoordinationClientImpl(GrpcCoordinationRpc.useTransport(transport));
return CoordinationServiceImpl.newClient(transport);
}

/**
* Bidirectional stream used to establish a session with a coordination node
* <p>
* Relevant APIs for managing semaphores, distributed locking, creating or
* restoring a previously established session are described using nested
* messages in SessionRequest and SessionResponse. Session is established
* with a specific coordination node (previously created using CreateNode
* below) and semaphores are local to that coordination node.
* Database path
* Used for creating coordination node path
*
* @return path to database
*/
String getDatabase();

/**
* Creates a new coordination session.
* The coordination session establishes bidirectional grpc stream with a specific coordination node and uses this
* stream for exchanging messages with the coordination service.
*
* @return coordination node session
* @param path full path to coordination node
* @param settings coordination session settings
* @return new instance of coordination session
*/
CoordinationSession createSession();
CoordinationSession createSession(String path, CoordinationSessionSettings settings);

/**
* Creates a new coordination node.
*
* @param path full path to coordination node
* @param coordinationNodeSettings coordination node settings
* @return status of request
* @param settings coordination node settings
* @return future with status of operation
*/
CompletableFuture<Status> createNode(
String path,
CoordinationNodeSettings coordinationNodeSettings
);
CompletableFuture<Status> createNode(String path, CoordinationNodeSettings settings);

/**
* Modifies settings of a coordination node
*
* @param path full path to coordination node
* @param coordinationNodeSettings coordination node settings
* @return status of request
* @param settings coordination node settings
* @return future with status of operation
*/
CompletableFuture<Status> alterNode(
String path,
CoordinationNodeSettings coordinationNodeSettings
);
CompletableFuture<Status> alterNode(String path, CoordinationNodeSettings settings);

/**
* Drops a coordination node
*
* @param path full path to coordination node
* @param dropCoordinationNodeSettings drop coordination node settings
* @return request of status
* @param settings drop coordination node settings
* @return future with status of operation
*/
CompletableFuture<Status> dropNode(
String path,
DropCoordinationNodeSettings dropCoordinationNodeSettings
);
CompletableFuture<Status> dropNode(String path, DropCoordinationNodeSettings settings);

/**
* Describes a coordination node
*
* @param path full path to coordination node
* @param describeCoordinationNodeSettings describe coordination node settings
* @return request of status
* @param settings describe coordination node settings
* @return future with node configuration
*/
CompletableFuture<Status> describeNode(
String path,
DescribeCoordinationNodeSettings describeCoordinationNodeSettings
);
CompletableFuture<Result<NodeConfig>> describeNode(String path, DescribeCoordinationNodeSettings settings);



// --------------- default methods ------------------------------

/**
* Database path
* Using for creating coordination node path
* Creates a new coordination session with default settings.
* The coordination session establishes bidirectional grpc stream with a specific coordination node and uses this
* stream for exchanging messages with the coordination service.
*
* @return path to database
* @param path full path to coordination node
* @return new instance of coordination session
*/
String getDatabase();
default CoordinationSession createSession(String path) {
return createSession(path, CoordinationSessionSettings.newBuilder().build());
}

/**
* Creates a new coordination node.
*
* @param path full path to coordination node
* @return future with status of operation
*/
default CompletableFuture<Status> createNode(String path) {
return createNode(path, CoordinationNodeSettings.newBuilder().build());
}

/**
* Drops a coordination node
*
* @param path full path to coordination node
* @return future with status of operation
*/
default CompletableFuture<Status> dropNode(String path) {
return dropNode(path, DropCoordinationNodeSettings.newBuilder().build());
}

/**
* Describes a coordination node
*
* @param path full path to coordination node
* @return future with result of operation
*/
default CompletableFuture<Result<NodeConfig>> describeNode(String path) {
return describeNode(path, DescribeCoordinationNodeSettings.newBuilder().build());
}
}
Loading
Loading