当Django的内置权限无法满足需求的时候就自己扩展吧~
overmind项目使用了Django内置的权限系统,Django内置权限系统基于model层做控制,新的model创建后会默认新建三个权限,分别为:add、change、delete,如果给用户或组赋予delete的权限,那么用户将可以删除这个model下的所有数据。
原本overmind只管理了我们自己部门的数据库,权限设置只针对具体的功能不针对细粒度的数据库实例,例如用户A 有审核的权限,那么用户A 可以审核所有的DB,此时使用内置的权限系统就可以满足需求了,但随着系统的不断完善要接入其他部门的数据库管理,这就要求针对不同用户开放不同DB的权限了,例如A部门的用户只能操作A部门的DB,Django内置基于model的权限无法满足需求了。
先来确定下需求: 1. 保持原本的基于功能的权限控制不变,例如用户A有查询权限,B有审核权限 2. 增加针对DB实例的权限控制,例如用户A只能查询特定的DB,B只能审核特定的DB
对于上边需求1用内置的权限系统已经可以实现,这里不赘述,重点看下需求2,DB信息都存放在同一个表里,不同用户能操作不同的DB,也就是需要把每一条DB信息与有权限操作的用户进行关联,为了方便操作,我们考虑把DB跟用户组关联,在用户组里的用户都有权限,而操作类型经过分析主要有两类读和写,那么需要给每个MySQL实例添加两个字段分别记录对此实例有读和写权限的用户组
如下代码在原来的model基础上添加read_groups
和write_groups
字段,DB实例跟用户组应是ManyToManyField多对多关系,一个实例可以关联多个用户组,一个用户组也可以属于多个实例
class Mysql(models.Model):
Env = (
(1, 'Dev'),
(2, 'Qa'),
(3, 'Prod'),
)
create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
update_time = models.DateTimeField(auto_now=True, verbose_name='更新时间')
project_id = models.IntegerField(verbose_name='项目')
project_tmp = models.CharField(max_length=128, default='')
environment = models.IntegerField(choices=Env, verbose_name='环境')
master_host = models.GenericIPAddressField(verbose_name='master主机')
master_port = models.IntegerField(default=3306, verbose_name='master端口')
slave_host = models.GenericIPAddressField(null=True, verbose_name='slave主机')
slave_port = models.IntegerField(null=True, default=3306, verbose_name='slave端口')
database = models.CharField(max_length=64, verbose_name='数据库')
read_groups = models.ManyToManyField(Group, related_name='read', verbose_name='读权限')
write_groups = models.ManyToManyField(Group, related_name='write', verbose_name='写权限')
description = models.TextField(null=True, verbose_name='备注')
model确定了,接下来我们分三部分详细介绍下权限验证的具体实现
如上图列表页,每个用户进入系统后只能查看自己有读权限的MySQL实例列表,管理员能查看所有,代码如下:
def mysql(request):
if request.method == 'GET':
if request.user.is_superuser:
_lists = Mysql.objects.all().order_by('id')
else:
# 获取登录用户的所有组
_user_groups = request.user.groups.all()
# 构造一个空的QuerySet然后合并
_lists = Mysql.objects.none()
for group in _user_groups:
_lists = _lists | group.read.all()
return render(request, 'overmind/mysql.index.html', {'request': request, 'lPage': _lists})
实现的思路是:获取登录用户的所有组,然后循环查询每个组有读取权限的数据库实例,最后把每个组有权限读的数据库实例进行合并返回
获取登录用户的所有组用到了ManyToMany的查询方法:request.user.groups.all()
最终返回的一个结果是QuerySet,所以我们需要先构造一个空的Queryset:Mysql.objects.none()
QuerySet合并不能用简单的相加,应为:QuerySet-1 | QuerySet-2
如上图系统中有很多功能是需要根据项目、环境查询对应的DB信息的,对于此类接口也需要控制用户只能查询自己有权限读的DB实例,管理员能查看所有,代码如下:
def get_project_database(request, project, environment):
if request.method == 'GET':
_jsondata = {}
if request.user.is_superuser:
# 返回所有项目和环境匹配的DB
_lists = Mysql.objects.filter(
project_id=int(project),
environment=int(environment)
)
_jsondata = {i.id: i.database for i in _lists}
else:
# 只返回用户有权限查询的DB
_user_groups = request.user.groups.all()
for group in _user_groups:
# 循环mysql表中有read_groups权限的所有组
for mysql in group.read.all():
if mysql.project_id == int(project) and mysql.environment == int(environment):
_jsondata[mysql.id] = mysql.database
return JsonResponse(_jsondata)
实现思路与上边类似,只是多了一步根据项目和环境再进行判断
需要根据group去反查都有哪些DB实例包含了该组,这里用到了M2M的related_name属性:group.read.all()
更多关于Django ORM查询的内容可以看这篇文章Django model select的各种用法详解有详细的总结
除了上边的两个场景之外我们还需要在执行具体的操作之前去判断是否有权限,例如执行审核操作前判断用户是否对此DB有写权限
有很多地方都需要做这个判断,所以把这个权限判断单独写个方法来处理,代码如下:
def check_permission(perm, mysql, user):
# 如果用户是超级管理员则有权限
if user.is_superuser:
return True
# 取出用户所属的所有组
_user_groups = user.groups.all()
# 取出Mysql对应权限的所有组
if perm == 'read':
_mysql_groups = mysql.read_groups.all()
if perm == 'write':
_mysql_groups = mysql.write_groups.all()
# 用户组和DB权限组取交集,有则表示有权限,否则没有权限
group_list = list(set(_user_groups).intersection(set(_mysql_groups)))
return False if len(group_list) == 0 else True
实现思路是:根据传入的第三个用户参数,来获取到用户所有的组,然后根据传入的第一个参数类型读取或写入和第二个参数DB实例来获取到有权限的所有组,然后对两个组取交集,交集不为空则表示有权限,为空则没有
M2M的.all()
取出来的结果是个list,两个list取交集的方法为:list(set(list-A).intersection(set(list-B)))
view中使用就很简单了,如下:
def query(request):
if request.method == 'POST':
postdata = request.body.decode('utf-8')
_host = get_object_or_404(Mysql, id=int(postdata.get('database')))
# 检查用户是否有DB的查询权限
if check_permission('read', _host, request.user) == False:
return JsonResponse({'state': 0, 'message': '当前用户没有查询此DB的权限'})