Coverage for sm / views_group.py: 29%

268 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-24 12:43 +0000

1from django.views.generic import ListView, FormView, TemplateView 

2from django.contrib.auth.mixins import LoginRequiredMixin, UserPassesTestMixin 

3from django.contrib.auth.models import User, Group 

4from django.db.models import Q 

5from .models import GroupProfile, Invitation 

6from django.urls import reverse_lazy, reverse 

7from django.shortcuts import get_object_or_404, redirect 

8from django.contrib import messages 

9from django.utils.translation import gettext as _ 

10from django import forms 

11from django.views import View 

12from django.core.mail import send_mail 

13from django.template.loader import render_to_string 

14from typing import Any 

15from .utils_permissions import get_group_permissions_for_model 

16from django.utils import timezone 

17from django.conf import settings 

18from django.http import JsonResponse 

19 

20 

21class GroupOwnerRequiredMixin(UserPassesTestMixin): 

22 def test_func(self) -> bool: 

23 # Superusers can manage all groups 

24 if self.request.user.is_superuser: 

25 return True 

26 # Check if user is owner of at least one group 

27 return GroupProfile.objects.filter(owner=self.request.user).exists() 

28 

29 

30class GroupMemberListView(LoginRequiredMixin, GroupOwnerRequiredMixin, ListView): 

31 template_name = "group/member_list.html" 

32 context_object_name = "groups" 

33 

34 def get_queryset(self) -> Any: 

35 if self.request.user.is_superuser: 

36 return Group.objects.all().prefetch_related("user_set", "profile") 

37 return ( 

38 Group.objects.filter( 

39 Q(profile__owner=self.request.user) | Q(user=self.request.user) 

40 ) 

41 .prefetch_related("user_set", "profile") 

42 .distinct() 

43 ) 

44 

45 

46class AddGroupMemberForm(forms.Form): 

47 username = forms.CharField(label=_("Username or Email")) 

48 

49 def clean_username(self) -> Any: 

50 username = self.cleaned_data["username"] 

51 try: 

52 if "@" in username: 

53 return User.objects.get(email=username) 

54 return User.objects.get(username=username) 

55 except User.DoesNotExist: 

56 raise forms.ValidationError(_("User does not exist.")) 

57 

58 

59class CreateGroupForm(forms.Form): 

60 name = forms.CharField(label=_("Group Name"), max_length=150) 

61 max_users = forms.IntegerField( 

62 label=_("Max Users"), 

63 initial=10, 

64 min_value=2, 

65 max_value=100, 

66 help_text=_("Maximum number of users in this group"), 

67 ) 

68 

69 

70class AddGroupMemberView(LoginRequiredMixin, GroupOwnerRequiredMixin, FormView): 

71 form_class = AddGroupMemberForm 

72 template_name = "group/add_member.html" 

73 

74 def get_success_url(self) -> str: 

75 return reverse_lazy("group_member_list") 

76 

77 def form_valid(self, form: Any) -> Any: 

78 group_id = self.kwargs.get("group_id") 

79 # Ensure the user is the owner OR a superuser 

80 if self.request.user.is_superuser: 

81 group = get_object_or_404(Group, pk=group_id) 

82 else: 

83 group = get_object_or_404( 

84 Group, pk=group_id, profile__owner=self.request.user 

85 ) 

86 

87 user_to_add = form.cleaned_data["username"] 

88 

89 if group.user_set.count() >= group.profile.max_users: 

90 messages.error( 

91 self.request, 

92 _("User quota exceeded for this group (%d users).") 

93 % group.profile.max_users, 

94 ) 

95 return self.form_invalid(form) 

96 

97 group.user_set.add(user_to_add) 

98 messages.success( 

99 self.request, 

100 _("User %s added to group %s.") % (user_to_add.username, group.name), 

101 ) 

102 return super().form_valid(form) 

103 

104 

105class RemoveGroupMemberView(LoginRequiredMixin, GroupOwnerRequiredMixin, View): 

106 def post(self, request: Any, *args: Any, **kwargs: Any) -> Any: 

107 group_id = self.kwargs.get("group_id") 

108 user_id = self.kwargs.get("user_id") 

109 

110 if self.request.user.is_superuser: 

111 group = get_object_or_404(Group, pk=group_id) 

112 else: 

113 group = get_object_or_404( 

114 Group, pk=group_id, profile__owner=self.request.user 

115 ) 

116 

117 user_to_remove = get_object_or_404(User, pk=user_id) 

118 

