Initial commit

This commit is contained in:
Claude Paroz 2024-06-03 16:49:01 +02:00
commit 793bb6a488
182 changed files with 17153 additions and 0 deletions

0
archive/__init__.py Normal file
View file

9
archive/admin.py Normal file
View file

@ -0,0 +1,9 @@
from django.contrib import admin
from .models import Archive
@admin.register(Archive)
class ArchiveAdmin(admin.ModelAdmin):
list_display = ['nom', 'unite', 'date_debut', 'date_fin']
search_fields = ['nom']

31
archive/forms.py Normal file
View file

@ -0,0 +1,31 @@
from django import forms
from aemo.forms import BootstrapMixin
class ArchiveKeyUploadForm(BootstrapMixin, forms.Form):
file = forms.FileField(label='Clé de déchiffrement des archives', required=True)
class ArchiveFilterForm(BootstrapMixin, forms.Form):
search_famille = forms.CharField(
label='Recherche par nom de famille',
max_length=30,
required=False
)
search_intervenant = forms.CharField(
label='Recherche par interv. CRNE',
max_length=30,
required=False
)
def filter(self, archives):
if not self.cleaned_data['search_famille'] and not self.cleaned_data['search_intervenant']:
return archives.none()
if self.cleaned_data['search_famille']:
archives = archives.filter(nom__icontains=self.cleaned_data['search_famille'])
if self.cleaned_data['search_intervenant']:
archives = archives.filter(intervenant__icontains=self.cleaned_data['search_intervenant'])
return archives

View file

@ -0,0 +1,30 @@
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='Archive',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('nom', models.CharField(max_length=40)),
('unite', models.CharField(max_length=10)),
('intervenant', models.CharField(blank=True, max_length=50)),
('ope', models.CharField(blank=True, max_length=50)),
('motif_fin', models.CharField(blank=True, max_length=30)),
('date_debut', models.DateField(null=True)),
('date_fin', models.DateField()),
('key', models.TextField()),
('pdf', models.FileField(null=True, upload_to='archives/')),
],
options={
'ordering': ('nom',),
},
),
]

View file

@ -0,0 +1,16 @@
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('archive', '0001_Add_archive_model'),
]
operations = [
migrations.AlterField(
model_name='archive',
name='intervenant',
field=models.CharField(blank=True, max_length=120),
),
]

View file

19
archive/models.py Normal file
View file

@ -0,0 +1,19 @@
from django.db import models
class Archive(models.Model):
nom = models.CharField(max_length=40)
unite = models.CharField(max_length=10)
intervenant = models.CharField(max_length=120, blank=True)
ope = models.CharField(max_length=50, blank=True)
motif_fin = models.CharField(max_length=30, blank=True)
date_debut = models.DateField(null=True)
date_fin = models.DateField()
key = models.TextField()
pdf = models.FileField(upload_to='archives/', null=True)
class Meta:
ordering = ('nom',)
def __str__(self):
return f"{self.unite}: {self.nom}"

64
archive/pdf.py Normal file
View file

@ -0,0 +1,64 @@
import subprocess
import tempfile
from io import BytesIO
from pathlib import Path
from pypdf import PdfWriter, PdfReader
from django.utils.text import slugify
from aemo.pdf import BasePDF, BilanPdf, EvaluationPdf, MessagePdf, RapportPdf, JournalPdf
class ArchiveBase(BasePDF):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.merger = PdfWriter()
def get_filename(self):
return f"{slugify(self.instance.nom)}-{self.instance.pk}.pdf"
def append_pdf(self, PDFClass, obj):
temp = BytesIO()
pdf = PDFClass(temp, obj)
pdf.produce()
self.merger.append(temp)
def append_other_docs(self, documents):
msg = []
for doc in documents:
doc_path = Path(doc.fichier.path)
if not doc_path.exists():
msg.append(f"Le fichier «{doc.titre}» n'existe pas!")
continue
if doc_path.suffix.lower() == '.pdf':
self.merger.append(PdfReader(str(doc_path)))
elif doc_path.suffix.lower() in ['.doc', '.docx']:
with tempfile.TemporaryDirectory() as tmpdir:
cmd = ['libreoffice', '--headless', '--convert-to', 'pdf', '--outdir', tmpdir, doc_path]
subprocess.run(cmd, capture_output=True)
converted_path = Path(tmpdir) / f'{doc_path.stem}.pdf'
if converted_path.exists():
self.merger.append(PdfReader(str(converted_path)))
else:
msg.append(f"La conversion du fichier «{doc.titre}» a échoué")
elif doc_path.suffix.lower() == '.msg':
self.append_pdf(MessagePdf, doc)
else:
msg.append(f"Le format du fichier «{doc.titre}» ne peut pas être intégré.")
return msg
class ArchivePdf(ArchiveBase):
title = "Archive"
def produce(self):
famille = self.instance
self.append_pdf(EvaluationPdf, famille)
self.append_pdf(JournalPdf, famille)
for bilan in famille.bilans.all():
self.append_pdf(BilanPdf, bilan)
for rapport in famille.rapports.all():
self.append_pdf(RapportPdf, rapport)
msg = self.append_other_docs(famille.documents.all())
self.merger.write(self.doc.filename)
return msg

