Coverage for sm / views.py: 51%
157 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 13:46 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 13:46 +0000
1from django.views.generic import TemplateView
2from django.contrib.auth.mixins import LoginRequiredMixin
3from django.contrib import messages
4from server.models import Model as Server
5from cluster.models import Model as Cluster
6from vendor.models import Model as Vendor
7from operatingsystem.models import Model as OS
8from django.db.models import Count, Q
9from django.core.exceptions import ObjectDoesNotExist
10from django.apps import apps
11from django.http import Http404, HttpResponseRedirect, JsonResponse
12from django.conf import settings
14from django.db.models import ProtectedError
15from django.utils.translation import gettext as _
16from django.shortcuts import redirect
17from django.views import View
18from typing import Any, List
19from .utils_starterpack import import_starter_pack
22class SafeDeleteMixin:
23 """
24 Mixin to catch ProtectedError during deletion and offer
25 reassignment or bulk deletion.
26 """
28 def get_context_data(self, **kwargs: Any) -> Any:
29 context = super().get_context_data(**kwargs) # type: ignore
30 if hasattr(self, "protected_error") and self.protected_error: # type: ignore
31 context["protected_error"] = True
32 # Exclude our object from reassign list
33 context["all_objects"] = self.model.objects.exclude(
34 pk=self.object.pk # type: ignore
35 )
37 # Re-collect protected objects properly
38 try:
39 self.object.delete() # type: ignore
40 except ProtectedError as e:
41 context["protected_objects"] = e.protected_objects
42 context["protected_count"] = len(e.protected_objects)
43 return context
45 def form_valid(self, form: Any) -> Any:
46 success_url = self.get_success_url() # type: ignore
47 try:
48 # Try normal deletion first
49 obj_name = str(self.object) # type: ignore
50 self.object.delete() # type: ignore
51 if hasattr(self, "success_message") and self.success_message: # type: ignore # noqa: E501
52 messages.success(
53 # type: ignore
54 self.request,
55 self.success_message % self.object.__dict__,
56 )
57 else:
58 messages.success(self.request, _("Successfully deleted %s") % obj_name)
59 return HttpResponseRedirect(success_url)
60 except ProtectedError as e:
61 action = self.request.POST.get("protected_action")
62 if action == "reassign":
63 new_obj_id = self.request.POST.get("new_target")
64 if new_obj_id:
65 new_obj = self.model.objects.get(pk=new_obj_id) # type: ignore
66 # This part is tricky as we don't know the field name on
67 # the remote side without inspecting the protected objects.
68 for protected in e.protected_objects:
69 # Find the FK field that points to our object
70 for field in protected._meta.fields:
71 # type: ignore
72 if field.is_relation and field.related_model == self.model:
73 setattr(protected, field.name, new_obj)
74 protected.save()
76 self.object.delete() # type: ignore
77 messages.success(
78 self.request,
79 _("Successfully reassigned dependencies and deleted %s")
80 % self.object, # type: ignore
81 )
82 return HttpResponseRedirect(success_url)
84 elif action == "delete_all":
85 for protected in e.protected_objects:
86 protected.delete()
87 self.object.delete() # type: ignore
88 messages.success(
89 self.request,
90 _("Successfully deleted %s and all dependent objects")
91 % self.object, # type: ignore
92 )
93 return HttpResponseRedirect(success_url)
95 self.protected_error = True # type: ignore
96 return self.render_to_response(
97 self.get_context_data(object=self.object) # type: ignore
98 )
101class DashboardView(LoginRequiredMixin, TemplateView):
102 template_name = "dashboard.html"
104 def get_queryset_filtered(self, model: Any) -> Any:
105 if self.request.user.is_superuser:
106 return model.objects.all()
107 user_groups = self.request.user.groups.all()
108 if hasattr(model, "group"):
109 return model.objects.filter(
110 Q(group__in=user_groups) | Q(group__isnull=True)
111 )
112 return model.objects.all()
114 def get_context_data(self, **kwargs: Any) -> Any:
115 context = super().get_context_data(**kwargs)
117 # Basic Stats (Filtered)
118 context["server_count"] = self.get_queryset_filtered(Server).count()
119 context["cluster_count"] = self.get_queryset_filtered(Cluster).count()
120 context["vendor_count"] = Vendor.objects.count()
121 context["os_count"] = OS.objects.count()
123 # Data for Charts (Filtered)
124 # OS Distribution
125 os_dist = (
126 self.get_queryset_filtered(Server)
127 .values("operatingsystem__vendor__name", "operatingsystem__version")
128 .annotate(count=Count("id"))
129 .order_by("-count")[:5]
130 )
132 context["os_labels"] = [
133 f"{item['operatingsystem__vendor__name']} "
134 f"{item['operatingsystem__version']}"
135 for item in os_dist
136 ]
137 context["os_data"] = [item["count"] for item in os_dist]
139 # Status Distribution
140 status_dist = (
141 self.get_queryset_filtered(Server)
142 .values("status__name")
143 .annotate(count=Count("id"))
144 .order_by("-count")
145 )
147 context["status_labels"] = [item["status__name"] for item in status_dist]
148 context["status_data"] = [item["count"] for item in status_dist]
150 # Recent Activity (Filtered)
151 context["recent_servers"] = (
152 self.get_queryset_filtered(Server).all().order_by("-id")[:5]
153 )
155 return context
158class SearchView(LoginRequiredMixin, TemplateView):
159 template_name = "search.html"
161 def get_queryset_filtered(self, model: Any) -> Any:
162 if self.request.user.is_superuser:
163 return model.objects.all()
164 user_groups = self.request.user.groups.all()
165 if hasattr(model, "group"):
166 return model.objects.filter(
167 Q(group__in=user_groups) | Q(group__isnull=True)
168 )
169 return model.objects.all()
171 def get_template_names(self) -> List[str]:
172 if self.request.GET.get("ajax"):
173 return ["search_results_ajax.html"]
174 return [self.template_name]
176 def get_context_data(self, **kwargs: Any) -> Any:
177 context = super().get_context_data(**kwargs)
178 query = self.request.GET.get("q", "").lower()
179 context["query"] = query
181 # Navigation Quick Jumps
182 nav_targets = [
183 {"name": _("Dashboard"), "url": "/", "icon": "fa-gauge-high"},
184 {
185 "name": _("Group Management"),
186 "url": "/group/members/",
187 "icon": "fa-users",
188 },
189 {"name": _("Servers"), "url": "/server/", "icon": "fa-server"},
190 {"name": _("Server Models"), "url": "/servermodel/", "icon": "fa-cubes"},
191 {"name": _("Vendors"), "url": "/vendor/", "icon": "fa-industry"},
192 {"name": _("Clusters"), "url": "/cluster/", "icon": "fa-th-large"},
193 {
194 "name": _("Operating Systems"),
195 "url": "/operatingsystem/",
196 "icon": "fa-laptop",
197 },
198 {"name": _("Statuses"), "url": "/status/", "icon": "fa-tag"},
199 {"name": _("Locations"), "url": "/location/", "icon": "fa-map-marker-alt"},
200 {"name": _("Domains"), "url": "/domain/", "icon": "fa-globe"},
201 {"name": _("Patch Times"), "url": "/patchtime/", "icon": "fa-calendar"},
202 {
203 "name": _("Cluster Software"),
204 "url": "/clustersoftware/",
205 "icon": "fa-shield-halved",
206 },
207 {
208 "name": _("Cluster Packages"),
209 "url": "/clusterpackage/",
210 "icon": "fa-archive",
211 },
212 {
213 "name": _("API Documentation"),
214 "url": "/api/schema/swagger-ui/",
215 "icon": "fa-book",
216 },
217 ]
219 if len(query) >= 2:
220 # Filter navigation targets
221 context["nav_results"] = [
222 item for item in nav_targets if query in (item["name"].lower())
223 ]
225 context["servers"] = self.get_queryset_filtered(Server).filter(
226 hostname__icontains=query
227 )[:10]
228 context["vendors"] = Vendor.objects.filter(name__icontains=query)[:10]
229 context["clusters"] = self.get_queryset_filtered(Cluster).filter(
230 name__icontains=query
231 )[:10]
233 # Simple check if anything was found
234 context["has_results"] = any(
235 [
236 context["nav_results"],
237 context["servers"].exists(),
238 context["vendors"].exists(),
239 context["clusters"].exists(),
240 ]
241 )
242 else:
243 context["has_results"] = False
244 context["query_too_short"] = True
246 return context
249class HistoryDiffView(LoginRequiredMixin, TemplateView):
250 template_name = "history_diff.html"
252 def get_context_data(self, **kwargs: Any) -> Any:
253 context = super().get_context_data(**kwargs)
254 app_label = kwargs.get("app_label")
255 history_id = kwargs.get("history_id")
257 try:
258 model = apps.get_model(app_label, "Model")
259 record = model.history.get(history_id=history_id) # type: ignore
260 except (LookupError, ObjectDoesNotExist):
261 raise Http404("History record not found")
263 context["record"] = record
264 context["instance"] = record.instance
265 context["app_label"] = app_label
267 if record.prev_record:
268 context["diff"] = record.diff_against(record.prev_record)
269 else:
270 context["diff"] = None
272 return context
275class ImportStarterPackView(LoginRequiredMixin, View):
276 def post(self, request: Any, *args: Any, **kwargs: Any) -> Any:
277 user_groups = request.user.groups.all()
278 if not user_groups.exists():
279 messages.error(request, _("You are not assigned to any group."))
280 return redirect("dashboard")
282 group = user_groups.first()
283 results = import_starter_pack(group)
285 messages.success(
286 request,
287 _("Imported %d vendors and %d operating systems into group %s.")
288 % (results["vendors"], results["os"], group.name),
289 )
290 return redirect("vendor:index")
293class TermsView(TemplateView):
294 template_name = "legal/terms.html"
297class PrivacyView(TemplateView):
298 template_name = "legal/privacy.html"
301class ImpressumView(TemplateView):
302 template_name = "legal/impressum.html"
305class HealthView(View):
306 def get(self, request: Any, *args: Any, **kwargs: Any) -> JsonResponse:
307 health = {
308 "status": "healthy",
309 "version": getattr(settings, "APP_VERSION", "unknown"),
310 "last_modification": getattr(settings, "APP_MODIFICATION_DATE", "unknown"),
311 "checks": {},
312 }
314 # Check database connection
315 try:
316 from django.db import connection
318 connection.cursor()
319 health["checks"]["database"] = "ok"
320 except Exception as e:
321 health["status"] = "unhealthy"
322 health["checks"]["database"] = f"error: {str(e)}"
324 status_code = 200 if health["status"] == "healthy" else 503
325 return JsonResponse(health, status=status_code)