119 if user_to_remove == self.request.user and not self.request.user.is_superuser: 

120 messages.error( 

121 request, _("You cannot remove yourself from your own group.") 

122 ) 

123 else: 

124 group.user_set.remove(user_to_remove) 

125 messages.success( 

126 request, 

127 _("User %s removed from group %s.") 

128 % (user_to_remove.username, group.name), 

129 ) 

130 

131 return redirect("group_member_list") 

132 

133 

134class GroupPermissionForm(forms.Form): 

135 def __init__(self, *args: Any, **kwargs: Any) -> None: 

136 self.group = kwargs.pop("group") 

137 super().__init__(*args, **kwargs) 

138 

139 self.models_to_manage = [ 

140 ("server", _("Servers")), 

141 ("cluster", _("Clusters")), 

142 ("domain", _("Domains")), 

143 ("vendor", _("Vendors")), 

144 ("operatingsystem", _("Operating Systems")), 

145 ] 

146 

147 current_perms = self.group.permissions.all().values_list("codename", flat=True) 

148 

149 for app_label, label in self.models_to_manage: 

150 # Find the actual change permission codename which might be 'change_model' 

151 # because we named our model 'Model' 

152 perms = get_group_permissions_for_model(app_label) 

153 change_perm = next( 

154 (p for p in perms if p.codename.startswith("change_")), None 

155 ) 

156 

157 initial_val = False 

158 if change_perm: 

159 initial_val = change_perm.codename in current_perms 

160 

161 self.fields[f"edit_{app_label}"] = forms.BooleanField( 

162 label=_("Can Edit %s") % label, 

163 required=False, 

164 initial=initial_val, 

165 ) 

166 

167 def save(self) -> None: 

168 for app_label_tuple in self.models_to_manage: 

169 app_label = app_label_tuple[0] 

170 perms = get_group_permissions_for_model(app_label) 

171 # We manage add, change, delete as "Edit" 

172 edit_perms = [ 

173 p 

174 for p in perms 

175 if p.codename.startswith(("add_", "change_", "delete_")) 

176 ] 

177 

178 if self.cleaned_data.get(f"edit_{app_label}"): 

179 self.group.permissions.add(*edit_perms) 

180 else: 

181 self.group.permissions.remove(*edit_perms) 

182 

183 

184class GroupPermissionUpdateView(LoginRequiredMixin, GroupOwnerRequiredMixin, FormView): 

185 template_name = "group/permission_edit.html" 

186 form_class = GroupPermissionForm 

187 

188 def get_form_kwargs(self) -> Any: 

189 kwargs = super().get_form_kwargs() 

190 group_id = self.kwargs.get("group_id") 

191 if self.request.user.is_superuser: 

192 kwargs["group"] = get_object_or_404(Group, pk=group_id) 

193 else: 

194 kwargs["group"] = get_object_or_404( 

195 Group, pk=group_id, profile__owner=self.request.user 

196 ) 

197 return kwargs 

198 

199 def get_success_url(self) -> str: 

200 return reverse_lazy("group_member_list") 

201 

202 def form_valid(self, form: Any) -> Any: 

203 form.save() 

204 messages.success(self.request, _("Group permissions updated successfully.")) 

205 return super().form_valid(form) 

206 

207 

208class UserPermissionForm(forms.Form): 

209 def __init__(self, *args: Any, **kwargs: Any) -> None: 

210 self.user = kwargs.pop("user") 

211 self.group = kwargs.pop("group") 

212 super().__init__(*args, **kwargs) 

213 

214 self.models_to_manage = [ 

215 ("server", _("Servers")), 

216 ("cluster", _("Clusters")), 

217 ("domain", _("Domains")), 

218 ("vendor", _("Vendors")), 

219 ("operatingsystem", _("Operating Systems")), 

220 ] 

221 

222 group_perms = self.group.permissions.all().values_list("codename", flat=True) 

223 user_perms = self.user.user_permissions.all().values_list("codename", flat=True) 

224 

225 for app_label, label in self.models_to_manage: 

226 perms = get_group_permissions_for_model(app_label) 

227 change_perm = next( 

228 (p for p in perms if p.codename.startswith("change_")), None 

229 ) 

230 

231 initial_val = False 

232 if change_perm: 

233 initial_val = change_perm.codename in user_perms 

234 

235 inherited = change_perm.codename in group_perms if change_perm else False 

236 

237 self.fields[f"edit_{app_label}"] = forms.BooleanField( 

238 label=_("Can Edit %s") % label, 

239 required=False, 

240 initial=initial_val, 

241 ) 