View file

@ -0,0 +1,18 @@
{% extends 'base.html' %}
{% block content %}
<div class="row justify-content-center">
<div class="col-4 mt-5 p-5 border">
<form action="." method="post" enctype="multipart/form-data">
{% csrf_token %}
{{ form.as_div }}
<div id="actions" class="row border-top mt-4">
<div class="col mt-3 text-end">
<a class="btn btn-sm btn-secondary" href="javascript: history.go(-1)">Annuler</a>
<button class="btn btn-sm btn-success" name="save" type="submit">Envoyer et déchiffrer</button>
</div>
</div>
</form>
</div>
</div>
{% endblock %}

View file

@ -0,0 +1,37 @@
{% extends 'base.html' %}
{% block extra_javascript %}
<script type="text/javascript">
$(document).ready(function() {
$("#id_search_famille, #id_search_intervenant").keyup(debounce((ev) => {
const form = ev.target.form;
$.getJSON({
url: form.action,
data: $(form).serialize()
}).done(function(response) {
$("#archive_table").html(response);
});
}));
});
</script>
{% endblock %}
{% block content %}
<div class="row mb-5 border-bottom">
<div class="col-2 pt-5">
<h4>Archives {{ unite|upper }}</h4>
</div>
<div class="col">
<form method="get" action="{% url 'archive-list' unite %}">
<div class="row justify-content-end mb-3">
<div class="col-auto">{{ form.search_famille.label_tag }} {{ form.search_famille }}</div>
<div class="col-auto">{{form.search_intervenant.label_tag }} {{ form.search_intervenant }}</div>
</div>
</form>
</div>
</div>
<div class="row">
<div class="col-12">
<div id="archive_table">Utilisez la recherche par nom de famille et/ou par intervenant.</div>
</div>
</div>
{% endblock %}

View file

@ -0,0 +1,28 @@
<table class="table table-sm table-hover">
<tr>
<th>Nom</th><th>Réf. {{ unite|upper }}</th><th>Réf. OPE</th><th>Date de début</th>
<th>Date de fin</th><th>Motif de fin</th>
{% if can_download %}
<th width="15rem">Déchiffrement</th>
{% endif %}
</tr>
{% for archive in archives %}
<tr>
<td>{{ archive.nom }}</td>
<td>{{ archive.intervenant }}</td>
<td>{{ archive.ope }}</td>
<td>{{ archive.date_debut|default:'' }}</td>
<td>{{ archive.date_fin }}</td>
<td>{{ archive.motif_fin }}</td>
{% if can_download %}
<td align="center">
<a class="btn btn-sm bg-success" href="{%url 'archive-decrypt' archive.pk %}">Déchiffrer</a>
</td>
{% endif %}
</tr>
{% empty %}
<tr>
<td colspan="7">Aucune famille pour cette recherche.</td>
</tr>
{% endfor %}
</table>

View file

51
archive/tests/crne_rsa Normal file
View file

