
Spark Streaming either runs the same command twice or the mailer sends the same mail twice


My Spark application sends two mails when I send just one string to my Kafka topic. Here is the relevant part of the code:

    JavaDStream<String> lines = kafkaStream.map(/* returns the 2nd value of the tuple */);
    lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    // [... some stuff ...]

    JavaRDD<String[]> flagAddedRDD = associatedToPersonRDD.map(new Function<String[],String[]>(){

                @Override
                public String[] call(String[] arg0) throws Exception {
                    String[] s = new String[arg0.length+1];
                    System.arraycopy(arg0, 0, s, 0, arg0.length);
                    int a = FilePrinter.getAge(arg0[CSVExampleDevice.LENGTH+People.BIRTH_DATE]);
                    int p = Integer.parseInt(arg0[CSVExampleDevice.PULSE]);
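                    // Flag the record and send an alert when the pulse is out of range;
                    // the thresholds are stricter for ages <= 12 or >= 70.
                    if (((p <= 45 || p >= 185) && (a <= 12 || a >= 70))
                            || (p >= 190 || p <= 40)) {
                        s[arg0.length] = "1";
                        Mailer.sendMail(mailTo, arg0);
                    } else {
                        s[arg0.length] = "0";
                    }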
                    return s;
                }

            });

I cannot tell whether the map runs twice or the mailer itself sends two emails: the step after this transformation saves the result to a file, and that file contains only one line. Here is Mailer.sendMail:

    public static void sendMail(String whoTo, String[] whoIsDying){
        Properties props = new Properties();
        props.put("mail.smtp.host", "mail.***.com"); //edited
        props.put("mail.smtp.port", "25");
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Session session = Session.getInstance(props);

        try {

            Message message = new MimeMessage(session);
            message.setFrom(new InternetAddress("***my-email***")); //edited
            for (String string : whoTo.split(","))
                message.addRecipient(Message.RecipientType.TO,
                        new InternetAddress(string));
            message.setSubject(whoIsDying[PersonClass.TIMESTAMP]);
            message.setText("trial");
            System.out.println("INFO: sent mail");
            Transport.send(message);
        } catch (MessagingException e) {
            throw new RuntimeException(e);
        }
    }

Solution

  • This happens because I called two actions at the end of the chain of transformations:

    FilePrinter.saveAssociatedAsCSV(associatedSavePath, unifiedAssociatedStringRDD.collect()); // first action

    JavaRDD<String[]> enrichedWithWeatherRDD = flagAddedRDD.map(new Function<String[],String[]>(){ [some more stuff] });

    JavaRDD<String> unifiedEnrichedStringRDD = enrichedWithWeatherRDD.map(unifyArrayIntoString);

    FilePrinter.saveEnrichedAsCSV(enrichedSavePath, unifiedEnrichedStringRDD.collect()); // second action
    

    Each action re-evaluates the whole chain of transformations, and the map that calls the mailer sits upstream of both actions, so it runs once per action and the mail is sent twice.
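
To make the recomputation visible without a cluster, here is a minimal plain-Java sketch of the same effect. It is an analogy, not Spark code: a Supplier stands in for the lazy RDD lineage, a counter stands in for Mailer.sendMail, and the names LazyRecomputeDemo, flagAdded, cached and mailsSent are hypothetical. In the real job, the counterpart of cached(...) would be calling flagAddedRDD.cache() before the first action.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class LazyRecomputeDemo {

    // Counts how often the side effect (standing in for Mailer.sendMail) fires.
    static final AtomicInteger mailsSent = new AtomicInteger();

    // A lazy "lineage": nothing runs until get() is called,
    // and every get() reruns the map, including its side effect.
    static Supplier<List<String>> flagAdded(List<String> input) {
        return () -> input.stream()
                .map(line -> {
                    mailsSent.incrementAndGet(); // side effect inside a transformation
                    return line + ",1";
                })
                .collect(Collectors.toList());
    }

    // Remembers the first result so later calls reuse it (the role cache() plays).
    static <T> Supplier<T> cached(Supplier<T> source) {
        return new Supplier<T>() {
            private T value;
            private boolean computed;

            @Override
            public synchronized T get() {
                if (!computed) {
                    value = source.get();
                    computed = true;
                }
                return value;
            }
        };
    }

    // Two "actions" on the uncached lineage: the map runs twice.
    static int mailsWithoutCache() {
        mailsSent.set(0);
        Supplier<List<String>> rdd = flagAdded(Collections.singletonList("patient-1"));
        rdd.get(); // first action
        rdd.get(); // second action
        return mailsSent.get();
    }

    // Two "actions" on the cached lineage: the map runs once.
    static int mailsWithCache() {
        mailsSent.set(0);
        Supplier<List<String>> rdd = cached(flagAdded(Collections.singletonList("patient-1")));
        rdd.get(); // first action computes and stores the result
        rdd.get(); // second action reuses it
        return mailsSent.get();
    }

    public static void main(String[] args) {
        System.out.println("without cache: " + mailsWithoutCache()); // prints "without cache: 2"
        System.out.println("with cache: " + mailsWithCache());       // prints "with cache: 1"
    }
}
```

Note that in Spark, caching only reduces recomputation; it does not guarantee that a map runs exactly once, because evicted partitions and retried tasks are computed again. For a side effect such as sending mail, it is usually safer to take it out of the map and trigger it from a single action, for example by collecting the flagged rows and mailing from the driver.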