Search code examples
javaapache-sparkapache-spark-dataset

A Spark Context has a content when submitting a Dataset to Spark, but Spark finds it null at the moment it is really building it


I have a method that performs multiple validation checking on a Dataset Row.

public boolean validationEtablissement(HistoriqueExecution historique, Row etablissement, boolean actifsSeulement, boolean nomenclaturesNAF2Valides, Map<String, Integer> indexs) {
   String siret = getString(indexs, etablissement, "siret");
   String siren = getString(indexs, etablissement, "siren");
   Boolean diffusable = getBoolean(indexs, etablissement, "diffusable");
   String nomenclatureAPE = getString(indexs, etablissement, "nomenclatureActivitePrincipale");
   String code = getString(indexs, etablissement, "activitePrincipale");
   String typeVoie = getString(indexs, etablissement, "typeDeVoie");
   String typeVoieSecondaire = getString(indexs, etablissement, "typeDeVoieSecondaire");
   String dateCreation = getString(indexs, etablissement, "dateCreationEtablissement");
   String dateDernierTraitement = getString(indexs, etablissement, "dateDernierTraitement");
   String dateDebutHistorisation = getString(indexs, etablissement, "dateDebutHistorisation");

   List<BooleanSupplier> validationsDemandees = new ArrayList<>();

   // Vérifier que le SIRET de l'établissement est correct.
   validationsDemandees.add(() -> valider(historique, etablissement, e -> getString(indexs, etablissement, "siret") == null,
      "etablissement sans SIRET, peut être étranger : '{}', écarté.", () -> new Object[]{getString(indexs, etablissement, "nomPaysEtranger")}));

   validationsDemandees.add(() -> valider(historique, etablissement, e -> new SIRET(siret).valide() == false,
      "etablissement au SIRET '{}' invalide, écarté.", () -> new Object[]{siret}));

   // Vérifier que le SIREN de l'entreprise mentionnée par l'établissement est correct.
   validationsDemandees.add(() -> valider(historique, etablissement, e -> siren == null,
      "etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET vaut : {}), écarté.", () -> new Object[]{getString(indexs, etablissement, "nomPaysEtranger"), siret}));

   validationsDemandees.add(() -> valider(historique, etablissement, e -> new SIREN(siren).valide() == false,
      "etablissement au SIREN d'entreprise '{}' invalide, écarté.", () -> new Object[]{siren}));

   if (nomenclaturesNAF2Valides) {
      // La nomenclature du code APE doit être NAFRev2.
      validationsDemandees.add(() -> valider(historique, etablissement, e -> nomenclatureAPE == null,
         "établissement de SIRET {} écarté : il n'a pas de nomemclature APE.", () -> new Object[]{siret}));

      validationsDemandees.add(() -> valider(historique, etablissement, e -> "NAFRev2".equals(nomenclatureAPE) == false,
         "établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue.", () -> new Object[]{siret, nomenclatureAPE}));

      // Le code APE doit être valide.
      validationsDemandees.add(() -> valider(historique, etablissement, e -> code == null,
         "établissement de SIRET {} écarté : il n'a pas de code APE.", () -> new Object[]{siret}));

      validationsDemandees.add(() -> valider(historique, etablissement, e -> new CodeAPE(code).valide(false) == false,
         "établissement de SIRET {} écarté : son code APE {} est invalide.", () -> new Object[]{siret, code}));
   }

   // Si le type de voie des adresses sont alimentés, vérifier qu'ils sont valides.
   validationsDemandees.add(() -> valider(historique, etablissement, e -> typeDeVoieInvalide(typeVoie, diffusable),
      "établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, est invalide", () -> new Object[]{siret, typeVoie}));

   validationsDemandees.add(() -> valider(historique, etablissement, e -> typeDeVoieInvalide(typeVoieSecondaire, diffusable),
      "établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, est invalide", () -> new Object[]{siret, typeVoieSecondaire}));

   validationsDemandees.add(() -> valider(historique, etablissement, e -> dateInvalide(dateCreation) != null,
      "établissement de SIRET {} écarté : sa date de création, {}, est invalide", () -> new Object[]{siret, dateCreation}));

   // Contrôler le format des dates
   validationsDemandees.add(() -> valider(historique, etablissement, e -> dateTimeInvalide(dateDernierTraitement) != null && dateInvalide(dateDernierTraitement) != null,
      "établissement de SIRET {} écarté : sa date de dernier traitement, {}, est invalide : {}", () -> new Object[]{siret, dateDernierTraitement, dateInvalide(dateDernierTraitement)}));

   validationsDemandees.add(() -> valider(historique, etablissement, e -> dateInvalide(dateDebutHistorisation) != null,
      "établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide", () -> new Object[]{siret, dateDebutHistorisation}));

   return validationsDemandees.stream().allMatch(BooleanSupplier::getAsBoolean);
}