@ -0,0 +1,51 @@
-----BEGIN RSA PRIVATE KEY-----
MIIJKAIBAAKCAgEArBihBJdaIiYBOaP/AjwvqQQFO1inVEIVrKukPJmbxMjFceBr
Hd1YDMQ2J5K9LcqlZnFJLn1XwGVfGSMosRA8VOKaof/EA9npdQ/ncNpTc8Gugq2z
0UmsYjr2OXloZu3bkEdzhE7Nf5wE/s5qHQ6IFn6NyHwSbg6iUVl1d+C+UZNVXZPa
yhAHxsqIMQb7a/FCqusWV0g8HmP4xSq7Z8gAl7Bpg/eGqnIKVm2i8U0dOuAIcrof
45QAn07e2imX/2GLSURrHnjZcelUUWJflEDdNrsFi/3z1PYPhNOHvwR7SLzwcnXZ
8GElpcGniBR0vU2urCDtC1txNaGABjoWqPHPmdxsqhujoNd8eVxu20Do0fyKrSnF
qGzU/nFye77QJM7HUKSCILRSJvXhjV4Rn7qvU5+Cpc8wJd5T/cvkaXIZ0cXGSU6r
JX5QdLRT05MI+W4gyGzb3MTna9/EMjtpR8no6SYBYscTY/jw9K3hNYou1MjJF1y3
pUSmcB0aXrUqjc1bshzq7SVvkdoDGz2j0bHyOkc94G1yMNJuK6znqHqZzBjE9MPd
E0RdKIajQAqxcf+3S8hS+udvYMfA5gZ7Vi1FiGJBif6PRdSMM5ad7N3tClr7JLTH
8mnYDGyELaJal2lUu66SLOLiQWY0dWGtvjGPyMhWWzSTPp8t2mMRjLx3FIUCAwEA
AQKCAgAK0GYMf07odsHnSNZajMSj7BRAv+031AqU9e4qlAPKmvZz6u8K3+CgthL/
Fpnw+YW+M7UuXi2i8dvGx8Oj/goEfX7Lr4zg1Gh2n4ANbTKxmJnqLUHE5MptG8O4
Jdhjy2OGI+9EK51/J3iAOHaeNSO5DM0aVtg49o4dlYUA+kGLUmTwAz1MKt4KOv98
MTJK/KHnUpbDTPngy6sTXKriYRokbgSbXcpRBecqD4FIhMN8R8f6yZZSZcz04G8M
Khvv2MXOCKYV3UXbV2xMduWG+ofMC3bUQ/92l4WvH4mtUk9FlAB5+MzamDLWQR58
DxOs3OedJQPa4wKbT6GCAqlvhqO4WX/EUKRi+eoVyIHuVmkYoHRQZhOdl7X+ortr
pU2FRRMVFMI/rCHGrPGUlIz0+03+Yudg0oIawDwlLKj5gO/xXy8mGbSYbhQE9TjH
n8pAu7UJOglK0JRMhow+qsB0FQ8l1t1QmDaO6zbjIk4J2liCqmhojqNmxl2Wx7vm
vVGtkEpRwH5cE+uNWZ5h/QTsJogadOS58gF5Wos5Pe8UnWZYtLOe0KbF+iZPeQiN
jOJUYFSktytAIixSLrcp816qctfZs93IocBWHlrsV8iF8rs9CrMZiX/QK+l6CrAu
CNGWO9Wm8NytkISQbBIl7T7oGiaLr0Kl0KkmPwDovnwH2CCx4QKCAQEA05ZTz93C
k91ixDsaioGevdnNT8JBknWu1odcmLzOpzYZCTs9j9du3N3OAImZT9j0tA4NiLbO
2GCR93+/glNJJG1iJJc2x39qPZLwbXHr/Dx5YrH5PP25sjbm2xxAxaztJzXEIJgR
R6+dMJk5qD1qeOMxWxXOStFQGRtyy7umF1jAJUVxgTZt5+hYn5TtU9fTB8o8Qeto
+V2gjyhJISiRQDJYzHoRrKy45cwhBc5njdDrr4TpwwE/WyOtkrQthaaUuCrhpRZE
fOBblO7YWoYdM3IdxjtM45DlZLdIpuycdczJ5zZE3yQ5dSKlU9HQR9fN5pyZ3DV8
eOeMO8P+tzUkdQKCAQEA0Dg+9Y9qeeLtcmS0tD3r1iVTiNOmTT7EerQv4KjYhsJe
Ws2ZnELtzcKT75pkdF7kD06xKyMNFvLeC+/+SagzCziPa05uK3WV7FopQ1Ultj6i
9L9BpOpc6M2MQSObDu7QiCWIEqWwjYXy3Z0Yxm8ME8KyAO4j/YeD3ycPZMAL1yGZ
fTf6NbrOpYvEV+bZIRLkgfc7HAg0IBo5IiQgURVtrA27yZ5VV49Y2DyhDGqH5twF
s7VWkCRMsBeHA7sxeJimVG2NU67eIrc1GBTbzeS9JbP3p3bbpQeCm584siQ9mxxF
NjXDXgwx8gyp5G1UPbPtzEoixxzlgC9CHwH8H4Tt0QKCAQAAqW63rrzmE4I0lO6/
Uip5841122izGZUjbKb4f1ayJTQs2DeYFJdvL25uh/+nxUj2qziVneTFvn+WY5ro
wHPxHjp5XNO6Cgb+DFCeNwYC8vl6Oo5KB40mJo/QTaVSOPlA7yUe6Prc24rFVSVe
Blsn56YG3+mWSFNU0MYqJvsdBZUMSMxTGCV93TcxwJiBc6JgWtyXZDIe3ZEcAYdB
CEx0A/RNJ3CYtq2ZYmsUBpJCWk3ybZsBliZplZH8bH3b9ipu7QtppckvDtCahai9
l7/NomS/cv4JlDFzgDNE+mZ+49YZ2AydGhLn7+TOf1CEeQNW3lSI4M3z3t2Mbk+E
qTDlAoIBAQCqZ83G4/dtBzXyr85f0GlpGaUyzpxEjYD5NuwT/bsvFnVn9OmpQ/Eg
uwSdTAq4Xkxg5rMCLa5xwJPOyzueBmS34zMky8xIDvSCuQsaCt5RNxPgH4JWuGMP
N+F4Ee69mt7Y/XZOZIGIYT5w9jendow4w9cwAbU8sSJQh8QGXVGTX/Eg1KYWQOsL
+sXWdpvugGq4nqAmgeQ+/ZcShORZ16Ko85hjGgyYGz3Hwl6/LZRJcHnOKDNOxhZo
6uhZOmLzYmKFqB7IhM1RNgTiz3dQGspdx9p/mDuL5QiT2gvpZtVwUwOlqPxZxLs/
b/O+eWc/FDkiPu4VbGW6sXJ4tAQlu4FxAoIBAGAVm1D+/+fZpASL/XW6RRsjtEnh
FUgV1LvOKpZjqv7rZAWZOQD+eeJVIM+5/TW0G7dycvQl6oGUMvihYW8FYFyk3MrI
vNOi6+aoB4AQsweHFesonivSZ9e5g3Z0ChxK8lzmCQ0fKg1I6aVdFCuXYxVnNseW
v/fdhI1OC2fwzdihsUjlEwhaSJwnJhLDVPmzOb4jg3KFbFHOHr29hQ2Nw4jHTQ83
J39HvFRI/nZ9LuQ2EXMHMQy47ikV3P7jEXshpxXydLzW19CGn5Qo6bKrTT8iHZ1T
3FsFeSW2Js+KKOAqh9EhkvRJRWHIq/3qgPB7dkQEw9LP/3abYllMrefmVoU=
-----END RSA PRIVATE KEY-----