242 if inherited: 

243 self.fields[f"edit_{app_label}"].help_text = _("(Inherited from group)") 

244 

245 def save(self) -> None: 

246 for app_label_tuple in self.models_to_manage: 

247 app_label = app_label_tuple[0] 

248 perms = get_group_permissions_for_model(app_label) 

249 edit_perms = [ 

250 p 

251 for p in perms 

252 if p.codename.startswith(("add_", "change_", "delete_")) 

253 ] 

254 

255 if self.cleaned_data.get(f"edit_{app_label}"): 

256 self.user.user_permissions.add(*edit_perms) 

257 else: 

258 self.user.user_permissions.remove(*edit_perms) 

259 

260 

261class UserPermissionUpdateView(LoginRequiredMixin, GroupOwnerRequiredMixin, FormView): 

262 template_name = "group/user_permission_edit.html" 

263 form_class = UserPermissionForm 

264 

265 def get_form_kwargs(self) -> Any: 

266 kwargs = super().get_form_kwargs() 

267 group_id = self.kwargs.get("group_id") 

268 user_id = self.kwargs.get("user_id") 

269 if self.request.user.is_superuser: 

270 kwargs["group"] = get_object_or_404(Group, pk=group_id) 

271 else: 

272 kwargs["group"] = get_object_or_404( 

273 Group, pk=group_id, profile__owner=self.request.user 

274 ) 

275 kwargs["user"] = get_object_or_404(User, pk=user_id) 

276 return kwargs 

277 

278 def get_success_url(self) -> str: 

279 return reverse_lazy("group_member_list") 

280 

281 def form_valid(self, form: Any) -> Any: 

282 form.save() 

283 messages.success(self.request, _("User permissions updated successfully.")) 

284 return super().form_valid(form) 

285 

286 

287class InviteGroupMemberForm(forms.Form): 

288 email = forms.EmailField(label=_("Email Address")) 

289 

290 def clean_email(self) -> Any: 

291 email = self.cleaned_data["email"] 

292 if User.objects.filter(email__iexact=email).exists(): 

293 raise forms.ValidationError(_("A user with this email already exists.")) 

294 return email.lower() 

295 

296 

297class InviteGroupMemberView(LoginRequiredMixin, GroupOwnerRequiredMixin, FormView): 

298 form_class = InviteGroupMemberForm 

299 template_name = "group/invite_member.html" 

300 

301 def get_success_url(self) -> str: 

302 return reverse_lazy("group_member_list") 

303 

304 def get_group(self) -> Group: 

305 group_id = self.kwargs.get("group_id") 

306 if self.request.user.is_superuser: 

307 return get_object_or_404(Group, pk=group_id) 

308 return get_object_or_404(Group, pk=group_id, profile__owner=self.request.user) 

309 

310 def form_valid(self, form: Any) -> Any: 

311 group = self.get_group() 

312 email = form.cleaned_data["email"] 

313 

314 existing_invitation = Invitation.objects.filter( 

315 email__iexact=email, group=group 

316 ).first() 

317 if existing_invitation and not existing_invitation.is_expired(): 

318 messages.error( 

319 self.request, 

320 _("An invitation has already been sent to this email address."), 

321 ) 

322 return self.form_invalid(form) 

323 

324 invitation = Invitation.objects.create( 

325 email=email, 

326 group=group, 

327 created_by=self.request.user, 

328 ) 

329 

330 self.send_invitation_email(invitation) 

331 

332 messages.success( 

333 self.request, 

334 _("Invitation sent to %(email)s.") % {"email": email}, 

335 ) 

336 return super().form_valid(form) 

337 

338 def send_invitation_email(self, invitation: Invitation) -> None: 

339 invite_url = self.request.build_absolute_uri( 

340 reverse("accept_invitation", kwargs={"token": invitation.token}) 

341 ) 

342 subject = _("Invitation to join %(group_name)s on ServerManager") % { 

343 "group_name": invitation.group.name 

344 } 

345 message = render_to_string( 

346 "group/email/invite.txt", 

347 { 

348 "invitation": invitation, 

349 "invite_url": invite_url, 

350 "owner_name": ( 

351 invitation.created_by.username 

352 if invitation.created_by 

353 else "An administrator" 

354 ), 

355 }, 

356 ) 

357 send_mail( 

358 subject, 

359 message, 

360 getattr(settings, "DEFAULT_FROM_EMAIL", "noreply@servermanager"), 

361 [invitation.email], 

362 fail_silently=False, 

363 ) 

