how i can input dataset - type as input to dataproc jobs ?
mine code is below
%%writefile spark_job.py
import sys
import pyspark
import argparse
import pickle
#def time_configs_rdd(test_set, batch_sizes,batch_numbers,repetitions):
def time_configs_rdd(argv):
print(argv)
parser = argparse.ArgumentParser() # get a parser object
parser.add_argument('--out_bucket', metavar='out_bucket', required=True,
help='The bucket URL for the result.') # add a required argument
parser.add_argument('--out_file', metavar='out_file', required=True,
help='The filename for the result.') # add a required argument
parser.add_argument('--batch_size', metavar='batch_size', required=True,
help='The bucket URL for the result.') # add a required argument
parser.add_argument('--batch_number', metavar='batch_number', required=True,
help='The filename for the result.') # add a required argument
parser.add_argument('--repetitions', metavar='repetitions', required=True,
help='The filename for the result.') # add a required argument
parser.add_argument('--test_set', metavar='test_set', required=True,
help='The filename for the result.') # add a required argument
args = parser.parse_args(argv) # read the value
# the value provided with --out_bucket is now in args.out_bucket
time_configs_results = []
for s in args.batch_size:
for n in args.batch_number:
dataset = **args.test_set.batch(s).take(n)**
for r in args.repetitions:
tt0 = time.time()
for i in enumerate(dataset):
totaltime = str(time.time()-tt0)
batchtime = totaltime
#imgpersec = s*n/totaltime
time_configs_results.append((s,n,r,float(batchtime)))
#time_configs_results.append((s,n,r,batchtime,imgpersec))
time_configs_results_rdd = sc.parallelize(time_configs_results) #create an RDD with all results for each parameter
time_configs_results_rdd_avg = time_configs_results_rdd.map(lambda x: (x, x[0]*x[1]/x[3])) #RDD with the average reading speeds (RDD.map)
#mapping = time_configs_results_rdd_avg.collect()
#print(mapping)
return (time_configs_results_rdd_avg)
if 'google.colab' not in sys.modules: # Don't use system arguments when run in Colab
time_configs_rdd(sys.argv[1:])
elif __name__ == "__main__" : # but define them manually
time_configs_rdd(["--out_bucket", BUCKET, "--out_file", "time_configs_rdd_out.pkl","--batch_size", batch_size, "--batch_number", batch_number,"--test_set", test_set ] )
and code to execute it
FILENAME = 'file_RDD_OUT.pkl'
batch_size = [1]
batch_number = [1]
repetitions = [1]
#test_set = 1 will give string error
test_set = dataset2 # file <ParallelMapDataset shapes: ((192, 192, None), ()), types: (tf.float32,
tf.string)> cannot be inserted
!gcloud dataproc jobs submit pyspark --cluster $CLUSTER --region $REGION \
./spark_job.py \
-- --out_bucket $BUCKET --out_file $FILENAME --batch_size $batch_size --batch_number $batch_number --repetitions $repetitions --test_set $test_set
unfortunetlly is keep failing with error
AttributeError: 'str' object has no attribute 'batch' ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [c2048c422f334b08a628af5a1aa492eb] failed with error: Job failed with message [AttributeError: 'str' object has no attribute 'batch'].
problem is with test_set how i should convert dataset2(ParallelMapDataset) to be readed by the job
So you are trying to parse a string from the command line argument to a ParallelMapDataset type. You want to use the type
param in your add_argument
calls.
From https://docs.python.org/3/library/argparse.html#type and I quote:
By default, ArgumentParser objects read command-line arguments in as simple strings. However, quite often the command-line string should instead be interpreted as another type, like a float or int. The type keyword argument of add_argument() allows any necessary type-checking and type conversions to be performed.
and
type= can take any callable that takes a single string argument and returns the converted value
So you probably want something like:
def parse_parallel_map_dataset(string):
# your logic to parse the string into your desired data structure
...
parser.add_argument('--test_set', metavar='test_set', required=True,
type=parse_parallel_map_dataset)
Or better yet, read your test_set from a file and pass the file name as an argument.