View file

@ -0,0 +1 @@
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQCsGKEEl1oiJgE5o/8CPC+pBAU7WKdUQhWsq6Q8mZvEyMVx4Gsd3VgMxDYnkr0tyqVmcUkufVfAZV8ZIyixEDxU4pqh/8QD2el1D+dw2lNzwa6CrbPRSaxiOvY5eWhm7duQR3OETs1/nAT+zmodDogWfo3IfBJuDqJRWXV34L5Rk1Vdk9rKEAfGyogxBvtr8UKq6xZXSDweY/jFKrtnyACXsGmD94aqcgpWbaLxTR064Ahyuh/jlACfTt7aKZf/YYtJRGseeNlx6VRRYl+UQN02uwWL/fPU9g+E04e/BHtIvPByddnwYSWlwaeIFHS9Ta6sIO0LW3E1oYAGOhao8c+Z3GyqG6Og13x5XG7bQOjR/IqtKcWobNT+cXJ7vtAkzsdQpIIgtFIm9eGNXhGfuq9Tn4KlzzAl3lP9y+RpchnRxcZJTqslflB0tFPTkwj5biDIbNvcxOdr38QyO2lHyejpJgFixxNj+PD0reE1ii7UyMkXXLelRKZwHRpetSqNzVuyHOrtJW+R2gMbPaPRsfI6Rz3gbXIw0m4rrOeoepnMGMT0w90TRF0ohqNACrFx/7dLyFL6529gx8DmBntWLUWIYkGJ/o9F1Iwzlp3s3e0KWvsktMfyadgMbIQtolqXaVS7rpIs4uJBZjR1Ya2+MY/IyFZbNJM+ny3aYxGMvHcUhQ== transit@example.org