364 

365 

366class AcceptInvitationView(TemplateView): 

367 template_name = "group/accept_invitation.html" 

368 

369 def get_context_data(self, **kwargs: Any) -> Any: 

370 context = super().get_context_data(**kwargs) 

371 token = self.kwargs.get("token") 

372 invitation = get_object_or_404(Invitation, token=token) 

373 

374 if invitation.is_expired(): 

375 context["error"] = _("This invitation has expired.") 

376 return context 

377 

378 if invitation.accepted_at: 

379 context["error"] = _("This invitation has already been used.") 

380 return context 

381 

382 context["invitation"] = invitation 

383 return context 

384 

385 def post(self, request: Any, *args: Any, **kwargs: Any) -> Any: 

386 token = self.kwargs.get("token") 

387 invitation = get_object_or_404(Invitation, token=token) 

388 

389 if invitation.is_expired() or invitation.accepted_at: 

390 messages.error(request, _("This invitation is no longer valid.")) 

391 return redirect("account_login") 

392 

393 username = request.POST.get("username") 

394 email = request.POST.get("email") 

395 password = request.POST.get("password") 

396 password_confirm = request.POST.get("password_confirm") 

397 

398 if not username or not email or not password: 

399 messages.error(request, _("All fields are required.")) 

400 return self.get(request, *args, **kwargs) 

401 

402 if password != password_confirm: 

403 messages.error(request, _("Passwords do not match.")) 

404 return self.get(request, *args, **kwargs) 

405 

406 if User.objects.filter(username=username).exists(): 

407 messages.error(request, _("Username already exists.")) 

408 return self.get(request, *args, **kwargs) 

409 

410 if User.objects.filter(email__iexact=email).exists(): 

411 messages.error(request, _("Email already registered.")) 

412 return self.get(request, *args, **kwargs) 

413 

414 user = User.objects.create_user( 

415 username=username, 

416 email=email, 

417 password=password, 

418 ) 

419 

420 invitation.group.user_set.add(user) 

421 invitation.accepted_at = timezone.now() 

422 invitation.save() 

423 

424 messages.success( 

425 request, 

426 _("You have joined %(group_name)s!") 

427 % {"group_name": invitation.group.name}, 

428 ) 

429 

430 from django.contrib.auth import login 

431 

432 login(request, user) 

433 

434 return redirect("dashboard") 

435 

436 

437class GroupFilterView(View): 

438 def post(self, request: Any, *args: Any, **kwargs: Any) -> Any: 

439 if not request.user.is_authenticated: 

440 return JsonResponse({"error": "Unauthorized"}, status=401) 

441 

442 groups_param = request.POST.get("groups", "") 

443 if groups_param: 

444 selected_groups = [g for g in groups_param.split(",") if g] 

445 request.session["selected_groups"] = selected_groups 

446 else: 

447 request.session["selected_groups"] = [] 

448 

449 request.session.modified = True 

450 

451 return JsonResponse({"status": "success"}) 

452 

453 

454class GroupCreateView(LoginRequiredMixin, FormView): 

455 form_class = CreateGroupForm 

456 template_name = "group/member_list.html" 

457 success_url = reverse_lazy("group_member_list") 

458 

459 def form_valid(self, form: Any) -> Any: 

460 name = form.cleaned_data["name"] 

461 max_users = form.cleaned_data.get("max_users", 10) 

462 

463 max_groups = 5 

464 owned_groups_count = self.request.user.owned_groups.count() 

465 if owned_groups_count >= max_groups: 

466 messages.error( 

467 self.request, 

468 _("You can only create up to %d groups.") % max_groups, 

469 ) 

470 return redirect("group_member_list") 

471 

472 existing = Group.objects.filter(name=name).first() 

473 if existing: 

474 messages.error(self.request, _("A group with this name already exists.")) 

475 return redirect("group_member_list") 

476 

477 group = Group.objects.create(name=name) 

478 GroupProfile.objects.create( 

479 group=group, 

480 owner=self.request.user, 

481 max_users=max_users, 

482 ) 

483 

484 self.request.user.groups.add(group) 

485 

486 from .utils_permissions import sync_group_permissions 

487 

488 sync_group_permissions(group) 

489 

490 messages.success(self.request, _("Group '%s' created successfully.") % name) 

491 return super().form_valid(form) 

492 

493 def form_invalid(self, form: Any) -> Any: 

494 messages.error(self.request, _("Failed to create group.")) 

495 return redirect("group_member_list")