For each checking that fails, I send a warning.
But I also wish to increase, in a longAccumulator, the number of problems of that kind I've encountered during the validation.

private boolean valider(HistoriqueExecution historique, Row row, Predicate<Row> conditionEchec, String warningFormat, Supplier<Object[]> arguments) {
   if (conditionEchec.test(row)) {
      LOGGER.warn(warningFormat, arguments.get());

      if (historique != null) {
         historique.incrementerOccurrences(warningFormat, true);
      }

      return false;
   }

   return true;
}

For that, I'm using that HistoriqueExecution class:

public class HistoriqueExecution implements Serializable {
   /** Accumulateurs d'erreurs associés. */
   private final Map<String, LongAccumulator> accumulators = new HashMap<>();

   /** Spark session */
   private SparkSession session;

   /**
    * Construire un historique d'exécution.
    */
   public HistoriqueExecution() {
   }

   /**
    * Construire un historique d'exécution.
    * @param session Session Spark.
    */
   public HistoriqueExecution(SparkSession session) {
      this.session = session;
   }

   /**
    * Construire un historique d'exécution.
    * @param session Session Spark.
    * @param codesMessages Code des messages qui pourront être émis.
    */
   public HistoriqueExecution(SparkSession session, String... codesMessages) {
      this(session);

      for(String codeMessage : codesMessages) {
         accumulators.put(codeMessage, session.sparkContext().longAccumulator(codeMessage));
      }
   }

   /**
    * Incrémenter l'occurence d'un code ou format de message.
    * @param session Session Spark.
    * @param codeOuFormatMessage Code ou Format de message
    * @param creerSiAbsent true s'il faut le créer s'il est absent, dans l'accumulateur.
    */
   public void incrementerOccurrences(SparkSession session, String codeOuFormatMessage, boolean creerSiAbsent) {
      LongAccumulator accumulator = accumulators.get(codeOuFormatMessage);

      if (accumulator == null && creerSiAbsent) {
         accumulator = accumulators.put(codeOuFormatMessage, session.sparkContext().longAccumulator(codeOuFormatMessage));
      }

      if (accumulator != null) {
         accumulator.add(1);
      }
   }

   /**
    * Incrémenter l'occurence d'un code ou format de message.
    * @param codeOuFormatMessage Code ou Format de message
    * @param creerSiAbsent true s'il faut le créer s'il est absent, dans l'accumulateur.
    */
   public void incrementerOccurrences(String codeOuFormatMessage, boolean creerSiAbsent) {
      LongAccumulator accumulator = accumulators.get(codeOuFormatMessage);

      if (accumulator == null && creerSiAbsent && this.session != null) {
         accumulator = accumulators.put(codeOuFormatMessage, this.session.sparkContext().longAccumulator(codeOuFormatMessage));
      }

      if (accumulator != null) {
         accumulator.add(1);
      }
   }

   /**
    * Fixer les codes messages qui pourront être émis.
    * @param session Session Spark.
    * @param codesMessages Code des messages qui pourront être émis.
    */
   public void setCodesMessages(SparkSession session, String... codesMessages) {
      setSparkSession(session);

      for(String codeMessage : codesMessages) {
         accumulators.put(codeMessage, session.sparkContext().longAccumulator(codeMessage));
      }
   }

   /**
    * Dumper en log les notifications accumulées.
    * @param log Logger.
    */
   public void dumpNotifications(Logger log) {
      if (this.accumulators.isEmpty()) {
         log.info("Il n'y a aucune notification dans l'historique d'exécution");
      }

      for(Map.Entry<String, LongAccumulator> rapport : getNotifications().entrySet()) {
         log.info("{} : {}", rapport.getKey(), rapport.getValue().count());
      }
   }
}

But I have a problem.

If I initialize the HistoriqueExecution with a set of acceptable messages codes it will count, at Dataset submission to Spark,

I mean: for Spark to receive that dataset and consider it like a bunch of lazy things to do later, but it doesn't try yet to build it, it works.

and then when it tries to build it, it passes through the statement:

LongAccumulator accumulator = accumulators.get(codeOuFormatMessage);

finds always each message code, and increase its counter.

