Coverage for sm / views.py: 51%

157 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 13:46 +0000

1from django.views.generic import TemplateView 

2from django.contrib.auth.mixins import LoginRequiredMixin 

3from django.contrib import messages 

4from server.models import Model as Server 

5from cluster.models import Model as Cluster 

6from vendor.models import Model as Vendor 

7from operatingsystem.models import Model as OS 

8from django.db.models import Count, Q 

9from django.core.exceptions import ObjectDoesNotExist 

10from django.apps import apps 

11from django.http import Http404, HttpResponseRedirect, JsonResponse 

12from django.conf import settings 

13 

14from django.db.models import ProtectedError 

15from django.utils.translation import gettext as _ 

16from django.shortcuts import redirect 

17from django.views import View 

18from typing import Any, List 

19from .utils_starterpack import import_starter_pack 

20 

21 

22class SafeDeleteMixin: 

23 """ 

24 Mixin to catch ProtectedError during deletion and offer 

25 reassignment or bulk deletion. 

26 """ 

27 

28 def get_context_data(self, **kwargs: Any) -> Any: 

29 context = super().get_context_data(**kwargs) # type: ignore 

30 if hasattr(self, "protected_error") and self.protected_error: # type: ignore 

31 context["protected_error"] = True 

32 # Exclude our object from reassign list 

33 context["all_objects"] = self.model.objects.exclude( 

34 pk=self.object.pk # type: ignore 

35 ) 

36 

37 # Re-collect protected objects properly 

38 try: 

39 self.object.delete() # type: ignore 

40 except ProtectedError as e: 

41 context["protected_objects"] = e.protected_objects 

42 context["protected_count"] = len(e.protected_objects) 

43 return context 

44 

45 def form_valid(self, form: Any) -> Any: 

46 success_url = self.get_success_url() # type: ignore 

47 try: 

48 # Try normal deletion first 

49 obj_name = str(self.object) # type: ignore 

50 self.object.delete() # type: ignore 

51 if hasattr(self, "success_message") and self.success_message: # type: ignore # noqa: E501 

52 messages.success( 

53 # type: ignore 

54 self.request, 

55 self.success_message % self.object.__dict__, 

56 ) 

57 else: 

58 messages.success(self.request, _("Successfully deleted %s") % obj_name) 

59 return HttpResponseRedirect(success_url) 

60 except ProtectedError as e: 

61 action = self.request.POST.get("protected_action") 

62 if action == "reassign": 

63 new_obj_id = self.request.POST.get("new_target") 

64 if new_obj_id: 

65 new_obj = self.model.objects.get(pk=new_obj_id) # type: ignore 

66 # This part is tricky as we don't know the field name on 

67 # the remote side without inspecting the protected objects. 

68 for protected in e.protected_objects: 

69 # Find the FK field that points to our object 

70 for field in protected._meta.fields: 

71 # type: ignore 

72 if field.is_relation and field.related_model == self.model: 

73 setattr(protected, field.name, new_obj) 

74 protected.save() 

75 

76 self.object.delete() # type: ignore 

77 messages.success( 

78 self.request, 

79 _("Successfully reassigned dependencies and deleted %s") 

80 % self.object, # type: ignore 

81 ) 

82 return HttpResponseRedirect(success_url) 

83 

84 elif action == "delete_all": 

85 for protected in e.protected_objects: 

86 protected.delete() 

87 self.object.delete() # type: ignore 

88 messages.success( 

89 self.request, 

90 _("Successfully deleted %s and all dependent objects") 

91 % self.object, # type: ignore 

92 ) 

93 return HttpResponseRedirect(success_url) 

94 

95 self.protected_error = True # type: ignore 

96 return self.render_to_response( 

97 self.get_context_data(object=self.object) # type: ignore 

98 ) 

99 

100 

101class DashboardView(LoginRequiredMixin, TemplateView): 

102 template_name = "dashboard.html" 

103 

104 def get_queryset_filtered(self, model: Any) -> Any: 

105 if self.request.user.is_superuser: 

