Skip to content

Commit

Permalink
feat: add polling watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaaaash committed Jan 3, 2025
1 parent fac07f6 commit 4f05f8e
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 48 deletions.
6 changes: 6 additions & 0 deletions packages/core-browser/src/react-providers/config-provider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,15 @@ export interface AppConfig {

/**
* Unrecursive directories
* @deprecated Use `pollingWatcherDirectories` instead
*/
unRecursiveDirectories?: string[];

/**
* Polling watcher directories
*/
pollingWatcherDirectories?: string[];

/**
* Recursive watcher backend type
*
Expand Down
1 change: 1 addition & 0 deletions packages/core-common/src/types/file-watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export interface FileSystemWatcherClient {

export interface WatchOptions {
excludes: string[];
pollingWatch?: boolean;
}

export interface DidFilesChangedParams {
Expand Down
5 changes: 4 additions & 1 deletion packages/core-common/src/types/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,10 @@ export interface FileSystemProvider {
* @param options Configures the watch.
* @returns A disposable that tells the provider to stop watching the `uri`.
*/
watch(uri: Uri, options: { excludes?: string[]; recursive?: boolean }): number | Promise<number>;
watch(
uri: Uri,
options: { excludes?: string[]; recursive?: boolean; pollingWatch?: boolean },
): number | Promise<number>;

unwatch?(watcherId: number): void | Promise<void>;

Expand Down
6 changes: 3 additions & 3 deletions packages/file-service/src/browser/file-service-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,8 @@ export class FileServiceClient implements IFileServiceClient, IDisposable {

// 添加监听文件
async watchFileChanges(uri: URI, excludes?: string[]): Promise<IFileServiceWatcher> {
const unRecursiveDirectories = this.appConfig.unRecursiveDirectories || [];
const isUnRecursive = unRecursiveDirectories.some((dir) => uri.path.toString().startsWith(dir));
const pollingWatcherDirectories = this.appConfig.pollingWatcherDirectories || [];
const pollingWatch = pollingWatcherDirectories.some((dir) => uri.path.toString().startsWith(dir));

const _uri = this.convertUri(uri.toString());
const originWatcher = this.uriWatcherMap.get(_uri.toString());
Expand All @@ -374,7 +374,7 @@ export class FileServiceClient implements IFileServiceClient, IDisposable {

const watcherId = await provider.watch(_uri.codeUri, {
excludes,
recursive: !isUnRecursive,
pollingWatch,
});

this.watcherDisposerMap.set(id, {
Expand Down
5 changes: 4 additions & 1 deletion packages/file-service/src/common/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ export interface IWatcherProcessManager {
setClient(client: FileSystemWatcherClient): void;
dispose(): Promise<void>;

watch(uri: UriComponents, options?: { excludes?: string[]; recursive?: boolean }): Promise<number>;
watch(
uri: UriComponents,
options?: { excludes?: string[]; recursive?: boolean; pollingWatch?: boolean },
): Promise<number>;
unWatch(watcherId: number): Promise<void>;
setWatcherFileExcludes(excludes: string[]): Promise<void>;

Expand Down
6 changes: 5 additions & 1 deletion packages/file-service/src/node/disk-file-system.provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,17 @@ export class DiskFileSystemProvider extends RPCService<IRPCDiskFileSystemProvide
* @param {{ excludes: string[] }}
* @memberof DiskFileSystemProvider
*/
async watch(uri: UriComponents, options?: { excludes?: string[]; recursive?: boolean }): Promise<number> {
async watch(
uri: UriComponents,
options?: { excludes?: string[]; recursive?: boolean; pollingWatch?: boolean },
): Promise<number> {
await this.whenReady;
const _uri = Uri.revive(uri);

const id = await this.watcherProcessManager.watch(_uri, {
excludes: options?.excludes ?? [],
recursive: options?.recursive ?? this.recursive,
pollingWatch: options?.pollingWatch,
});

return id;
Expand Down
150 changes: 111 additions & 39 deletions packages/file-service/src/node/hosted/recursive/file-service-watcher.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import paths from 'path';
import { tmpdir } from 'os';
import paths, { join } from 'path';

import ParcelWatcher from '@parcel/watcher';
import fs from 'fs-extra';
Expand All @@ -9,7 +10,6 @@ import {
FileChangeType,
FileSystemWatcherClient,
IFileSystemWatcherServer,
INsfw,
RecursiveWatcherBackend,
WatchOptions,
} from '@opensumi/ide-core-common';
Expand All @@ -20,11 +20,13 @@ import {
FileUri,
IDisposable,
ParsedPattern,
RunOnceScheduler,
isLinux,
isWindows,
parseGlob,
} from '@opensumi/ide-core-common/lib/utils';

import { INsfw } from '../../../common/watcher';
import { FileChangeCollection } from '../../file-change-collection';
import { shouldIgnorePath } from '../shared';

Expand Down Expand Up @@ -52,6 +54,8 @@ export interface NsfwFileSystemWatcherOption {
export class FileSystemWatcherServer extends Disposable implements IFileSystemWatcherServer {
private static readonly PARCEL_WATCHER_BACKEND = isWindows ? 'windows' : isLinux ? 'inotify' : 'fs-events';

private static DEFAULT_POLLING_INTERVAL = 100;

private WATCHER_HANDLERS = new Map<
number,
{ path: string; handlers: ParcelWatcher.SubscribeCallback[]; disposable: IDisposable }
Expand Down Expand Up @@ -194,11 +198,62 @@ export class FileSystemWatcherServer extends Disposable implements IFileSystemWa
basePath: string,
rawOptions: WatchOptions | undefined,
): Promise<DisposableCollection> {
const disposables = new DisposableCollection();
this.logger.log('Start watching:', basePath, rawOptions);
if (!(await fs.pathExists(basePath))) {
return disposables;
return new DisposableCollection();
}

const realPath = await fs.realpath(basePath);

if (this.isEnableNSFW()) {
return this.watchWithNsfw(realPath, watcherId, rawOptions);
} else {
// polling
if (rawOptions?.pollingWatch) {
this.logger.log('Start polling watch:', realPath);
return this.pollingWatch(realPath, watcherId, rawOptions);
}

return this.watchWithParcel(realPath, watcherId, rawOptions);
}
}

private async watchWithNsfw(realPath: string, watcherId: number, rawOptions?: WatchOptions | undefined) {
const disposables = new DisposableCollection();

const nsfw = await this.withNSFWModule();
const watcher: INsfw.NSFW = await nsfw(
realPath,
(events: INsfw.ChangeEvent[]) => this.handleNSFWEvents(events, watcherId),
{
errorCallback: (err) => {
this.logger.error('NSFW watcher encountered an error and will stop watching.', err);
// see https://github.com/atom/github/issues/342
this.unwatchFileChanges(watcherId);
},
},
);

await watcher.start();

disposables.push(
Disposable.create(async () => {
this.watcherOptions.delete(watcherId);
await watcher.stop();
}),
);

const excludes = this.excludes.concat(rawOptions?.excludes || []);

this.watcherOptions.set(watcherId, {
excludesPattern: excludes.map((pattern) => parseGlob(pattern)),
excludes,
});
return disposables;
}

private async watchWithParcel(realPath: string, watcherId: number, rawOptions?: WatchOptions | undefined) {
const disposables = new DisposableCollection();
const tryWatchDir = async (maxRetries = 3, retryDelay = 1000) => {
for (let times = 0; times < maxRetries; times++) {
try {
Expand All @@ -213,8 +268,15 @@ export class FileSystemWatcherServer extends Disposable implements IFileSystemWa
const handlers = this.WATCHER_HANDLERS.get(watcherId)?.handlers;

if (!handlers) {
this.logger.log('No handler found for watcher', watcherId);
return;
}

this.logger.log('Received events:', events);
if (events.length === 0) {
return;
}

for (const handler of handlers) {
(handler as ParcelWatcher.SubscribeCallback)(err, events);
}
Expand All @@ -238,49 +300,59 @@ export class FileSystemWatcherServer extends Disposable implements IFileSystemWa
return undefined; // watch 失败则返回 undefined
};

if (this.isEnableNSFW()) {
const nsfw = await this.withNSFWModule();
const watcher: INsfw.NSFW = await nsfw(
realPath,
(events: INsfw.ChangeEvent[]) => this.handleNSFWEvents(events, watcherId),
{
errorCallback: (err) => {
this.logger.error('NSFW watcher encountered an error and will stop watching.', err);
// see https://github.com/atom/github/issues/342
this.unwatchFileChanges(watcherId);
},
},
);

await watcher.start();
const hanlder: ParcelWatcher.AsyncSubscription | undefined = await tryWatchDir();

if (hanlder) {
// watch 成功才加入 disposables,否则也就无需 dispose
disposables.push(
Disposable.create(async () => {
this.watcherOptions.delete(watcherId);
await watcher.stop();
if (hanlder) {
await hanlder.unsubscribe();
}
}),
);
}

const excludes = this.excludes.concat(rawOptions?.excludes || []);
return disposables;
}

this.watcherOptions.set(watcherId, {
excludesPattern: excludes.map((pattern) => parseGlob(pattern)),
excludes,
});
} else {
const hanlder: ParcelWatcher.AsyncSubscription | undefined = await tryWatchDir();

if (hanlder) {
// watch 成功才加入 disposables,否则也就无需 dispose
disposables.push(
Disposable.create(async () => {
if (hanlder) {
await hanlder.unsubscribe();
}
}),
);
private async pollingWatch(realPath: string, watcherId: number, rawOptions?: WatchOptions | undefined) {
const disposables = new DisposableCollection();
const snapshotFile = join(tmpdir(), `watcher-snapshot-${watcherId}`);
let counter = 0;

const pollingWatcher = new RunOnceScheduler(async () => {
counter++;
if (counter > 1) {
const parcelEvents = await ParcelWatcher.getEventsSince(realPath, snapshotFile, {
ignore: rawOptions?.excludes,
backend: FileSystemWatcherServer.PARCEL_WATCHER_BACKEND,
});

const handlers = this.WATCHER_HANDLERS.get(watcherId)?.handlers;

if (!handlers) {
this.logger.log('No handler found for watcher', watcherId);
return;
}

this.logger.log('Received events:', parcelEvents);
for (const handler of handlers) {
(handler as ParcelWatcher.SubscribeCallback)(null, parcelEvents);
}
}
}

await ParcelWatcher.writeSnapshot(realPath, snapshotFile, {
ignore: rawOptions?.excludes,
backend: FileSystemWatcherServer.PARCEL_WATCHER_BACKEND,
});

pollingWatcher.schedule();
}, FileSystemWatcherServer.DEFAULT_POLLING_INTERVAL);

pollingWatcher.schedule(0);

disposables.push(pollingWatcher);

return disposables;
}
Expand Down
11 changes: 9 additions & 2 deletions packages/file-service/src/node/hosted/watcher.host.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ export class WatcherHostServiceImpl implements IWatcherHostService {
return watcherServer;
}

private async doWatch(uri: Uri, options?: { excludes?: string[]; recursive?: boolean }): Promise<number> {
private async doWatch(
uri: Uri,
options?: { excludes?: string[]; recursive?: boolean; pollingWatch?: boolean },
): Promise<number> {
const watcherServer = this.getWatcherServer(options?.recursive);
if (!watcherServer) {
return -1;
Expand All @@ -98,6 +101,7 @@ export class WatcherHostServiceImpl implements IWatcherHostService {
const mergedExcludes = new Set([...(options?.excludes ?? []), ...this.defaultExcludes]);
const id = await watcherServer.watchFileChanges(uri.toString(), {
excludes: Array.from(mergedExcludes),
pollingWatch: options?.pollingWatch,
});

this.watchedDirs.add(uri.toString());
Expand All @@ -113,7 +117,10 @@ export class WatcherHostServiceImpl implements IWatcherHostService {
return id;
}

async $watch(uri: UriComponents, options?: { excludes?: string[]; recursive?: boolean }): Promise<number> {
async $watch(
uri: UriComponents,
options?: { excludes?: string[]; recursive?: boolean; pollingWatch?: boolean },
): Promise<number> {
const _uri = URI.revive(uri);
return this.doWatch(_uri, options);
}
Expand Down
5 changes: 4 additions & 1 deletion packages/file-service/src/node/watcher-process-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ export class WatcherProcessManagerImpl implements IWatcherProcessManager {
}
}

async watch(uri: UriComponents, options?: { excludes?: string[]; recursive?: boolean }): Promise<number> {
async watch(
uri: UriComponents,
options?: { excludes?: string[]; recursive?: boolean; pollingWatch: boolean },
): Promise<number> {
this.logger.log('Wait for watcher process ready...');
await this._whenReadyDeferred.promise;
this.logger.log('start watch: ', uri);
Expand Down
1 change: 1 addition & 0 deletions packages/startup/entry/web/render-app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { SampleModule } from '../sample-modules';
import { AILayout } from '@opensumi/ide-ai-native/lib/browser/layout/ai-layout';
import { DESIGN_MENU_BAR_RIGHT } from '@opensumi/ide-design';
import { AI_CHAT_LOGO_AVATAR_ID } from '@opensumi/ide-ai-native';
import { RecursiveWatcherBackend } from '@opensumi/ide-core-common/lib/types/file-watch';

const CLIENT_ID = 'W_' + uuid();

Expand Down

0 comments on commit 4f05f8e

Please sign in to comment.