Moved imports views to their own file

This commit is contained in:
Claude Paroz 2018-07-13 11:33:38 +02:00
parent 266a14a261
commit 2cf1ae9724
3 changed files with 371 additions and 350 deletions

View file

@ -1,43 +1,34 @@
import json
import os
import re
from subprocess import PIPE, Popen, call
import tempfile
from collections import OrderedDict
from datetime import date, datetime, timedelta
from tabimport import CSVImportedFile, FileFactory
from django.conf import settings
from django.contrib import messages
from django.core.files import File
from django.core.mail import EmailMessage
from django.db import transaction
from django.db.models import Count, Value, Q, Sum
from django.db.models.functions import Concat
from django.db.models import Count
from django.http import HttpResponse, HttpResponseNotAllowed, HttpResponseRedirect
from django.shortcuts import get_object_or_404, redirect
from django.template import loader
from django.urls import reverse, reverse_lazy
from django.utils import timezone
from django.utils.dateformat import format as django_format
from django.utils.translation import ugettext as _
from django.utils.text import slugify
from django.views.generic import DetailView, FormView, TemplateView, ListView
from .base import EmailConfirmationBaseView, ZippedFilesBaseView
from .export import OpenXMLExport
from ..forms import EmailBaseForm, PeriodForm, StudentImportForm, UploadHPFileForm, UploadReportForm
from .imports import HPContactsImportView, HPImportView, ImportReportsView, StudentImportView
from ..forms import EmailBaseForm
from ..models import (
Klass, Section, Option, Student, Teacher, Corporation, CorpContact, Course, Period,
Klass, Section, Student, Teacher, Corporation, CorpContact, Period,
Training, Availability
)
from ..pdf import (
ChargeSheetPDF, ExpertEdeLetterPdf, UpdateDataFormPDF, MentorCompensationPdfForm,
KlassListPDF,
)
from ..utils import is_int, school_year_start
from ..utils import school_year_start
class CorporationListView(ListView):
@ -273,340 +264,6 @@ def del_training(request):
return HttpResponse(json.dumps({'ref_id': ref_id}), content_type="application/json")
class ImportViewBase(FormView):
template_name = 'file_import.html'
def form_valid(self, form):
upfile = form.cleaned_data['upload']
is_csv = (
upfile.name.endswith('.csv') or
'csv' in upfile.content_type or
upfile.content_type == 'text/plain'
)
try:
if is_csv:
# Reopen the file in text mode
upfile = open(upfile.temporary_file_path(), mode='r', encoding='utf-8-sig')
imp_file = CSVImportedFile(File(upfile))
else:
imp_file = FileFactory(upfile)
with transaction.atomic():
stats = self.import_data(imp_file)
except Exception as e:
if settings.DEBUG:
raise
msg = "L'importation a échoué. Erreur: %s" % e
if hasattr(upfile, 'content_type'):
msg += " (content-type: %s)" % upfile.content_type
messages.error(self.request, msg)
else:
non_fatal_errors = stats.get('errors', [])
if 'created' in stats:
messages.info(self.request, "Objets créés : %d" % stats['created'])
if 'modified' in stats:
messages.info(self.request, "Objets modifiés : %d" % stats['modified'])
if non_fatal_errors:
messages.warning(self.request, "Erreurs rencontrées: %s" % "\n".join(non_fatal_errors))
return HttpResponseRedirect(reverse('admin:index'))
class StudentImportView(ImportViewBase):
title = "Importation étudiants"
form_class = StudentImportForm
def import_data(self, up_file):
""" Import Student data from uploaded file. """
student_mapping = settings.STUDENT_IMPORT_MAPPING
student_rev_mapping = {v: k for k, v in student_mapping.items()}
corporation_mapping = settings.CORPORATION_IMPORT_MAPPING
instructor_mapping = settings.INSTRUCTOR_IMPORT_MAPPING
def strip(val):
return val.strip() if isinstance(val, str) else val
obj_created = obj_modified = 0
seen_students_ids = set()
for line in up_file:
student_defaults = {
val: strip(line[key]) for key, val in student_mapping.items()
}
if student_defaults['ext_id'] in seen_students_ids:
# Second line for student, ignore it
continue
seen_students_ids.add(student_defaults['ext_id'])
if student_defaults['birth_date'] == '':
student_defaults['birth_date'] = None
elif isinstance(student_defaults['birth_date'], str):
student_defaults['birth_date'] = datetime.strptime(student_defaults['birth_date'], '%d.%m.%Y').date()
if student_defaults['option_ase']:
try:
student_defaults['option_ase'] = Option.objects.get(name=student_defaults['option_ase'])
except Option.DoesNotExist:
del student_defaults['option_ase']
else:
del student_defaults['option_ase']
corporation_defaults = {
val: strip(line[key]) for key, val in corporation_mapping.items()
}
student_defaults['corporation'] = self.get_corporation(corporation_defaults)
defaults = Student.prepare_import(student_defaults)
try:
student = Student.objects.get(ext_id=student_defaults['ext_id'])
modified = False
for key, val in defaults.items():
if getattr(student, key) != val:
setattr(student, key, val)
modified = True
if modified:
student.save()
obj_modified += 1
except Student.DoesNotExist:
student = Student.objects.create(**defaults)
obj_created += 1
# FIXME: implement arch_staled
return {'created': obj_created, 'modified': obj_modified}
def get_corporation(self, corp_values):
if corp_values['ext_id'] == '':
return None
if 'city' in corp_values and is_int(corp_values['city'][:4]):
corp_values['pcode'], _, corp_values['city'] = corp_values['city'].partition(' ')
corp, created = Corporation.objects.get_or_create(
ext_id=corp_values['ext_id'],
defaults=corp_values
)
return corp
class HPImportView(ImportViewBase):
"""
Importation du fichier HyperPlanning pour l'établissement des feuilles
de charges.
"""
form_class = UploadHPFileForm
mapping = {
'NOMPERSO_ENS': 'teacher',
'LIBELLE_MAT': 'subject',
'NOMPERSO_DIP': 'public',
'TOTAL': 'period',
}
# Mapping between klass field and imputation
account_categories = OrderedDict([
('ASAFE', 'ASAFE'),
('ASEFE', 'ASEFE'),
('ASSCFE', 'ASSCFE'),
('#Mandat_ASA', 'ASAFE'),
('MPTS', 'MPTS'),
('MPS', 'MPS'),
('CMS ASE', 'MPTS'),
('CMS ASSC', 'MPS'),
('EDEpe', 'EDEpe'),
('EDEps', 'EDEps'),
('EDS', 'EDS'),
('CAS_FPP', 'CAS_FPP'),
# To split afterwards
('EDE', 'EDE'),
('#Mandat_ASE', 'ASE'),
('#Mandat_ASSC', 'ASSC'),
])
def import_data(self, up_file):
obj_created = obj_modified = 0
errors = []
# Pour accélérer la recherche
profs = {str(t): t for t in Teacher.objects.all()}
Course.objects.all().delete()
for line in up_file:
if (line['LIBELLE_MAT'] == '' or line['NOMPERSO_DIP'] == '' or line['TOTAL'] == ''):
continue
try:
teacher = profs[line['NOMPERSO_ENS']]
except KeyError:
errors.append(
"Impossible de trouver «%s» dans la liste des enseignant-e-s" % line['NOMPERSO_ENS']
)
continue
obj, created = Course.objects.get_or_create(
teacher=teacher,
subject=line['LIBELLE_MAT'],
public=line['NOMPERSO_DIP'],
)
period = int(float(line['TOTAL'].replace("'","")))
if created:
obj.period = period
obj_created += 1
for k, v in self.account_categories.items():
if k in obj.public:
obj.imputation = v
break
else:
obj.period += period
obj_modified += 1
obj.save()
if not obj.imputation:
errors.append("Le cours {0} n'a pas pu être imputé correctement!". format(str(obj)))
return {'created': obj_created, 'modified': obj_modified, 'errors': errors}
class HPContactsImportView(ImportViewBase):
"""
Importation du fichier Hyperplanning contenant les formateurs d'étudiants.
"""
form_class = UploadHPFileForm
def import_data(self, up_file):
obj_modified = 0
errors = []
for idx, line in enumerate(up_file, start=2):
try:
student = Student.objects.get(ext_id=int(line['UID_ETU']))
except Student.DoesNotExist:
errors.append(
"Impossible de trouver l'étudiant avec le numéro %s" % int(line['UID_ETU'])
)
continue
if not line['NoSIRET']:
errors.append(
"NoSIRET est vide à ligne %d. Ligne ignorée" % idx
)
continue
try:
corp = Corporation.objects.get(ext_id=int(line['NoSIRET']))
except Corporation.DoesNotExist:
errors.append(
"Impossible de trouver l'institution avec le numéro %s" % int(line['NoSIRET'])
)
continue
# Check corporation matches
if student.corporation_id != corp.pk:
# This import has priority over the corporation set by StudentImportView
student.corporation = corp
student.save()
contact = corp.corpcontact_set.filter(
first_name__iexact=line['PRENOMMDS'].strip(),
last_name__iexact=line['NOMMDS'].strip()
).first()
if contact is None:
contact = CorpContact.objects.create(
corporation=corp, first_name=line['PRENOMMDS'].strip(),
last_name=line['NOMMDS'].strip(), civility=line['CIVMDS'], email=line['EMAILMDS']
)
else:
if line['CIVMDS'] and contact.civility != line['CIVMDS']:
contact.civility = line['CIVMDS']
contact.save()
if line['EMAILMDS'] and contact.email != line['EMAILMDS']:
contact.email = line['EMAILMDS']
contact.save()
if student.instructor != contact:
student.instructor = contact
student.save()
obj_modified += 1
return {'modified': obj_modified, 'errors': errors}
class ImportReportsView(FormView):
template_name = 'file_import.html'
form_class = UploadReportForm
def dispatch(self, request, *args, **kwargs):
self.klass = get_object_or_404(Klass, pk=kwargs['pk'])
self.title = "Importation d'un fichier PDF de moyennes pour la classe {}".format(self.klass.name)
return super().dispatch(request, *args, **kwargs)
def form_valid(self, form):
upfile = form.cleaned_data['upload']
klass_name = upfile.name[:-4]
redirect_url = reverse('class', args=[self.klass.pk])
if self.klass.name != klass_name:
messages.error(self.request,
"Le fichier téléchargé ne correspond pas à la classe {} !".format(self.klass.name)
)
return HttpResponseRedirect(redirect_url)
# Check poppler-utils presence on server
res = call(['pdftotext', '-v'], stderr=PIPE)
if res != 0:
messages.error(self.request, "Unable to find pdftotext on your system. Try to install the poppler-utils package.")
return HttpResponseRedirect(redirect_url)
# Move the file to MEDIA directory
pdf_origin = os.path.join(settings.MEDIA_ROOT, upfile.name)
with open(pdf_origin, 'wb+') as destination:
for chunk in upfile.chunks():
destination.write(chunk)
try:
self.import_reports(pdf_origin, form.cleaned_data['semester'])
except Exception as err:
raise
if settings.DEBUG:
raise
else:
messages.error(self.request, "Erreur durant l'importation des bulletins PDF: %s" % err)
return HttpResponseRedirect(redirect_url)
def import_reports(self, pdf_path, semester):
path = os.path.abspath(pdf_path)
student_regex = '[E|É]lève\s*:\s*([^\n]*)'
# Directory automatically deleted when the variable is deleted
_temp_dir = tempfile.TemporaryDirectory()
temp_dir = _temp_dir.name
os.system("pdfseparate %s %s/%s_%%d.pdf" % (path, temp_dir, os.path.basename(path)[:-4]))
# Look for student names in each separated PDF and rename PDF with student name
pdf_count = 0
pdf_field = 'report_sem' + semester
for filename in os.listdir(temp_dir):
p = Popen(['pdftotext', os.path.join(temp_dir, filename), '-'],
shell=False, stdout=PIPE, stderr=PIPE)
output, errs = p.communicate()
m = re.search(student_regex, output.decode('utf-8'))
if not m:
print("Unable to find student name in %s" % filename)
continue
student_name = m.groups()[0]
# Find a student with the found student_name
try:
student = self.klass.student_set.exclude(archived=True
).annotate(fullname=Concat('last_name', Value(' '), 'first_name')).get(fullname=student_name)
except Student.DoesNotExist:
messages.warning(
self.request,
"Impossible de trouver l'étudiant {} dans la classe {}".format(student_name, self.klass.name)
)
continue
with open(os.path.join(temp_dir, filename), 'rb') as pdf:
getattr(student, pdf_field).save(filename, File(pdf), save=True)
student.save()
pdf_count += 1
messages.success(
self.request,
'{0} bulletins PDF ont été importés pour la classe {1} (sur {2} élèves)'.format(
pdf_count, self.klass.name,
self.klass.student_set.exclude(archived=True).count()
)
)
class SendStudentReportsView(FormView):
template_name = 'email_report.html'
form_class = EmailBaseForm

