-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathu_singleton.py
52 lines (39 loc) · 941 Bytes
/
u_singleton.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# -*- coding:utf-8 -*-
# @Author: wzy
# @Time: 2020-9-22 09:35:57
# Thread-safe singleton class decorator
__all__ = ["synchronized", "singleton_cls"]
import multiprocessing
from functools import wraps
# Thread-safe function decorator
def synchronized(func):
    """Serialize calls to ``func`` behind a single lock.

    Usage:
        @synchronized
        def test(*args, **kwargs):
            pass

    One lock is created per decoration. Every call to the returned wrapper
    acquires that lock, invokes ``func`` with the same arguments, and
    releases the lock again even when ``func`` raises.

    Args:
        func: Any callable, including builtins and bound methods.

    Returns:
        A wrapper carrying ``func``'s metadata (via ``functools.wraps``)
        and exposing the lock as ``__lock__``.

    Note:
        The lock is not reentrant: a decorated function that calls its own
        decorated form while holding the lock will block forever.
    """
    lock = multiprocessing.Lock()

    try:
        # Kept for backward compatibility: existing code may read the lock
        # from the undecorated function.
        func.__lock__ = lock
    except (AttributeError, TypeError):
        # Builtins, bound methods and immutable types reject new attributes.
        # The wrapper holds the lock in its closure, so this is not fatal.
        pass

    @wraps(func)
    def synced_func(*args, **kws):
        with lock:
            return func(*args, **kws)

    # Always expose the lock on the wrapper, even when ``func`` has no
    # ``__dict__`` for ``wraps`` to copy it from.
    synced_func.__lock__ = lock
    return synced_func
# Thread-safe singleton class decorator
def singleton_cls(cls):
    """Class decorator that hands back one shared instance of ``cls``.

    Usage:
        @singleton_cls
        class Test:
            pass

    The decorated name is rebound to a factory function rather than the
    class itself. The first call constructs ``cls`` with the arguments it
    receives; every later call returns that same object and ignores its
    arguments. Calls are routed through ``synchronized``, so the
    check-and-create step runs while that decorator's lock is held.

    Args:
        cls: The class to turn into a singleton.

    Returns:
        A function, carrying ``cls``'s metadata via ``functools.wraps``,
        that returns the single instance.
    """
    # Per-decoration cache; it only ever holds the single ``cls`` key.
    created = {}

    def get_instance(*args, **kw):
        if cls in created:
            return created[cls]
        obj = cls(*args, **kw)
        created[cls] = obj
        return obj

    # Same stacking order as the decorator form: wrap with the lock first,
    # then copy the class metadata onto the resulting wrapper.
    return wraps(cls)(synchronized(get_instance))
# Script entry point: currently does nothing when the module is run directly.
if __name__ == '__main__':
    pass