Skip to content

Commit

Permalink
NIFI-12807: Handle clustering in Provenance, Lineage, and Queue Listi…
Browse files Browse the repository at this point in the history
…ng (apache#8431)

* NIFI-12807:
- Handling cluster node id in provenance listing, lineage graph, and queue listing.

* NIFI-12807:
- Addressing review feedback.

This closes apache#8431
  • Loading branch information
mcgilman authored Feb 21, 2024
1 parent 0a2ba31 commit 6c76eca
Show file tree
Hide file tree
Showing 46 changed files with 686 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@
"buildTarget": "nifi:build:production"
},
"development": {
"buildTarget": "nifi:build:development"
"buildTarget": "nifi:build:development",
"servePath": "/nifi"
}
},
"defaultConfiguration": "development"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,5 @@ const target = {
};

export default {
'/nifi-api/*': target,
'/nifi-docs/*': target,
'/nifi-content-viewer/*': target,
// the following entry is needed because the content viewer (and other UIs) load resources from existing nifi ui
'/nifi/*': target
'/': target
};
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import { ErrorEffects } from './state/error/error.effects';
import { MatSnackBarModule } from '@angular/material/snack-bar';
import { PipesModule } from './pipes/pipes.module';
import { DocumentationEffects } from './state/documentation/documentation.effects';
import { ClusterSummaryEffects } from './state/cluster-summary/cluster-summary.effects';

