Tags: arrays, dictionary, julia, hdf5

Merging HDF5 files for faster data reading using Julia


I have 600 HDF5 files (totaling over 720 GB) from a large N-body simulation that I need to analyze. Currently I'm reading them sequentially with Python, which takes a long time (~60 hours). The problem is that these are small 1.2 GB files, so a lot of time is spent just opening and closing them. Here is the Python version of the code that I'm using:

import numpy as np
import h5py as hp5
from tqdm import tqdm

#path to files
path = "TNG300-1/FullPhysics/Snapshots/snap_099."
dark_path = "TNG300-1/Dark/Snapshots/snap_099."

# number of snapshots
n_snaps = 600
n_dark_snaps = 75


# ====================================================================================
# defining arrays for saving data
data = np.array([[0,0,0]],dtype=np.float32)
g_data = np.array([[0,0,0]],dtype=np.float32)
g_mass = np.array([0],dtype=np.float32)
S_data = np.array([[0,0,0]], dtype=np.float32)
S_mass = np.array([0], dtype=np.float32)
BH_data = np.array([[0,0,0]], dtype=np.float32)
BH_mass = np.array([0], dtype=np.float32)

OSError_count = 0
OSError_idx = []

#reading data from TNG300-1 simulation
for i in tqdm(range(0,n_snaps), "Reading file:"):
    try:
        fileName = path + str(i) + ".hdf5"
        f = hp5.File(fileName, 'r')
        # reading DM data
        data = np.concatenate((data, f['PartType1/Coordinates'][:]), axis=0)
        # reading Gas data
        g_data = np.concatenate((g_data, f['PartType0/Coordinates'][:]), axis=0)
        g_mass = np.concatenate((g_mass, f['PartType0/Masses'][:]), axis=0)
        # reading Star data
        S_data = np.concatenate((S_data, f['PartType4/Coordinates'][:]), axis=0)
        S_mass = np.concatenate((S_mass, f['PartType4/Masses'][:]), axis=0)
        # Reading Blackhole's data
        BH_data = np.concatenate((BH_data, f['PartType5/Coordinates'][:]), axis=0)
        BH_mass = np.concatenate((BH_mass, f['PartType5/BH_Mass'][:]), axis=0)
        f.close()
    except OSError:
        OSError_count += 1
        OSError_idx.append(i)

Julia is faster than Python at reading large data, so I want to merge the files together first and then read them. The HDF5 files are structured like this (see the screenshot of the HDF5 file structure):

I need to merge the parts PartType0, 1, 4, and 5. As a test run, I loaded 2 files and tried to merge them:

using HDF5
f1 = h5read("snap_099.0.hdf5", "/PartType0")
f2 = h5read("snap_099.1.hdf5", "/PartType0")

f = cat(f1,f2, dims=1)

Instead of producing a single dictionary with the keys "Masses" and "Coordinates" containing the data from both files, it stores them as an array whose elements are dictionaries. I tried changing the axis to dims=2, but still no luck. Here is the output:

julia> f1 = h5read("snap_099.0.hdf5", "/PartType0")
Dict{String,Any} with 2 entries:
  "Masses"      => Float32[0.000978657, 0.000807373, 0.00110127, 0.00115997, 0.000843322, 0.000844374, 0.000696105, 0.0…
  "Coordinates" => [43711.3 43726.1 … 45550.0 45586.2; 48812.0 48803.9 … 51899.6 51856.9; 1.47607e5 1.47608e5 … 1.46392…

julia> f2 = h5read("snap_099.1.hdf5", "/PartType0")
Dict{String,Any} with 2 entries:
  "Masses"      => Float32[0.00110192, 0.000820541, 0.000800644, 0.000731853, 0.000921848, 0.000809955, 0.000903888, 0.…
  "Coordinates" => [45416.0 45579.5 … 1.19211e5 1.19285e5; 51893.7 51858.7 … 67786.6 67815.1; 1.46519e5 1.46366e5 … 1.9…

julia> f = cat(f1,f2, dims=1)
2-element Array{Dict{String,Any},1}:
 Dict("Masses" => Float32[0.0009786569, 0.0008073727, 0.0011012702, 0.0011599729, 0.0008433224, 0.00084437383, 0.0006961052, 0.00070475566, 0.0010829173, 0.00094494154  …  0.0007893325, 0.0007540708, 0.001073747, 0.0008177613, 0.0010638952, 0.00086337695, 0.0010262426, 0.00088746834, 0.0007905843, 0.0008635204],"Coordinates" => [43711.32625977148 43726.11234425759 … 45550.032234873746 45586.24493944876; 48811.97000746165 48803.934063888715 … 51899.56371629323 51856.93800620881; 147607.34346897554 147607.51945277958 … 146392.25935419425 146412.14055034568])
 Dict("Masses" => Float32[0.0011019199, 0.00082054106, 0.0008006442, 0.0007318534, 0.0009218479, 0.00080995524, 0.00090388797, 0.0009912133, 0.0009779222, 0.0009963409  …  0.0007508516, 0.0009412733, 0.0009498103, 0.0007702337, 0.0009541717, 0.0004982273, 0.00090054696, 0.0007944233, 0.00084039115, 0.00075964356],"Coordinates" => [45415.96489211404 45579.45274482072 … 119210.6050025773 119284.70906576645; 51893.668288786364 51858.695608083464 … 67786.58517835569 67815.08230612907; 146519.06862554888 146365.8540504153 … 196001.12752015938 196055.86525250477])

Is there a way I can merge the dictionaries? And can I do it for nested dictionaries as well? If I read the whole HDF5 file as f = h5read("snap_099.0.hdf5", "/"), it creates a dictionary of nested dictionaries:

julia> f2 = h5read("snap_099.1.hdf5", "/")
Dict{String,Any} with 5 entries:
  "Header"    => Dict{String,Any}()
  "PartType1" => Dict{String,Any}("Coordinates"=>[43935.2 44328.7 … 82927.2 82079.8; 50652.3 51090.2 … 1.20818e5 1.21497e5; 1.46135e5 1.47718e5 … 1.94209e5 …
  "PartType5" => Dict{String,Any}("BH_Mass"=>Float32[8.0f-5, 8.0f-5, 8.0f-5, 8.0f-5, 8.0f-5, 8.0f-5, 8.0f-5, 8.0f-5, 8.0f-5, 8.0f-5  …  8.0f-5, 8.0f-5, 8.0f…  
  "PartType0" => Dict{String,Any}("Masses"=>Float32[0.00110192, 0.000820541, 0.000800644, 0.000731853, 0.000921848, 0.000809955, 0.000903888, 0.000991213, 0…
  "PartType4" => Dict{String,Any}("Masses"=>Float32[0.00036615, 0.000584562, 0.000511133, 0.000749962, 0.000413476, 0.0005036, 0.000596368, 0.000589417, 0.0…

I want the data of those dictionaries combined, i.e. in the end I want a single f with the dictionaries structured like the above, but with all the data combined; for example, if I access f["PartType0"]["Coordinates"], I should get the coordinates from all the HDF5 files.
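
For concreteness, the kind of merge I have in mind looks roughly like this (only a sketch, assuming every "Coordinates" dataset is read as a 3×N matrix and every "Masses"/"BH_Mass" dataset as a vector, as in the output above; the function name and output filename are just placeholders):

using HDF5

# Sketch: merge the PartType groups of several snapshot files into one nested Dict.
# Matrices (3×N Coordinates) are joined column-wise with hcat, vectors (Masses,
# BH_Mass) with vcat.
function merge_snapshots(filenames,
                         groups = ["PartType0", "PartType1", "PartType4", "PartType5"])
    merged = Dict{String,Any}()
    for fname in filenames, grp in groups
        part = h5read(fname, "/" * grp)               # Dict of datasets in this group
        dest = get!(merged, grp, Dict{String,Any}())  # existing or new sub-dict
        for (key, val) in part
            if !haskey(dest, key)
                dest[key] = val
            elseif val isa AbstractMatrix
                dest[key] = hcat(dest[key], val)      # 3×N coordinates: append columns
            else
                dest[key] = vcat(dest[key], val)      # 1-D masses: append entries
            end
        end
    end
    return merged
end

f = merge_snapshots(["snap_099.0.hdf5", "snap_099.1.hdf5"])
# f["PartType0"]["Coordinates"] would then hold the coordinates from both files.

# The merged groups could then be written back to a single file for later reads
# ("snap_099.merged.hdf5" is a hypothetical output name):
for (grp, datasets) in f, (key, val) in datasets
    h5write("snap_099.merged.hdf5", "$grp/$key", val)
end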

Thank You.

===Edit===
I tried the answer by @Przemyslaw Szufel, and here is the problem that I'm facing:
I tried both methods, but they didn't work. With the data frame approach, it says:

ERROR: ArgumentError: adding AbstractArray other than AbstractVector as a column of a data frame is not allowed

I think this is because of the structure of the dictionary, which is Dict{String,Any} with 2 entries.

With the append approach as a dictionary, I'm getting a 4-element array:

julia> f1 = h5read("snap_099.0.hdf5", "/PartType0")
Dict{String,Any} with 2 entries:
  "Masses"      => Float32[0.000978657, 0.000807373, 0.00110127, 0.00115997, 0.000843322, 0.000844374, 0.000696105, 0.000704756, 0.00108292, 0.000944942  … …
  "Coordinates" => [43711.3 43726.1 … 45550.0 45586.2; 48812.0 48803.9 … 51899.6 51856.9; 1.47607e5 1.47608e5 … 1.46392e5 1.46412e5]

julia> f2 = h5read("snap_099.1.hdf5", "/PartType0")
Dict{String,Any} with 2 entries:
  "Masses"      => Float32[0.00110192, 0.000820541, 0.000800644, 0.000731853, 0.000921848, 0.000809955, 0.000903888, 0.000991213, 0.000977922, 0.000996341  …
  "Coordinates" => [45416.0 45579.5 … 1.19211e5 1.19285e5; 51893.7 51858.7 … 67786.6 67815.1; 1.46519e5 1.46366e5 … 1.96001e5 1.96056e5]

julia> append!.(getindex.(Ref(f1), keys(f1)), getindex.(Ref(f2), keys(f1)))
ERROR: MethodError: no method matching append!(::Array{Float64,2}, ::Array{Float64,2})
Closest candidates are:
  append!(::BitArray{1}, ::Any) at bitarray.jl:771
  append!(::AbstractArray{T,1} where T, ::Any) at array.jl:981
Stacktrace:
 [1] _broadcast_getindex_evalf at ./broadcast.jl:648 [inlined]
 [2] _broadcast_getindex at ./broadcast.jl:621 [inlined]
 [3] getindex at ./broadcast.jl:575 [inlined]
 [4] copyto_nonleaf!(::Array{Array{Float32,1},1}, ::Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1},Tuple{Base.OneTo{Int64}},typeof(append!),Tuple{Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1},Nothing,typeof(getindex),Tuple{Base.RefValue{Dict{String,Any}},Base.Broadcast.Extruded{Array{String,1},Tuple{Bool},Tuple{Int64}}}},Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1},Nothing,typeof(getindex),Tuple{Base.RefValue{Dict{String,Any}},Base.Broadcast.Extruded{Array{String,1},Tuple{Bool},Tuple{Int64}}}}}}, ::Base.OneTo{Int64}, ::Int64, ::Int64) at ./broadcast.jl:1026
 [5] copy(::Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1},Tuple{Base.OneTo{Int64}},typeof(append!),Tuple{Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1},Nothing,typeof(getindex),Tuple{Base.RefValue{Dict{String,Any}},Array{String,1}}},Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1},Nothing,typeof(getindex),Tuple{Base.RefValue{Dict{String,Any}},Array{String,1}}}}}) at ./broadcast.jl:880
 [6] materialize(::Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1},Nothing,typeof(append!),Tuple{Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1},Nothing,typeof(getindex),Tuple{Base.RefValue{Dict{String,Any}},Array{String,1}}},Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1},Nothing,typeof(getindex),Tuple{Base.RefValue{Dict{String,Any}},Array{String,1}}}}}) at ./broadcast.jl:837
 [7] top-level scope at REPL[5]:1

Moreover, the elements are also getting modified, which is quite strange.
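
As far as I can tell, the MethodError happens because "Coordinates" is a 3×N Matrix, and append! is only defined for vectors; the broadcast still appends the "Masses" vectors before it throws, which would explain why the elements are getting modified. A non-mutating merge that picks the concatenation per key seems to avoid both problems (again only a sketch for the two-file case):

# Use vcat for vectors and hcat for matrices instead of append!;
# this builds a new Dict and leaves f1 and f2 untouched.
merged = Dict(k => (f1[k] isa AbstractMatrix ? hcat(f1[k], f2[k]) : vcat(f1[k], f2[k]))
              for k in keys(f1))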


Solution

  • The code below addresses the Python performance issue described in the original question. It shows how to get the shape of each dataset and use it to pre-allocate the NumPy arrays that hold the combined data from all 600 files. There are some assumptions in this procedure:

    1. Every dataset with the same name has the same shape. In other words, if f['PartType1/Coordinates'] has shape (100,3) in the first file, it is assumed to have shape (100,3) in the other 599 files.
    2. Each array is allocated as n_snaps*shape[0]
    3. If the datasets have different shapes across files, this will have to be modified to read and sum all of the shapes, then use that total for the allocation.
    4. In the same way, if all 'Coordinates' datasets have the same shapes, the code can be simplified (and the same is true for all 'Masses' datasets).

    There is a lot of repeated code. With a little work, this could be factored into a function that takes the file counter, dataset name, and array, and returns the updated array.

    Code below. I suggest testing with n_snaps=10 before running with all 600.

    # Note: this continues from the imports and the path / n_snaps definitions
    # in the question's script.
    # Start by getting the dataset shapes, and
    # using them to allocate arrays for all datasets:
    with hp5.File(f"{path}{0}.hdf5", 'r') as f:
        # preallocate DM data arrays
        a0 = f['PartType1/Coordinates'].shape[0]
        data = np.empty((n_snaps*a0,3),dtype=np.float32)
        # preallocate Gas data arrays
        a0 = f['PartType0/Coordinates'].shape[0]
        g_data = np.empty((n_snaps*a0,3),dtype=np.float32)
        a0 = f['PartType0/Masses'].shape[0]
        g_mass  = np.empty((n_snaps*a0,),dtype=np.float32)    
        # preallocate Star data arrays
        a0 = f['PartType4/Coordinates'].shape[0]
        S_data = np.empty((n_snaps*a0,3),dtype=np.float32)
        a0 = f['PartType4/Masses'].shape[0]
        S_mass  = np.empty((n_snaps*a0,),dtype=np.float32)       
        # preallocate Blackhole data arrays
        a0 = f['PartType5/Coordinates'].shape[0]
        BH_data = np.empty((n_snaps*a0,3),dtype=np.float32)
        a0 = f['PartType5/BH_Mass'].shape[0]
        BH_mass = np.empty((n_snaps*a0,),dtype=np.float32)
        
    # read data from each TNG300-1 snapshot file and load into the arrays
    OSError_count = 0
    OSError_idx = []
    for i in tqdm(range(0,n_snaps), "Reading file:"):
        try:
            fileName = f"{path}{i}.hdf5"
            with hp5.File(fileName, 'r') as f:
                # get DM data
                a0 = f['PartType1/Coordinates'].shape[0]
                data[a0*i:a0*(i+1) ] = f['PartType1/Coordinates']            
                # get Gas data
                a0 = f['PartType0/Coordinates'].shape[0]
                g_data[a0*i:a0*(i+1) ] = f['PartType0/Coordinates']            
                a0 = f['PartType0/Masses'].shape[0]
                g_mass[a0*i:a0*(i+1) ] = f['PartType0/Masses']            
                # get Star data
                a0 = f['PartType4/Coordinates'].shape[0]
                S_data[a0*i:a0*(i+1) ] = f['PartType4/Coordinates']            
                a0 = f['PartType4/Masses'].shape[0]
                S_mass[a0*i:a0*(i+1) ] = f['PartType4/Masses']            
                # get Blackhole data
                a0 = f['PartType5/Coordinates'].shape[0]
                BH_data[a0*i:a0*(i+1) ] = f['PartType5/Coordinates']            
                a0 = f['PartType5/BH_Mass'].shape[0]
                BH_mass[a0*i:a0*(i+1) ] = f['PartType5/BH_Mass']
                    
        except OSError:
            OSError_count += 1
            OSError_idx.append(i)