public Dataset<Row> rowEtablissements(OptionsCreationLecture optionsCreationLecture, HistoriqueExecution historiqueExecution, int anneeCOG, int anneeSIRENE, boolean actifsSeulement, boolean communesValides, boolean nomenclaturesNAF2Valides) {
   if (historiqueExecution != null) {
      historiqueExecution.setCodesMessages(this.session,
         "etablissement sans SIRET, peut être étranger : '{}'",
         "etablissement au SIRET '{}' invalide, écarté.",
         "etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET vaut : {}), écarté.",
         "etablissement au SIREN d'entreprise '{}' invalide, écarté.",
         "établissement de SIRET {} écarté : il n'a pas de nomemclature APE.",
         "établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue.",
         "établissement de SIRET {} écarté : il n'a pas de code APE.",
         "établissement de SIRET {} écarté : son code APE {} est invalide.",
         "établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, est invalide",
         "établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, est invalide",
         "établissement de SIRET {} écarté : sa date de création, {}, est invalide",
         "établissement de SIRET {} écarté : sa date de dernier traitement, {}, est invalide : {}",
         "établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide");
   }

   Supplier<Dataset<Row>> worker = () -> {
      // [...Creating the dataset...]
      etablissements = etablissements.filter(
         (FilterFunction<Row>) etablissement -> this.validator.validationEtablissement(historiqueExecution, etablissement, actifsSeulement, nomenclaturesNAF2Valides, indexs));

      return etablissements;
   };

   return constitutionStandard(options, () -> worker.get()
         .withColumn("partitionSiren", SIREN_ENTREPRISE.col().substr(1,2)),
      new CacheParqueteur<>(options, this.session,
         "etablissements", "annee_{0,number,#0}-actifs_{1}-communes_verifiees_{2}-nafs_verifies_{3}", DEPARTEMENT_SIREN_SIRET,
         anneeSIRENE, anneeCOG, actifsSeulement, communesValides));
}

and logs (if I use dumpNotifications):

etablissement au SIRET '{}' invalide, écarté. : 0
établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue. : 0
etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET vaut : {}), écarté. : 0
etablissement sans SIRET, peut être étranger : '{}' : 0
établissement de SIRET {} écarté : son code APE {} est invalide. : 0
établissement de SIRET {} écarté : il n'a pas de code APE. : 0
établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide : 0
établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, est invalide : 1
établissement de SIRET {} écarté : sa date de dernier traitement, {}, est invalide : {} : 0
etablissement au SIREN d'entreprise '{}' invalide, écarté. : 0
établissement de SIRET {} écarté : il n'a pas de nomemclature APE. : 0
établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, est invalide : 0
établissement de SIRET {} écarté : sa date de création, {}, est invalide : 0

But if I don't initialize my HistoriqueExecution with messages codes,

historiqueExecution.setCodesMessages(this.session);

Spark must during the real creation of that Dataset, ask the creation of each missing counter, when needed, by the statement:

accumulator = accumulators.put(codeOuFormatMessage, this.session.sparkContext().longAccumulator(codeOuFormatMessage));

But then it fails on a NullPointerException.

Cannot invoke "org.apache.spark.SparkContext.longAccumulator(String)" because the return value of "org.apache.spark.sql.SparkSession.sparkContext()" is null

org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 57.0 failed 1 times, most recent failure: Lost task 12.0 in stage 57.0 (TID 2913) (192.168.1.153 executor driver): java.lang.NullPointerException: Cannot invoke "org.apache.spark.SparkContext.longAccumulator(String)" because the return value of "org.apache.spark.sql.SparkSession.sparkContext()" is null
    at fr.ecoemploi.adapters.outbound.spark.dataset.core.HistoriqueExecution.incrementerOccurrences(HistoriqueExecution.java:169)
    at fr.ecoemploi.adapters.outbound.spark.dataset.entreprise.EtablissementRowValidator.valider(EtablissementRowValidator.java:137)
    at fr.ecoemploi.adapters.outbound.spark.dataset.entreprise.EtablissementRowValidator.lambda$validationEtablissement$26(EtablissementRowValidator.java:91)
    at java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
    at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602)
    at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
    at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
    at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637)
    at fr.ecoemploi.adapters.outbound.spark.dataset.entreprise.EtablissementRowValidator.validationEtablissement(EtablissementRowValidator.java:107)
    at fr.ecoemploi.adapters.outbound.spark.dataset.entreprise.EtablissementDataset.lambda$rowEtablissements$420108c5$1(EtablissementDataset.java:160)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.sort_addToSorter_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:385)
    at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

SparkSession isn't null, hasn't changed since the beginning, but deliver a null context()?

Fearing an hidden serialization issue, I've made a try, removing the private SparkSession session; member variable of HistoriqueExecution (in case it wouldn't be serializable)
and passing that SparkSession as a parameter all along the methods calls.
But the problem is still here.


Solution

  • SparkContext/SparkSession do not exist on executors, your code assumes there is one instance on the driver accessible from the executor nodes and this isn't the case.

    (side note that validating each row like this is going to have performance impacts, you want to push as much as possible into the spark sql/dsl layer - Quality does this)

    In order to count failures you have to perform two passes (although you may cache the resulting dataset - I'd recommend writing it out if the results are valuable to you), one to create a pass/fail column and the other to sum the results.

    You can also create plugins to send messages back to the driver for each row validation but it's a lot easier to perform two passes.