106 return model.objects.all() 

107 user_groups = self.request.user.groups.all() 

108 if hasattr(model, "group"): 

109 return model.objects.filter( 

110 Q(group__in=user_groups) | Q(group__isnull=True) 

111 ) 

112 return model.objects.all() 

113 

114 def get_context_data(self, **kwargs: Any) -> Any: 

115 context = super().get_context_data(**kwargs) 

116 

117 # Basic Stats (Filtered) 

118 context["server_count"] = self.get_queryset_filtered(Server).count() 

119 context["cluster_count"] = self.get_queryset_filtered(Cluster).count() 

120 context["vendor_count"] = Vendor.objects.count() 

121 context["os_count"] = OS.objects.count() 

122 

123 # Data for Charts (Filtered) 

124 # OS Distribution 

125 os_dist = ( 

126 self.get_queryset_filtered(Server) 

127 .values("operatingsystem__vendor__name", "operatingsystem__version") 

128 .annotate(count=Count("id")) 

129 .order_by("-count")[:5] 

130 ) 

131 

132 context["os_labels"] = [ 

133 f"{item['operatingsystem__vendor__name']} " 

134 f"{item['operatingsystem__version']}" 

135 for item in os_dist 

136 ] 

137 context["os_data"] = [item["count"] for item in os_dist] 

138 

139 # Status Distribution 

140 status_dist = ( 

141 self.get_queryset_filtered(Server) 

142 .values("status__name") 

143 .annotate(count=Count("id")) 

144 .order_by("-count") 

145 ) 

146 

147 context["status_labels"] = [item["status__name"] for item in status_dist] 

148 context["status_data"] = [item["count"] for item in status_dist] 

149 

150 # Recent Activity (Filtered) 

151 context["recent_servers"] = ( 

152 self.get_queryset_filtered(Server).all().order_by("-id")[:5] 

153 ) 

154 

155 return context 

156 

157 

158class SearchView(LoginRequiredMixin, TemplateView): 

159 template_name = "search.html" 

160 

161 def get_queryset_filtered(self, model: Any) -> Any: 

162 if self.request.user.is_superuser: 

163 return model.objects.all() 

164 user_groups = self.request.user.groups.all() 

165 if hasattr(model, "group"): 

166 return model.objects.filter( 

167 Q(group__in=user_groups) | Q(group__isnull=True) 

168 ) 

169 return model.objects.all() 

170 

171 def get_template_names(self) -> List[str]: 

172 if self.request.GET.get("ajax"): 

173 return ["search_results_ajax.html"] 

174 return [self.template_name] 

175 

176 def get_context_data(self, **kwargs: Any) -> Any: 

177 context = super().get_context_data(**kwargs) 

178 query = self.request.GET.get("q", "").lower() 

179 context["query"] = query 

180 

181 # Navigation Quick Jumps 

182 nav_targets = [ 

183 {"name": _("Dashboard"), "url": "/", "icon": "fa-gauge-high"}, 

184 { 

185 "name": _("Group Management"), 

186 "url": "/group/members/", 

187 "icon": "fa-users", 

188 }, 

189 {"name": _("Servers"), "url": "/server/", "icon": "fa-server"}, 

190 {"name": _("Server Models"), "url": "/servermodel/", "icon": "fa-cubes"}, 

191 {"name": _("Vendors"), "url": "/vendor/", "icon": "fa-industry"}, 

192 {"name": _("Clusters"), "url": "/cluster/", "icon": "fa-th-large"}, 

193 { 

194 "name": _("Operating Systems"), 

195 "url": "/operatingsystem/", 

196 "icon": "fa-laptop", 

197 }, 

198 {"name": _("Statuses"), "url": "/status/", "icon": "fa-tag"}, 

199 {"name": _("Locations"), "url": "/location/", "icon": "fa-map-marker-alt"}, 

200 {"name": _("Domains"), "url": "/domain/", "icon": "fa-globe"}, 

201 {"name": _("Patch Times"), "url": "/patchtime/", "icon": "fa-calendar"}, 

202 { 

203 "name": _("Cluster Software"), 

204 "url": "/clustersoftware/", 

205 "icon": "fa-shield-halved", 

206 }, 

207 { 

208 "name": _("Cluster Packages"), 

209 "url": "/clusterpackage/", 

210 "icon": "fa-archive", 

211 }, 

212 { 

213 "name": _("API Documentation"), 

214 "url": "/api/schema/swagger-ui/", 

215 "icon": "fa-book", 

216 }, 

217 ] 

