Tags: apache-spark, kubernetes, shuffle, apache-zeppelin, executor

Apache Zeppelin and Spark on Kubernetes: no communication between executors possible during shuffle


I've deployed Apache Zeppelin 0.10.1 on Kubernetes. It uses Spark version 3.2.1.

My problem is that the executors can't communicate with each other while shuffling, but they can still exchange data with the driver. So the problem is probably a network problem, a permissions problem, or a data-serialization problem.

I suspect a data-serialization problem because the driver's log file reports a problem serializing data a few seconds before the error appears in the executors' log files.

Spark is running in client mode and every executor runs in its own pod.
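
For context, below is a rough sketch of the network-related Spark settings such a setup involves. The driver host, ports, and namespace are taken from the logs that follow; the master URL is an assumption, and in Zeppelin these values normally come from the Spark interpreter settings rather than from code:

    import org.apache.spark.sql.SparkSession

    // Sketch only -- in Zeppelin the session is created by the interpreter,
    // and these values come from the interpreter settings.
    val spark = SparkSession.builder()
      .master("k8s://https://kubernetes.default.svc")           // assumed API server URL
      .config("spark.submit.deployMode", "client")
      .config("spark.kubernetes.namespace", "zeppelin")         // namespace from the logs
      .config("spark.driver.host", "spark-gxaleg.zeppelin.svc") // driver service from the logs
      .config("spark.driver.port", "22321")                     // driver RPC port from the logs
      .config("spark.blockManager.port", "22322")               // block-manager port from the logs
      .getOrCreate()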

Below is the Spark driver log file:

    Interpreter download command: /usr/lib/jvm/java-8-openjdk-amd64/bin/java 
-Dfile.encoding=UTF-8 -Dlog4j.configuration=file:///opt/zeppelin/conf/log4j.properties 
-Dlog4j.configurationFile=file:///opt/zeppelin/conf/log4j2.properties 
-Dzeppelin.log.file=/opt/zeppelin/logs/zeppelin-interpreter-spark-shared_process--spark-
gxaleg.log -cp :/opt/zeppelin/interpreter/spark/*:::/opt/zeppelin/interpreter/zeppelin-
interpreter-shaded-0.10.1.jar:/opt/zeppelin/interpreter/spark/spark-interpreter-0.10.1.jar 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterDownloader zeppelin-
server.zeppelin.svc 12320 spark /tmp/local-repo/spark
    ... (lines deleted because of Stack Overflow length restrictions)
    ERROR [2022-06-10 10:01:56,657] ({ParallelScheduler-Worker-1} Logging.scala[logError]:94) - failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 163, Column 76: Cannot determine simple type name "$line34105858122"
    org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 163, Column 76: Cannot determine simple type name "$line34105858122"
        at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12021)
        ... (some lines deleted)
        at java.lang.Thread.run(Thread.java:748)
     INFO [2022-06-10 10:01:56,685] ({ParallelScheduler-Worker-1} Logging.scala[logInfo]:57) - 
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */   return new GeneratedIteratorForCodegenStage1(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ // codegenStageId=1
    /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 007 */   private Object[] references;
    /* 008 */   private scala.collection.Iterator[] inputs;
    /* 009 */   private boolean agg_initAgg_0;
    /* 010 */   private boolean agg_bufIsNull_0;
    /* 011 */   private long agg_bufValue_0;
    /* 012 */   private agg_FastHashMap_0 agg_fastHashMap_0;
    /* 013 */   private org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> agg_fastHashMapIter_0;
    /* 014 */   private org.apache.spark.unsafe.KVIterator agg_mapIter_0;
    /* 015 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap_0;
    /* 016 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter_0;
    /* 017 */   private scala.collection.Iterator inputadapter_input_0;
    /* 018 */   private boolean serializefromobject_resultIsNull_0;
    /* 019 */   private boolean serializefromobject_resultIsNull_1;
    /* 020 */   private boolean serializefromobject_resultIsNull_2;
    /* 021 */   private java.lang.String[] serializefromobject_mutableStateArray_0 = new java.lang.String[3];
    /* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] serializefromobject_mutableStateArray_1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[6];
    /* 023 */
    /* 024 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
    /* 025 */     this.references = references;
    /* 026 */   }
    /* 027 */
    /* 028 */   public void init(int index, scala.collection.Iterator[] inputs) {
    /* 029 */     partitionIndex = index;
    /* 030 */     this.inputs = inputs;
    /* 031 */
    /* 032 */     inputadapter_input_0 = inputs[0];
    /* 033 */     serializefromobject_mutableStateArray_1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(5, 96);
    /* 034 */     serializefromobject_mutableStateArray_1[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(5, 96);
    /* 035 */     serializefromobject_mutableStateArray_1[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 036 */     serializefromobject_mutableStateArray_1[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 037 */     serializefromobject_mutableStateArray_1[4] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 038 */     serializefromobject_mutableStateArray_1[5] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
    /* 039 */
    /* 040 */   }
    /* 041 */
    /* 042 */   public class agg_FastHashMap_0 {
    /* 043 */     private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
    /* 044 */     private int[] buckets;
    /* 045 */     private int capacity = 1 << 16;
    /* 046 */     private double loadFactor = 0.5;
    /* 047 */     private int numBuckets = (int) (capacity / loadFactor);
    /* 048 */     private int maxSteps = 2;
    /* 049 */     private int numRows = 0;
    /* 050 */     private Object emptyVBase;
    /* 051 */     private long emptyVOff;
    /* 052 */     private int emptyVLen;
    /* 053 */     private boolean isBatchFull = false;
    /* 054 */     private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
    /* 055 */
    /* 056 */     public agg_FastHashMap_0(
    /* 057 */       org.apache.spark.memory.TaskMemoryManager taskMemoryManager,
    /* 058 */       InternalRow emptyAggregationBuffer) {
    /* 059 */       batch = org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch
    /* 060 */       .allocate(((org.apache.spark.sql.types.StructType) references[1] /* keySchemaTerm */), ((org.apache.spark.sql.types.StructType) references[2] /* valueSchemaTerm */), taskMemoryManager, capacity);
    /* 061 */
    /* 062 */       final UnsafeProjection valueProjection = UnsafeProjection.create(((org.apache.spark.sql.types.StructType) references[2] /* valueSchemaTerm */));
    /* 063 */       final byte[] emptyBuffer = valueProjection.apply(emptyAggregationBuffer).getBytes();
    /* 064 */
    /* 065 */       emptyVBase = emptyBuffer;
    /* 066 */       emptyVOff = Platform.BYTE_ARRAY_OFFSET;
    /* 067 */       emptyVLen = emptyBuffer.length;
    /* 068 */
    /* 069 */       agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
    /* 070 */         1, 0);
    /* 071 */
    /* 072 */       buckets = new int[numBuckets];
    /* 073 */       java.util.Arrays.fill(buckets, -1);
    /* 074 */     }
    /* 075 */
    /* 076 */     public org.apache.spark.sql.catalyst.expressions.UnsafeRow findOrInsert(int agg_key_0) {
    /* 077 */       long h = hash(agg_key_0);
    /* 078 */       int step = 0;
    /* 079 */       int idx = (int) h & (numBuckets - 1);
    /* 080 */       while (step < maxSteps) {
    /* 081 */         // Return bucket index if it's either an empty slot or already contains the key
    /* 082 */         if (buckets[idx] == -1) {
    /* 083 */           if (numRows < capacity && !isBatchFull) {
    /* 084 */             agg_rowWriter.reset();
    /* 085 */             agg_rowWriter.zeroOutNullBytes();
    /* 086 */             agg_rowWriter.write(0, agg_key_0);
    /* 087 */             org.apache.spark.sql.catalyst.expressions.UnsafeRow agg_result
    /* 088 */             = agg_rowWriter.getRow();
    /* 089 */             Object kbase = agg_result.getBaseObject();
    /* 090 */             long koff = agg_result.getBaseOffset();
    /* 091 */             int klen = agg_result.getSizeInBytes();
    /* 092 */
    /* 093 */             UnsafeRow vRow
    /* 094 */             = batch.appendRow(kbase, koff, klen, emptyVBase, emptyVOff, emptyVLen);
    /* 095 */             if (vRow == null) {
    /* 096 */               isBatchFull = true;
    /* 097 */             } else {
    /* 098 */               buckets[idx] = numRows++;
    /* 099 */             }
    /* 100 */             return vRow;
    /* 101 */           } else {
    /* 102 */             // No more space
    /* 103 */             return null;
    /* 104 */           }
    /* 105 */         } else if (equals(idx, agg_key_0)) {
    /* 106 */           return batch.getValueRow(buckets[idx]);
    /* 107 */         }
    /* 108 */         idx = (idx + 1) & (numBuckets - 1);
    /* 109 */         step++;
    /* 110 */       }
    /* 111 */       // Didn't find it
    /* 112 */       return null;
    /* 113 */     }
    /* 114 */
    /* 115 */     private boolean equals(int idx, int agg_key_0) {
    /* 116 */       UnsafeRow row = batch.getKeyRow(buckets[idx]);
    /* 117 */       return (row.getInt(0) == agg_key_0);
    /* 118 */     }
    /* 119 */
    /* 120 */     private long hash(int agg_key_0) {
    /* 121 */       long agg_hash_0 = 0;
    /* 122 */
    /* 123 */       int agg_result_0 = agg_key_0;
    /* 124 */       agg_hash_0 = (agg_hash_0 ^ (0x9e3779b9)) + agg_result_0 + (agg_hash_0 << 6) + (agg_hash_0 >>> 2);
    /* 125 */
    /* 126 */       return agg_hash_0;
    /* 127 */     }
    /* 128 */
    /* 129 */     public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
    /* 130 */       return batch.rowIterator();
    /* 131 */     }
    /* 132 */
    /* 133 */     public void close() {
    /* 134 */       batch.close();
    /* 135 */     }
    /* 136 */
    /* 137 */   }
    /* 138 */
    /* 139 */   private void agg_doAggregateWithKeysOutput_0(UnsafeRow agg_keyTerm_0, UnsafeRow agg_bufferTerm_0)
    /* 140 */   throws java.io.IOException {
    /* 141 */     ((org.apache.spark.sql.execution.metric.SQLMetric) references[13] /* numOutputRows */).add(1);
    /* 142 */
    /* 143 */     boolean agg_isNull_7 = agg_keyTerm_0.isNullAt(0);
    /* 144 */     int agg_value_8 = agg_isNull_7 ?
    /* 145 */     -1 : (agg_keyTerm_0.getInt(0));
    /* 146 */     long agg_value_9 = agg_bufferTerm_0.getLong(0);
    /* 147 */
    /* 148 */     serializefromobject_mutableStateArray_1[5].reset();
    /* 149 */
    /* 150 */     serializefromobject_mutableStateArray_1[5].zeroOutNullBytes();
    /* 151 */
    /* 152 */     if (agg_isNull_7) {
    /* 153 */       serializefromobject_mutableStateArray_1[5].setNullAt(0);
    /* 154 */     } else {
    /* 155 */       serializefromobject_mutableStateArray_1[5].write(0, agg_value_8);
    /* 156 */     }
    /* 157 */
    /* 158 */     serializefromobject_mutableStateArray_1[5].write(1, agg_value_9);
    /* 159 */     append((serializefromobject_mutableStateArray_1[5].getRow()));
    /* 160 */
    /* 161 */   }
    /* 162 */
    /* 163 */   private void serializefromobject_doConsume_0(InternalRow inputadapter_row_0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank serializefromobject_expr_0_0, boolean serializefromobject_exprIsNull_0_0) throws java.io.IOException {
    /* 164 */     do {
    /* 165 */       if (serializefromobject_exprIsNull_0_0) {
    /* 166 */         throw new NullPointerException(((java.lang.String) references[7] /* errMsg */));
    /* 167 */       }
    /* 168 */       boolean serializefromobject_isNull_1 = true;
    /* 169 */       java.lang.Integer serializefromobject_value_1 = null;
    /* 170 */       serializefromobject_isNull_1 = false;
    /* 171 */       if (!serializefromobject_isNull_1) {
    /* 172 */         Object serializefromobject_funcResult_0 = null;
    /* 173 */         serializefromobject_funcResult_0 = serializefromobject_expr_0_0.age();
    /* 174 */
    /* 175 */         if (serializefromobject_funcResult_0 != null) {
    /* 176 */           serializefromobject_value_1 = (java.lang.Integer) serializefromobject_funcResult_0;
    /* 177 */         } else {
    /* 178 */           serializefromobject_isNull_1 = true;
    /* 179 */         }
    /* 180 */
    /* 181 */       }
    /* 182 */       boolean serializefromobject_isNull_0 = true;
    /* 183 */       int serializefromobject_value_0 = -1;
    /* 184 */       if (!serializefromobject_isNull_1) {
    /* 185 */         serializefromobject_isNull_0 = false;
    /* 186 */         if (!serializefromobject_isNull_0) {
    /* 187 */           serializefromobject_value_0 = serializefromobject_value_1.intValue();
    /* 188 */         }
    /* 189 */       }
    /* 190 */
    /* 191 */       boolean filter_value_2 = !serializefromobject_isNull_0;
    /* 192 */       if (!filter_value_2) continue;
    /* 193 */
    /* 194 */       boolean filter_value_3 = false;
    /* 195 */       filter_value_3 = serializefromobject_value_0 < 30;
    /* 196 */       if (!filter_value_3) continue;
    /* 197 */
    /* 198 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[12] /* numOutputRows */).add(1);
    /* 199 */
    /* 200 */       // common sub-expressions
    /* 201 */
    /* 202 */       agg_doConsume_0(serializefromobject_value_0, false);
    /* 203 */
    /* 204 */     } while(false);
    /* 205 */
    /* 206 */   }
    /* 207 */
  ...
    /* 330 */   }
    /* 331 */
    /* 332 */ }
    
     WARN [2022-06-10 10:01:56,689] ({ParallelScheduler-Worker-1} Logging.scala[logWarning]:69) - Whole-stage codegen disabled for plan (id=1):
     *(1) HashAggregate(keys=[age#6], functions=[partial_count(1)], output=[age#6, count#29L])
    +- *(1) Project [age#6]
       +- *(1) Filter (isnotnull(age#6) AND (age#6 < 30))
          +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank, true])).age.intValue AS age#6, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank, true])).job, true, false, true) AS job#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank, true])).marital, true, false, true) AS marital#8, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank, true])).education, true, false, true) AS education#9, knownnotnull(assertnotnull(input[0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank, true])).balance.intValue AS balance#10]
             +- Scan[obj#5]

Below is the first executor's log:

22/06/10 10:01:57 INFO CoarseGrainedExecutorBackend: Got assigned task 0
22/06/10 10:01:57 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
22/06/10 10:01:57 INFO TorrentBroadcast: Started reading broadcast variable 0 with 1 pieces (estimated total size 4.0 MiB)
22/06/10 10:01:57 INFO TransportClientFactory: Successfully created connection to spark-gxaleg.zeppelin.svc/10.42.8.188:22322 after 3 ms (0 ms spent in bootstraps)
22/06/10 10:01:57 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.2 KiB, free 413.9 MiB)
22/06/10 10:01:57 INFO TorrentBroadcast: Reading broadcast variable 0 took 221 ms
22/06/10 10:01:58 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 28.3 KiB, free 413.9 MiB)
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 717.142754 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 52.025602 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 12.942418 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 14.006444 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 14.341434 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 13.481751 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 20.743977 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 76.48826 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 100.251714 ms
22/06/10 10:02:01 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2538 bytes result sent to driver
22/06/10 10:02:27 INFO Executor: Told to re-register on heartbeat
22/06/10 10:02:27 INFO BlockManager: BlockManager BlockManagerId(1, 10.42.9.230, 22322, None) re-registering with master
22/06/10 10:02:27 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(1, 10.42.9.230, 22322, None)
22/06/10 10:02:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(1, 10.42.9.230, 22322, None)
22/06/10 10:02:27 INFO BlockManager: Reporting 2 blocks to the master.

Below is the second executor's log:

22/06/10 10:01:57 INFO CoarseGrainedExecutorBackend: Got assigned task 1
22/06/10 10:01:57 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
22/06/10 10:01:57 INFO TorrentBroadcast: Started reading broadcast variable 0 with 1 pieces (estimated total size 4.0 MiB)
22/06/10 10:01:57 INFO TransportClientFactory: Successfully created connection to spark-gxaleg.zeppelin.svc/10.42.8.188:22322 after 27 ms (0 ms spent in bootstraps)
22/06/10 10:01:57 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.2 KiB, free 413.9 MiB)
22/06/10 10:01:57 INFO TorrentBroadcast: Reading broadcast variable 0 took 186 ms
22/06/10 10:01:57 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 28.3 KiB, free 413.9 MiB)
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 707.100922 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 19.523219 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 81.661107 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 16.067472 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 79.90976 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 17.750182 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 24.625673 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 16.542486 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 94.069283 ms
22/06/10 10:02:02 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2538 bytes result sent to driver
22/06/10 10:02:02 INFO CoarseGrainedExecutorBackend: Got assigned task 2
22/06/10 10:02:02 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
22/06/10 10:02:02 INFO MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
22/06/10 10:02:02 INFO TorrentBroadcast: Started reading broadcast variable 1 with 1 pieces (estimated total size 4.0 MiB)
22/06/10 10:02:02 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 17.6 KiB, free 413.9 MiB)
22/06/10 10:02:02 INFO TorrentBroadcast: Reading broadcast variable 1 took 19 ms
22/06/10 10:02:02 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 38.6 KiB, free 413.8 MiB)
22/06/10 10:02:02 INFO CodeGenerator: Code generated in 25.099699 ms
22/06/10 10:02:02 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them
22/06/10 10:02:02 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@spark-gxaleg.zeppelin.svc:22321)
22/06/10 10:02:02 INFO MapOutputTrackerWorker: Got the map output locations
22/06/10 10:02:02 INFO ShuffleBlockFetcherIterator: Getting 2 (1282.0 B) non-empty blocks including 1 (608.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 1 (674.0 B) remote blocks
22/06/10 10:02:02 INFO TransportClientFactory: Successfully created connection to /10.42.9.230:22322 after 2 ms (0 ms spent in bootstraps)
22/06/10 10:02:02 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from <unknown remote> is closed
22/06/10 10:02:02 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 71 ms
22/06/10 10:02:02 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from <unknown remote> closed
    at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    ... (some lines deleted)
    at java.base/java.lang.Thread.run(Unknown Source)
22/06/10 10:02:02 INFO RetryingBlockTransferor: Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms
22/06/10 10:02:02 INFO CodeGenerator: Code generated in 99.746798 ms
22/06/10 10:02:03 INFO CodeGenerator: Code generated in 12.970246 ms
22/06/10 10:02:07 INFO TransportClientFactory: Found inactive connection to /10.42.9.230:22322, creating a new one.
22/06/10 10:02:07 INFO TransportClientFactory: Successfully created connection to /10.42.9.230:22322 after 2 ms (0 ms spent in bootstraps)
22/06/10 10:02:07 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from <unknown remote> is closed
22/06/10 10:02:07 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from <unknown remote> closed
    at ...

This is the code snippet I tried to run:

import org.apache.commons.io.IOUtils
import java.net.URL
import java.nio.charset.Charset

// Zeppelin creates and injects sc (SparkContext) and sqlContext (HiveContext or SQLContext),
// so you don't need to create them manually

// load bank data
val bankText = sc.parallelize(
    IOUtils.toString(
        new URL("https://s3.amazonaws.com/apache-zeppelin/tutorial/bank/bank.csv"),
        Charset.forName("utf8")).split("\n"))

case class Bank(age: Integer, job: String, marital: String, education: String, balance: Integer)

// Split each line on ";", skip the CSV header row, and strip surrounding quotes
val bank = bankText.map(s => s.split(";")).filter(s => s(0) != "\"age\"").map(
    s => Bank(s(0).toInt, 
            s(1).replaceAll("\"", ""),
            s(2).replaceAll("\"", ""),
            s(3).replaceAll("\"", ""),
            s(5).replaceAll("\"", "").toInt
        )
).toDF()
bank.createOrReplaceTempView("bank") // registerTempTable is deprecated since Spark 2.0

and the error pops up in the snippet below:

%sql 
select age, count(1) value
from bank 
where age < 30 
group by age 
order by age

-> Spark starts shuffling because of the GROUP BY clause
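
For reference, here is a sketch of the same query expressed through the DataFrame API; it should compile to the same physical plan, with the exchange (shuffle) introduced by the groupBy. The byAge name is just illustrative:

    import org.apache.spark.sql.functions.{count, lit}

    // Same aggregation as the %sql paragraph above; the groupBy is what
    // forces an exchange (shuffle) between executors.
    val byAge = spark.table("bank")
      .where("age < 30")
      .groupBy("age")
      .agg(count(lit(1)).as("value"))
      .orderBy("age")
    byAge.show()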

Does anyone know how to fix this?


Solution

  • First of all, I don't have the code anymore; I've left the company where I worked on this. But we did find the solution.

    This didn't work because we needed a headless service. The requirement was hidden in the Spark-on-Kubernetes docs and nobody noticed it. The error message popped up because the workers couldn't communicate with each other: they didn't have each other's addresses, and the headless service lets them do a reverse lookup to find the IPs (or names) of all workers.

    Keep in mind that the headless service has to have a specific name (I no longer remember what it was) so that the workers can find it.

    The headless service is not pre-configured in the manifest inside the Apache Zeppelin interpreter-template file, so you have to add it separately. A minimal sketch of such a manifest is shown below.
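
    For anyone facing the same issue, here is a minimal sketch of what such a headless-service manifest can look like. The service name and pod labels are placeholders (as said above, the required name was something specific that I no longer remember); the namespace and ports are taken from the logs in the question:

        # clusterIP: None is what makes a Service "headless": instead of a single
        # virtual IP, DNS resolves to the individual pod IPs, so the pods can
        # look each other up.
        apiVersion: v1
        kind: Service
        metadata:
          name: spark-headless        # placeholder -- the required name was something specific
          namespace: zeppelin         # namespace from the logs
        spec:
          clusterIP: None
          selector:
            app: spark                # placeholder -- must match the labels on the executor pods
          ports:
            - name: driver-rpc
              port: 22321             # driver RPC port seen in the logs
            - name: blockmanager
              port: 22322             # block-manager port seen in the logs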

    This fixed my problem. I hope it can help someone else.