-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
190 lines (149 loc) · 6.33 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import sys
import asyncio
import typing
from plugin.v1.plugin_grpc import GatewayDPluginServiceBase
from google.protobuf.struct_pb2 import ListValue, Struct, Value
from google.protobuf.json_format import MessageToDict
import grpclib
from grpclib.server import Server, Stream
from grpclib.reflection.service import ServerReflection
from grpclib.health.service import Health
from grpclib.health.check import ServiceStatus
import logging
logging.basicConfig(filename="plugin.log", level=logging.INFO)
async def defaults(stream: Stream[Struct, Struct]) -> None:
"""Default handler for all hooks.
Note:
The GatewayDPluginServiceBase class implements all hooks as abstract
methods, so we need to implement them all. This is a default handler
to use for all hooks that we don't need to implement.
"""
req = await stream.recv_message()
if req:
await stream.send_message(req)
else:
await stream.send_message(Struct())
class Plugin(GatewayDPluginServiceBase):
async def GetPluginConfig(self, stream: Stream[Struct, Struct]) -> None:
# Ignore the request, as it is empty.
await stream.recv_message()
await stream.send_message(
Struct(
fields={
"id": Value(
struct_value=Struct(
fields={
"name": Value(string_value="plugin-template-python"),
"version": Value(string_value="0.1.0"),
"remoteUrl": Value(
string_value="github.com/gatewayd-io/plugin-template-python"
),
}
)
),
"hooks": Value(
list_value=ListValue(
values=[
# The list of hooks that the plugin implements.
Value(number_value=16), # OnTrafficFromClient
]
)
),
}
)
)
async def OnBooted(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnBooting(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnClosed(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnClosing(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnConfigLoaded(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnHook(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnNewClient(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnNewLogger(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnNewPool(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnNewProxy(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnNewServer(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnOpened(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnOpening(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnRun(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnShutdown(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnSignal(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnTick(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnTraffic(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnTrafficFromClient(self, stream: Stream[Struct, Struct]) -> None:
"""
This is an example of how to use the OnTrafficFromClient hook to
intercept traffic from the client and modify it before it is sent to
the server. In this example, we simply log the request and send it
to the server.
"""
req = await stream.recv_message()
if req:
logging.info(MessageToDict(req))
await stream.send_message(req)
else:
await stream.send_message(Struct())
async def OnTrafficFromServer(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnTrafficToClient(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
async def OnTrafficToServer(self, stream: Stream[Struct, Struct]) -> None:
await defaults(stream)
def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
"""Mapping of service methods to handlers.
Note:
This method is used to remap the service name to the plugin name,
so that the plugin health check works properly.
"""
# Get the mapping from the base class
mapping = super().__mapping__()
remapping = {}
# Replace the service name with the plugin name
for path in mapping:
repath = path.replace("plugin.v1.GatewayDPluginService", "plugin")
remapping[repath] = mapping[path]
# Merge the two mappings
remapping.update(mapping)
return remapping
async def serve() -> None:
# Instantiate the plugin.
plugin = Plugin()
# Create a health check for the plugin.
plugin_health = ServiceStatus()
plugin_health.set(True)
health = Health({plugin: [plugin_health]})
# Add reflection for the plugin and health check.
services = ServerReflection.extend([plugin, health])
# Instantiate the server.
server = Server(services)
# Start the server.
await server.start("127.0.0.1", 12345)
await server.wait_closed()
if __name__ == "__main__":
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# This is a special message that tells gatewayd to start the plugin.
print("1|0|tcp|127.0.0.1:12345|grpc")
sys.stdout.flush()
asyncio.run(serve())
except KeyboardInterrupt:
pass