218 

219 if len(query) >= 2: 

220 # Filter navigation targets 

221 context["nav_results"] = [ 

222 item for item in nav_targets if query in (item["name"].lower()) 

223 ] 

224 

225 context["servers"] = self.get_queryset_filtered(Server).filter( 

226 hostname__icontains=query 

227 )[:10] 

228 context["vendors"] = Vendor.objects.filter(name__icontains=query)[:10] 

229 context["clusters"] = self.get_queryset_filtered(Cluster).filter( 

230 name__icontains=query 

231 )[:10] 

232 

233 # Simple check if anything was found 

234 context["has_results"] = any( 

235 [ 

236 context["nav_results"], 

237 context["servers"].exists(), 

238 context["vendors"].exists(), 

239 context["clusters"].exists(), 

240 ] 

241 ) 

242 else: 

243 context["has_results"] = False 

244 context["query_too_short"] = True 

245 

246 return context 

247 

248 

249class HistoryDiffView(LoginRequiredMixin, TemplateView): 

250 template_name = "history_diff.html" 

251 

252 def get_context_data(self, **kwargs: Any) -> Any: 

253 context = super().get_context_data(**kwargs) 

254 app_label = kwargs.get("app_label") 

255 history_id = kwargs.get("history_id") 

256 

257 try: 

258 model = apps.get_model(app_label, "Model") 

259 record = model.history.get(history_id=history_id) # type: ignore 

260 except (LookupError, ObjectDoesNotExist): 

261 raise Http404("History record not found") 

262 

263 context["record"] = record 

264 context["instance"] = record.instance 

265 context["app_label"] = app_label 

266 

267 if record.prev_record: 

268 context["diff"] = record.diff_against(record.prev_record) 

269 else: 

270 context["diff"] = None 

271 

272 return context 

273 

274 

275class ImportStarterPackView(LoginRequiredMixin, View): 

276 def post(self, request: Any, *args: Any, **kwargs: Any) -> Any: 

277 user_groups = request.user.groups.all() 

278 if not user_groups.exists(): 

279 messages.error(request, _("You are not assigned to any group.")) 

280 return redirect("dashboard") 

281 

282 group = user_groups.first() 

283 results = import_starter_pack(group) 

284 

285 messages.success( 

286 request, 

287 _("Imported %d vendors and %d operating systems into group %s.") 

288 % (results["vendors"], results["os"], group.name), 

289 ) 

290 return redirect("vendor:index") 

291 

292 

293class TermsView(TemplateView): 

294 template_name = "legal/terms.html" 

295 

296 

297class PrivacyView(TemplateView): 

298 template_name = "legal/privacy.html" 

299 

300 

301class ImpressumView(TemplateView): 

302 template_name = "legal/impressum.html" 

303 

304 

305class HealthView(View): 

306 def get(self, request: Any, *args: Any, **kwargs: Any) -> JsonResponse: 

307 health = { 

308 "status": "healthy", 

309 "version": getattr(settings, "APP_VERSION", "unknown"), 

310 "last_modification": getattr(settings, "APP_MODIFICATION_DATE", "unknown"), 

311 "checks": {}, 

312 } 

313 

314 # Check database connection 

315 try: 

316 from django.db import connection 

317 

318 connection.cursor() 

319 health["checks"]["database"] = "ok" 

320 except Exception as e: 

321 health["status"] = "unhealthy" 

322 health["checks"]["database"] = f"error: {str(e)}" 

323 

324 status_code = 200 if health["status"] == "healthy" else 503 

325 return JsonResponse(health, status=status_code)