362
stages/views/imports.py Normal file
View file

@ -0,0 +1,362 @@
import os
import re
import tempfile
from collections import OrderedDict
from datetime import datetime
from subprocess import PIPE, Popen, call
from tabimport import CSVImportedFile, FileFactory
from django.conf import settings
from django.contrib import messages
from django.core.files import File
from django.db import transaction
from django.db.models import Value
from django.db.models.functions import Concat
from django.http import HttpResponseRedirect
from django.shortcuts import get_object_or_404
from django.urls import reverse
from django.views.generic import FormView
from ..forms import StudentImportForm, UploadHPFileForm, UploadReportForm
from ..models import (
Corporation, CorpContact, Course, Klass, Option, Student, Teacher,
)
from ..utils import is_int
class ImportViewBase(FormView):
template_name = 'file_import.html'
def form_valid(self, form):
upfile = form.cleaned_data['upload']
is_csv = (
upfile.name.endswith('.csv') or
'csv' in upfile.content_type or
upfile.content_type == 'text/plain'
)
try:
if is_csv:
# Reopen the file in text mode
upfile = open(upfile.temporary_file_path(), mode='r', encoding='utf-8-sig')
imp_file = CSVImportedFile(File(upfile))
else:
imp_file = FileFactory(upfile)
with transaction.atomic():
stats = self.import_data(imp_file)
except Exception as e:
if settings.DEBUG:
raise
msg = "L'importation a échoué. Erreur: %s" % e
if hasattr(upfile, 'content_type'):
msg += " (content-type: %s)" % upfile.content_type
messages.error(self.request, msg)
else:
non_fatal_errors = stats.get('errors', [])
if 'created' in stats:
messages.info(self.request, "Objets créés : %d" % stats['created'])
if 'modified' in stats:
messages.info(self.request, "Objets modifiés : %d" % stats['modified'])
if non_fatal_errors:
messages.warning(self.request, "Erreurs rencontrées: %s" % "\n".join(non_fatal_errors))
return HttpResponseRedirect(reverse('admin:index'))
class StudentImportView(ImportViewBase):
title = "Importation étudiants"
form_class = StudentImportForm
def import_data(self, up_file):
""" Import Student data from uploaded file. """
student_mapping = settings.STUDENT_IMPORT_MAPPING
corporation_mapping = settings.CORPORATION_IMPORT_MAPPING
def strip(val):
return val.strip() if isinstance(val, str) else val
obj_created = obj_modified = 0
seen_students_ids = set()
for line in up_file:
student_defaults = {
val: strip(line[key]) for key, val in student_mapping.items()
}
if student_defaults['ext_id'] in seen_students_ids:
# Second line for student, ignore it
continue
seen_students_ids.add(student_defaults['ext_id'])
if student_defaults['birth_date'] == '':
student_defaults['birth_date'] = None
elif isinstance(student_defaults['birth_date'], str):
student_defaults['birth_date'] = datetime.strptime(student_defaults['birth_date'], '%d.%m.%Y').date()
if student_defaults['option_ase']:
try:
student_defaults['option_ase'] = Option.objects.get(name=student_defaults['option_ase'])
except Option.DoesNotExist:
del student_defaults['option_ase']
else:
del student_defaults['option_ase']
corporation_defaults = {
val: strip(line[key]) for key, val in corporation_mapping.items()
}
student_defaults['corporation'] = self.get_corporation(corporation_defaults)
defaults = Student.prepare_import(student_defaults)
try:
student = Student.objects.get(ext_id=student_defaults['ext_id'])
modified = False
for key, val in defaults.items():
if getattr(student, key) != val:
setattr(student, key, val)
modified = True
if modified:
student.save()
obj_modified += 1
except Student.DoesNotExist:
student = Student.objects.create(**defaults)
obj_created += 1
# FIXME: implement arch_staled
return {'created': obj_created, 'modified': obj_modified}
def get_corporation(self, corp_values):
if corp_values['ext_id'] == '':
return None
if 'city' in corp_values and is_int(corp_values['city'][:4]):
corp_values['pcode'], _, corp_values['city'] = corp_values['city'].partition(' ')
corp, created = Corporation.objects.get_or_create(
ext_id=corp_values['ext_id'],
defaults=corp_values
)
return corp
class HPImportView(ImportViewBase):
"""
Importation du fichier HyperPlanning pour l'établissement des feuilles
de charges.
"""
form_class = UploadHPFileForm
mapping = {
'NOMPERSO_ENS': 'teacher',
'LIBELLE_MAT': 'subject',
'NOMPERSO_DIP': 'public',
'TOTAL': 'period',
}
# Mapping between klass field and imputation
account_categories = OrderedDict([
('ASAFE', 'ASAFE'),
('ASEFE', 'ASEFE'),
('ASSCFE', 'ASSCFE'),
('#Mandat_ASA', 'ASAFE'),
('MPTS', 'MPTS'),
('MPS', 'MPS'),
('CMS ASE', 'MPTS'),
('CMS ASSC', 'MPS'),
('EDEpe', 'EDEpe'),
('EDEps', 'EDEps'),
('EDS', 'EDS'),
('CAS_FPP', 'CAS_FPP'),
# To split afterwards
('EDE', 'EDE'),
('#Mandat_ASE', 'ASE'),
('#Mandat_ASSC', 'ASSC'),
])
def import_data(self, up_file):
obj_created = obj_modified = 0
errors = []
# Pour accélérer la recherche
profs = {str(t): t for t in Teacher.objects.all()}
Course.objects.all().delete()
for line in up_file:
if (line['LIBELLE_MAT'] == '' or line['NOMPERSO_DIP'] == '' or line['TOTAL'] == ''):
continue
try:
teacher = profs[line['NOMPERSO_ENS']]
except KeyError:
errors.append(
"Impossible de trouver «%s» dans la liste des enseignant-e-s" % line['NOMPERSO_ENS']
)
continue
obj, created = Course.objects.get_or_create(
teacher=teacher,
subject=line['LIBELLE_MAT'],
public=line['NOMPERSO_DIP'],
)
period = int(float(line['TOTAL'].replace("'", "")))
if created:
obj.period = period
obj_created += 1
for k, v in self.account_categories.items():
if k in obj.public:
obj.imputation = v
break
else:
obj.period += period
obj_modified += 1
obj.save()
if not obj.imputation:
errors.append("Le cours {0} n'a pas pu être imputé correctement!". format(str(obj)))
return {'created': obj_created, 'modified': obj_modified, 'errors': errors}
class HPContactsImportView(ImportViewBase):
"""
Importation du fichier Hyperplanning contenant les formateurs d'étudiants.
"""
form_class = UploadHPFileForm
def import_data(self, up_file):
obj_modified = 0
errors = []
for idx, line in enumerate(up_file, start=2):
try:
student = Student.objects.get(ext_id=int(line['UID_ETU']))
except Student.DoesNotExist:
errors.append(
"Impossible de trouver l'étudiant avec le numéro %s" % int(line['UID_ETU'])
)
continue
if not line['NoSIRET']:
errors.append(
"NoSIRET est vide à ligne %d. Ligne ignorée" % idx
)
continue
try:
corp = Corporation.objects.get(ext_id=int(line['NoSIRET']))
except Corporation.DoesNotExist:
errors.append(
"Impossible de trouver l'institution avec le numéro %s" % int(line['NoSIRET'])
)
continue
# Check corporation matches
if student.corporation_id != corp.pk:
# This import has priority over the corporation set by StudentImportView
student.corporation = corp
student.save()
contact = corp.corpcontact_set.filter(
first_name__iexact=line['PRENOMMDS'].strip(),
last_name__iexact=line['NOMMDS'].strip()
).first()
if contact is None:
contact = CorpContact.objects.create(
corporation=corp, first_name=line['PRENOMMDS'].strip(),
last_name=line['NOMMDS'].strip(), civility=line['CIVMDS'], email=line['EMAILMDS']
)
else:
if line['CIVMDS'] and contact.civility != line['CIVMDS']:
contact.civility = line['CIVMDS']
contact.save()
if line['EMAILMDS'] and contact.email != line['EMAILMDS']:
contact.email = line['EMAILMDS']
contact.save()
if student.instructor != contact:
student.instructor = contact
student.save()
obj_modified += 1
return {'modified': obj_modified, 'errors': errors}
class ImportReportsView(FormView):
template_name = 'file_import.html'
form_class = UploadReportForm
def dispatch(self, request, *args, **kwargs):
self.klass = get_object_or_404(Klass, pk=kwargs['pk'])
self.title = "Importation d'un fichier PDF de moyennes pour la classe {}".format(self.klass.name)
return super().dispatch(request, *args, **kwargs)
def form_valid(self, form):
upfile = form.cleaned_data['upload']
klass_name = upfile.name[:-4]
redirect_url = reverse('class', args=[self.klass.pk])
if self.klass.name != klass_name:
messages.error(
self.request,
"Le fichier téléchargé ne correspond pas à la classe {} !".format(self.klass.name)
)
return HttpResponseRedirect(redirect_url)
# Check poppler-utils presence on server
res = call(['pdftotext', '-v'], stderr=PIPE)
if res != 0:
messages.error(
self.request,
"Unable to find pdftotext on your system. Try to install the poppler-utils package."
)
return HttpResponseRedirect(redirect_url)
# Move the file to MEDIA directory
pdf_origin = os.path.join(settings.MEDIA_ROOT, upfile.name)
with open(pdf_origin, 'wb+') as destination:
for chunk in upfile.chunks():
destination.write(chunk)
try:
self.import_reports(pdf_origin, form.cleaned_data['semester'])
except Exception as err:
raise
if settings.DEBUG:
raise
else:
messages.error(self.request, "Erreur durant l'importation des bulletins PDF: %s" % err)
return HttpResponseRedirect(redirect_url)
def import_reports(self, pdf_path, semester):
path = os.path.abspath(pdf_path)
student_regex = '[E|É]lève\s*:\s*([^\n]*)'
# Directory automatically deleted when the variable is deleted
_temp_dir = tempfile.TemporaryDirectory()
temp_dir = _temp_dir.name
os.system("pdfseparate %s %s/%s_%%d.pdf" % (path, temp_dir, os.path.basename(path)[:-4]))
# Look for student names in each separated PDF and rename PDF with student name
pdf_count = 0
pdf_field = 'report_sem' + semester
for filename in os.listdir(temp_dir):
p = Popen(['pdftotext', os.path.join(temp_dir, filename), '-'],
shell=False, stdout=PIPE, stderr=PIPE)
output, errs = p.communicate()
m = re.search(student_regex, output.decode('utf-8'))
if not m:
print("Unable to find student name in %s" % filename)
continue
student_name = m.groups()[0]
# Find a student with the found student_name
try:
student = self.klass.student_set.exclude(archived=True
).annotate(fullname=Concat('last_name', Value(' '), 'first_name')).get(fullname=student_name)
except Student.DoesNotExist:
messages.warning(
self.request,
"Impossible de trouver l'étudiant {} dans la classe {}".format(student_name, self.klass.name)
)
continue
with open(os.path.join(temp_dir, filename), 'rb') as pdf:
getattr(student, pdf_field).save(filename, File(pdf), save=True)
student.save()
pdf_count += 1
messages.success(
self.request,
'{0} bulletins PDF ont été importés pour la classe {1} (sur {2} élèves)'.format(
pdf_count, self.klass.name,
self.klass.student_set.exclude(archived=True).count()
)
)