@NgModule({
declarations: [AppComponent],
Expand Down Expand Up @@ -73,7 +74,8 @@ import { DocumentationEffects } from './state/documentation/documentation.effect
ControllerServiceStateEffects,
SystemDiagnosticsEffects,
ComponentStateEffects,
DocumentationEffects
DocumentationEffects,
ClusterSummaryEffects
),
StoreDevtoolsModule.instrument({
maxAge: 25,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ export class FlowService implements PropertyDescriptorRetriever {
return this.httpClient.get(`${FlowService.API}/flow/status`);
}

getClusterSummary(): Observable<any> {
return this.httpClient.get(`${FlowService.API}/flow/cluster/summary`);
}

getControllerBulletins(): Observable<any> {
return this.httpClient.get(`${FlowService.API}/flow/controller/bulletins`);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ import { ImportFromRegistry } from '../../ui/canvas/items/flow/import-from-regis
import { selectCurrentUser } from '../../../../state/current-user/current-user.selectors';
import { NoRegistryClientsDialog } from '../../ui/common/no-registry-clients-dialog/no-registry-clients-dialog.component';
import { EditRemoteProcessGroup } from '../../ui/canvas/items/remote-process-group/edit-remote-process-group/edit-remote-process-group.component';
import { ErrorHelper } from '../../../../service/error-helper.service';

@Injectable()
export class FlowEffects {
Expand Down Expand Up @@ -144,16 +143,14 @@ export class FlowEffects {
combineLatest([
this.flowService.getFlow(request.id),
this.flowService.getFlowStatus(),
this.flowService.getClusterSummary(),
this.flowService.getControllerBulletins()
]).pipe(
map(([flow, flowStatus, clusterSummary, controllerBulletins]) => {
map(([flow, flowStatus, controllerBulletins]) => {
return FlowActions.loadProcessGroupSuccess({
response: {
id: request.id,
flow: flow,
flowStatus: flowStatus,
clusterSummary: clusterSummary.clusterSummary,
controllerBulletins: controllerBulletins
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,6 @@ export const initialState: FlowState = {
syncFailureCount: undefined
}
},
clusterSummary: {
clustered: false,
connectedToCluster: false,
connectedNodes: '',
connectedNodeCount: 0,
totalNodeCount: 0
},
refreshRpgDetails: null,
controllerBulletins: {
bulletins: [],
Expand Down Expand Up @@ -182,7 +175,6 @@ export const flowReducer = createReducer(
id: response.flow.processGroupFlow.id,
flow: response.flow,
flowStatus: response.flowStatus,
clusterSummary: response.clusterSummary,
controllerBulletins: response.controllerBulletins,
error: null,
status: 'success' as const
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,6 @@ export const selectLastRefreshed = createSelector(
(state: FlowState) => state.flow.processGroupFlow.lastRefreshed
);

export const selectClusterSummary = createSelector(selectFlowState, (state: FlowState) => state.clusterSummary);

export const selectControllerBulletins = createSelector(
selectFlowState,
(state: FlowState) => state.controllerBulletins.bulletins // TODO - include others?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ export interface LoadProcessGroupResponse {
id: string;
flow: ProcessGroupFlowEntity;
flowStatus: ControllerStatusEntity;
clusterSummary: ClusterSummary;
controllerBulletins: ControllerBulletinsEntity;
}

Expand Down Expand Up @@ -493,14 +492,6 @@ export interface ControllerStatusEntity {
controllerStatus: ControllerStatus;
}

export interface ClusterSummary {
clustered: boolean;
connectedToCluster: boolean;
connectedNodes?: string;
connectedNodeCount: number;
totalNodeCount: number;
}

export interface ControllerBulletinsEntity {
bulletins: BulletinEntity[];
controllerServiceBulletins: BulletinEntity[];
Expand All @@ -514,7 +505,6 @@ export interface FlowState {
flow: ProcessGroupFlowEntity;
flowStatus: ControllerStatusEntity;
refreshRpgDetails: RefreshRemoteProcessGroupPollingDetailsRequest | null;
clusterSummary: ClusterSummary;
controllerBulletins: ControllerBulletinsEntity;
dragging: boolean;
transitionRequired: boolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ import { loadFlowConfiguration } from '../../../../state/flow-configuration/flow
import { concatLatestFrom } from '@ngrx/effects';
import { selectUrl } from '../../../../state/router/router.selectors';
import { Storage } from '../../../../service/storage.service';
import {
loadClusterSummary,
startClusterSummaryPolling,
stopClusterSummaryPolling
} from '../../../../state/cluster-summary/cluster-summary.actions';

@Component({
selector: 'fd-canvas',
Expand Down Expand Up @@ -285,7 +290,9 @@ export class Canvas implements OnInit, OnDestroy {
this.canvasView.init(this.viewContainerRef, this.svg, this.canvas);

this.store.dispatch(loadFlowConfiguration());
this.store.dispatch(loadClusterSummary());
this.store.dispatch(startProcessGroupPolling());
this.store.dispatch(startClusterSummaryPolling());
}

private createSvg(): void {
Expand Down Expand Up @@ -595,5 +602,6 @@ export class Canvas implements OnInit, OnDestroy {
ngOnDestroy(): void {
this.store.dispatch(resetFlowState());
this.store.dispatch(stopProcessGroupPolling());
this.store.dispatch(stopClusterSummaryPolling());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
color: $primary-palette-500;
}

.warning {
color: $warn-palette-400;
}

.status-value {
color: $warn-palette-A400;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<div class="h-8 flow-status">
<div class="flex justify-between">
<div class="flex flex-1 justify-around pr-20">
@if (clusterSummary.clustered) {
@if (clusterSummary?.clustered) {
<div class="flex items-center gap-x-2" title="Connected nodes / Total number of nodes in the cluster">
<div class="fa fa-cubes" [class]="getClusterStyle()"></div>
<div class="text">{{ formatClusterMessage() }}</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
*/

import { Component, Input } from '@angular/core';
import { ClusterSummary, ControllerStatus } from '../../../../state/flow';
import { ControllerStatus } from '../../../../state/flow';
import { initialState } from '../../../../state/flow/flow.reducer';
import { BulletinsTip } from '../../../../../../ui/common/tooltips/bulletins-tip/bulletins-tip.component';
import { BulletinEntity, BulletinsTipInput } from '../../../../../../state/shared';

import { Search } from '../search/search.component';
import { NifiTooltipDirective } from '../../../../../../ui/common/tooltips/nifi-tooltip.directive';
import { ClusterSummary } from '../../../../../../state/cluster-summary';

@Component({
selector: 'flow-status',
Expand All @@ -34,7 +35,7 @@ import { NifiTooltipDirective } from '../../../../../../ui/common/tooltips/nifi-
export class FlowStatus {
@Input() controllerStatus: ControllerStatus = initialState.flowStatus.controllerStatus;
@Input() lastRefreshed: string = initialState.flow.processGroupFlow.lastRefreshed;
@Input() clusterSummary: ClusterSummary = initialState.clusterSummary;
@Input() clusterSummary: ClusterSummary | null = null;
@Input() bulletins: BulletinEntity[] = initialState.controllerBulletins.bulletins;
@Input() currentProcessGroupId: string = initialState.id;
@Input() loadingStatus = false;
Expand All @@ -46,7 +47,7 @@ export class FlowStatus {
}

formatClusterMessage(): string {
if (this.clusterSummary.connectedToCluster && this.clusterSummary.connectedNodes) {
if (this.clusterSummary?.connectedToCluster && this.clusterSummary.connectedNodes) {
return this.clusterSummary.connectedNodes;
} else {
return 'Disconnected';
Expand All @@ -55,8 +56,8 @@ export class FlowStatus {

getClusterStyle(): string {
if (
!this.clusterSummary.connectedToCluster ||
this.clusterSummary.connectedNodeCount != this.clusterSummary.totalNodeCount
this.clusterSummary?.connectedToCluster === false ||
this.clusterSummary?.connectedNodeCount != this.clusterSummary?.totalNodeCount
) {
return 'warning';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,14 @@ import { HttpClientTestingModule } from '@angular/common/http/testing';
import { NewCanvasItem } from './new-canvas-item/new-canvas-item.component';
import { MatMenuModule } from '@angular/material/menu';
import { MatDividerModule } from '@angular/material/divider';
import {
selectClusterSummary,
selectControllerBulletins,
selectControllerStatus
} from '../../../state/flow/flow.selectors';
import { ClusterSummary, ControllerStatus } from '../../../state/flow';
import { selectControllerBulletins, selectControllerStatus } from '../../../state/flow/flow.selectors';
import { ControllerStatus } from '../../../state/flow';
import { CdkConnectedOverlay, CdkOverlayOrigin } from '@angular/cdk/overlay';
import { FormsModule, ReactiveFormsModule } from '@angular/forms';
import { Component } from '@angular/core';
import { RouterTestingModule } from '@angular/router/testing';
import { ClusterSummary } from '../../../../../state/cluster-summary';
import { selectClusterSummary } from '../../../../../state/cluster-summary/cluster-summary.selectors';

describe('HeaderComponent', () => {
let component: HeaderComponent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import { Store } from '@ngrx/store';
import { CanvasState } from '../../../state';
import {
selectCanvasPermissions,
selectClusterSummary,
selectControllerBulletins,
selectControllerStatus,
selectCurrentProcessGroupId,
Expand All @@ -36,6 +35,7 @@ import { MatDividerModule } from '@angular/material/divider';
import { RouterLink } from '@angular/router';
import { FlowStatus } from './flow-status/flow-status.component';
import { Navigation } from '../../../../../ui/common/navigation/navigation.component';
import { selectClusterSummary } from '../../../../../state/cluster-summary/cluster-summary.selectors';

@Component({
selector: 'fd-header',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import { NiFiCommon } from '../../../../../service/nifi-common.service';
import { ParameterContextEntity } from '../../../state/parameter-context-listing';
import { FlowConfiguration } from '../../../../../state/flow-configuration';
import { CurrentUser } from '../../../../../state/current-user';
import { ParameterProviderConfigurationEntity } from '../../../../../state/shared';

@Component({
selector: 'parameter-context-table',
Expand Down
Loading

0 comments on commit 6c76eca

Please sign in to comment.