BIN
archive/tests/sample-2.msg Normal file

Binary file not shown.

BIN
archive/tests/sample.doc Normal file

Binary file not shown.

BIN
archive/tests/sample.docx Normal file

Binary file not shown.

BIN
archive/tests/sample.msg Normal file

Binary file not shown.

BIN
archive/tests/sample.pdf Normal file

Binary file not shown.

204
archive/tests/tests.py Normal file
View file

@ -0,0 +1,204 @@
import os.path
import subprocess
import tempfile
from datetime import date, timedelta
from pathlib import Path
from django.conf import settings
from django.contrib.auth.models import Group, Permission
from django.core.files import File
from django.test import TestCase, override_settings
from django.urls import reverse
from django.utils.text import slugify
from aemo.models import (
Bilan, Document, Famille, LibellePrestation, Personne, Role, Prestation,
Rapport, Utilisateur
)
from aemo.tests import InitialDataMixin, TempMediaRootMixin
from aemo.utils import format_d_m_Y
from ..models import Archive
public_key_path = os.path.join(settings.BASE_DIR, 'archive/tests/crne_rsa.pub')
@override_settings(CRNE_RSA_PUBLIC_KEY=public_key_path)
class ArchiveTests(InitialDataMixin, TempMediaRootMixin, TestCase):
def setUp(self) -> None:
self.user = Utilisateur.objects.create_user('user', 'user@example.org', sigle='XX')
self.create_kwargs = {
'nom': 'John Doe',
'unite': 'aemo',
'date_debut': date(2021, 1, 1),
'date_fin': date(2021, 12, 31),
'motif_fin': 'Autre',
'key': '',
'pdf': 'encrypted_data'
}
def _create_archive(self):
fam = Famille.objects.create_famille(nom='Haddock', equipe='aemo')
fam.suivi.date_fin_suivi = date(2019, 1, 1)
fam.suivi.motif_fin_suivi = 'autres'
fam.suivi.save()
for idx, doc_name in enumerate(['sample.docx', 'sample.doc', 'sample.pdf', 'sample.msg']):
doc = Document(famille=fam, titre=f"Test {idx}")
with (Path(__file__).parent / doc_name).open(mode='rb') as fh:
doc.fichier = File(fh, name=doc_name)
doc.save()
self.user.user_permissions.add(Permission.objects.get(codename='can_archive'))
self.client.force_login(self.user)
self.client.post(reverse('archive-add', args=['aemo', fam.pk]))
return Archive.objects.get(nom='Haddock', unite='aemo')
def test_model_creation(self):
arch = Archive.objects.create(**self.create_kwargs)
self.assertEqual(arch.nom, 'John Doe')
self.assertEqual(arch.unite, 'aemo')
self.assertEqual(arch.date_debut, date(2021, 1, 1))
self.assertEqual(arch.date_fin, date(2021, 12, 31))
self.assertEqual(arch.pdf, 'encrypted_data')
def test_sans_permission_d_archiver(self):
fam = Famille.objects.create_famille(nom='Haddock', equipe='aemo')
fam.suivi.date_fin_suivi = date(2019, 1, 1)
fam.suivi.motif_fin_suivi = 'autre'
fam.suivi.save()
self.assertEqual(fam.can_be_archived(self.user), False)
def test_avec_permission_d_archiver(self):
fam = Famille.objects.create_famille(nom='Haddock', equipe='aemo')
fam.suivi.date_fin_suivi = date(2019, 1, 1)
fam.suivi.motif_fin_suivi = 'autre'
fam.suivi.save()
self.user.user_permissions.add(Permission.objects.get(codename='can_archive'))
self.assertEqual(fam.can_be_archived(self.user), True)
def test_archivage_aemo(self):
famille = Famille.objects.create_famille(
nom='Doe', equipe='aemo', rue="Rue du lac", npa='2000', localite='Paris', telephone='012 345 67 89')
famille.suivi.date_fin_suivi = date.today() - timedelta(days=700)
famille.suivi.motif_fin_suivi = 'autre'
famille.suivi.save()
file_paths = []
for idx, doc_name in enumerate(['sample.docx', 'sample.doc', 'sample.pdf', 'sample.msg', 'sample-2.msg']):
doc = Document(famille=famille, titre=f"Test {idx}")
with (Path(__file__).parent / doc_name).open(mode='rb') as fh:
doc.fichier = File(fh, name=doc_name)
doc.save()
file_paths.append(doc.fichier.path)
Personne.objects.create_personne(
famille=famille, prenom='Archibald', nom='Doe', role=Role.objects.get(nom='Père')
)
enfant = Personne.objects.create_personne(
famille=famille, prenom='Gaston', nom='Doe', date_naissance=date.today() - timedelta(days=720),
role=Role.objects.get(nom='Enfant suivi')
)
enfant.formation.creche = 'Les Schtroumpfs'
enfant.formation.save()
Bilan.objects.create(famille=famille, date=date.today())
Rapport.objects.create(famille=famille, date=date.today(), auteur=self.user)
prest = Prestation.objects.create(
famille=famille, auteur=self.user, date_prestation=date.today(), duree=timedelta(hours=2),
lib_prestation=LibellePrestation.objects.first())
prest.intervenants.add(self.user, through_defaults={'role': Role.objects.get(nom='Référent')})
with (Path(__file__).parent / 'sample.pdf').open(mode='rb') as fh:
prest.fichier = File(fh, name=doc_name)
prest.save()
file_paths.append(prest.fichier.path)
grp = Group.objects.get(name='aemo')
self.user.groups.add(grp)
self.user.user_permissions.add(
*list(Permission.objects.filter(codename__in=['view_famille', 'change_famille', 'can_archive']))
)
self.assertTrue(famille.can_be_archived(self.user))
self.client.force_login(self.user)
response = self.client.post(reverse('archive-add', args=['aemo', famille.pk]))
self.assertRedirects(response, reverse('suivis-termines'))
famille.refresh_from_db()
self.assertNotEqual(famille.nom, 'Doe')
self.assertEqual(len(famille.nom), 10)
self.assertEqual(famille.rue, '')
self.assertEqual(famille.npa, '2000')
self.assertEqual(famille.localite, 'Paris'),
self.assertEqual(famille.telephone, '')
self.assertEqual(famille.parents(), [])
enfant.refresh_from_db()
self.assertNotEqual(enfant.nom, 'Doe')
self.assertEqual(len(famille.nom), 10)
self.assertNotEqual(enfant.prenom, 'Gaston')
self.assertEqual(len(famille.nom), 10)
self.assertEqual(enfant.formation.creche, '')
self.assertEqual(famille.documents.count(), 0)
self.assertEqual(famille.bilans.count(), 0)
self.assertEqual(famille.rapports.count(), 0)
for prest in famille.prestations.all():
self.assertEqual(prest.texte, '')
self.assertEqual(prest.duree, timedelta(hours=2))
self.assertEqual(bool(prest.fichier), False)
for path in file_paths:
self.assertFalse(Path(path).exists())
arch = Archive.objects.get(nom='Doe', unite='aemo')
self.assertTrue(os.path.exists(arch.pdf.path))
self.assertIn(f"aemo/doe-{famille.pk}", arch.pdf.name)
self.assertTrue(famille.prestations.exists())
# Cannot be archived a second time:
response = self.client.post(reverse('archive-add', args=['aemo', famille.pk]))
self.assertEqual(response.status_code, 404)
def test_decryptage_access(self):
arch = self._create_archive()
private_key = os.path.join(settings.BASE_DIR, 'archive/tests/crne_rsa')
anonymous = Utilisateur.objects.create_user('anonymous', email='anonymous@example.org')
self.client.force_login(anonymous)
with open(private_key, 'rb') as f:
response = self.client.post(reverse('archive-decrypt', args=[arch.pk]), data={'file': f})
self.assertEqual(response.status_code, 403)
anonymous.user_permissions.add(Permission.objects.get(codename='can_archive'))
self.client.force_login(anonymous)
with open(private_key, 'rb') as f:
response = self.client.post(reverse('archive-decrypt', args=[arch.pk]), data={'file': f})
self.assertEqual(response.status_code, 200)
def test_decryptage_aemo(self):
arch = self._create_archive()
private_key = os.path.join(settings.BASE_DIR, 'archive/tests/crne_rsa')
self.client.force_login(self.user)
response = self.client.post(reverse('archive-decrypt', args=[arch.pk]), data={'file': ''})
self.assertEqual(response.context['form'].errors, {'file': ['Ce champ est obligatoire.']})
with open(private_key, 'rb') as f:
response = self.client.post(reverse('archive-decrypt', args=[arch.pk]), data={'file': f})
self.assertEqual(
response.get('Content-Disposition'),
f"attachment; filename={slugify(arch.nom)}.pdf"
)
with tempfile.NamedTemporaryFile(delete=True, mode='wb') as fh:
fh.write(response.content)
subprocess.run(['pdftotext', fh.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
with open(f'{fh.name}.txt', 'r') as f:
content = f.read()
self.assertIn('Historique', content)
self.assertIn('Fin du suivi le :\n\n01.01.2019', content)
self.assertIn('Motif de fin de suivi :\n\nAutres', content)
self.assertIn(f"Date d'archivage :\n\n{format_d_m_Y(date.today())}", content)
self.assertIn('Famille Haddock', content)
self.assertIn('Informations', content)
self.assertIn("Fichier docx dexemple", content)
self.assertIn("Exemple de fichier doc", content)
self.assertIn("Fichier pdf dexemple", content)
self.assertIn("Kind regards", content)

9
archive/urls.py Normal file
View file

@ -0,0 +1,9 @@
from django.urls import path
from archive import views
urlpatterns = [
path('<str:unite>/famille/<int:pk>/add/', views.ArchiveCreateView.as_view(), name='archive-add'),
path('<str:unite>/list/', views.ArchiveListView.as_view(), name='archive-list'),
path('<int:pk>/decrypt/', views.ArchiveDecryptView.as_view(), name='archive-decrypt'),
]

153
archive/views.py Normal file
View file

@ -0,0 +1,153 @@
import base64
from io import BytesIO
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import serialization
from cryptography.fernet import Fernet, InvalidToken
from django.conf import settings
from django.contrib import messages
from django.contrib.auth.mixins import PermissionRequiredMixin
from django.core.exceptions import PermissionDenied
from django.core.files.base import ContentFile
from django.http import HttpResponse, HttpResponseRedirect, JsonResponse
from django.shortcuts import get_object_or_404, reverse
from django.template.loader import render_to_string
from django.utils import timezone
from django.utils.text import slugify
from django.views.generic import FormView, TemplateView, View
from aemo.models import Famille
from aemo.utils import is_ajax
from .models import Archive
from .pdf import ArchivePdf
from .forms import ArchiveFilterForm, ArchiveKeyUploadForm
class ArchiveCreateView(View):
def post(self, request, *args, **kwargs):
unite = 'aemo'
temp = BytesIO()
famille = get_object_or_404(Famille, pk=kwargs['pk'], archived_at__isnull=True)
intervenants_list = '/'.join([f"{interv.nom} {interv.prenom[0].upper()}."
for interv in famille.suivi.intervenants.all()])
ope_list = famille.suivi.ope_referents
motif_fin = famille.suivi.get_motif_fin_suivi_display()
pdf = ArchivePdf(temp, famille)
if not famille.can_be_archived(self.request.user):
raise PermissionDenied("Vous n'avez pas les droits nécessaires pour accéder à cette page.")
famille.archived_at = timezone.now()
pdf.produce()
filename = f"{unite}/{pdf.get_filename()}"
temp.seek(0)
pdf = temp.read()
# Create a symmetric Fernet key to encrypt the PDF, and encrypt that key with asymmetric RSA key.
key = Fernet.generate_key()
fernet = Fernet(key)
pdf_crypted = fernet.encrypt(pdf)
with open(settings.CRNE_RSA_PUBLIC_KEY, "rb") as key_file:
public_key = serialization.load_ssh_public_key(key_file.read())
padd = padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None)
fernet_crypted = public_key.encrypt(key, padding=padd)
nom_famille = famille.nom
arch = Archive.objects.create(
nom=nom_famille,
unite=unite,
intervenant=intervenants_list,
ope=", ".join([ope.nom_prenom for ope in ope_list]),
motif_fin=motif_fin,
date_debut=famille.suivi.date_debut_suivi,
date_fin=famille.suivi.date_fin_suivi,
key=base64.b64encode(fernet_crypted).decode(),
pdf=None
)
arch.pdf.save(filename, ContentFile(pdf_crypted))
famille.archived_at = timezone.now()
famille.save()
famille.anonymiser()
if is_ajax(request):
return JsonResponse({'id': famille.pk}, safe=True)
messages.success(request, f"La famille «{nom_famille}» a bien été archivée.")
return HttpResponseRedirect(reverse("suivis-termines"))
class ArchiveListView(TemplateView):
template_name = 'archive/list.html'
model = Archive
def dispatch(self, request, *args, **kwargs):
self.unite = self.kwargs['unite']
self.filter_form = ArchiveFilterForm(data=request.GET or None)
return super().dispatch(request, *args, **kwargs)
def get(self, request, *args, **kwargs):
if is_ajax(request) and self.unite == 'aemo':
if self.filter_form.is_bound and self.filter_form.is_valid():
archives = self.filter_form.filter(Archive.objects.filter(unite=self.unite))
else:
archives = Archive.objects.none()
response = render_to_string(
template_name='archive/list_partial.html',
context={
'archives': archives,
'can_download': request.user.has_perm('aemo.can_archive'),
}
)
return JsonResponse(response, safe=False)
return super().get(request, *args, **kwargs)
def get_context_data(self, *args, **kwargs):
return {
**super().get_context_data(*args, **kwargs),
'unite': self.unite,
'form': self.filter_form,
'archives': Archive.objects.filter(unite=self.unite),
'can_download': self.request.user.has_perm('aemo.can_archive'),
}
class ArchiveDecryptView(PermissionRequiredMixin, FormView):
form_class = ArchiveKeyUploadForm
template_name = 'archive/key_upload.html'
permission_required = 'aemo.can_archive'
def form_valid(self, form):
arch = get_object_or_404(Archive, pk=self.kwargs['pk'])
try:
with open(arch.pdf.path, "rb") as fh:
pdf_crypted = fh.read()
except OSError as err:
messages.error(self.request, f"Erreur lors de la lecture du document ({str(err)})")
return HttpResponseRedirect(reverse('archive-list', args=[arch.unite]))
try:
private_key_file = self.request.FILES['file'].read()
private_key = serialization.load_pem_private_key(private_key_file, password=None)
padd = padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None)
sim_key = private_key.decrypt(base64.b64decode(arch.key), padding=padd)
fernet = Fernet(sim_key)
pdf_content = fernet.decrypt(pdf_crypted)
except ValueError as err:
messages.error(self.request, f"Erreur lors de la lecture de la clé ({str(err)})")
return HttpResponseRedirect(reverse('archive-list', args=[arch.unite]))
except InvalidToken:
messages.error(self.request, "Erreur lors du déchiffrement")
return HttpResponseRedirect(reverse('archive-list', args=[arch.unite]))
filename = f"{slugify(arch.nom)}.pdf"
response = HttpResponse(pdf_content, content_type='application/pdf')
response['Content-Disposition'] = "attachment; filename=%s" % filename
return response