
Are sqlx.Next and sqlx.StructScan safe for concurrent use?


I have a large table in a MySQL database that I'm trying to read as efficiently as I can. I thought I could speed the code up by adding multiple workers; however, when I do that I get marshaling errors at the start of the run (and only at the start). The error looks something like this:

{"caller":"mysql.go:repository.(*MySQLRepo).GetNextBatch#428","error":"DBGetRecordException: could not marshal episode comments: sql: Scan error on column index 4, name "created_at": unsupported Scan, storing driver.Value type []uint8 into type *time.Time","level":"error","ts":"2020-07-13T20:42:03.9621658Z"}

I don't get this error if I remove the worker code from ImportLegacyComments and just loop over the rows normally. Are sqlx.Next and sqlx.StructScan safe to call from multiple goroutines, and if not, is there an alternative way to do this safely?

import (
    "context"
    "sync"

    _ "github.com/go-sql-driver/mysql"
    "github.com/jmoiron/sqlx"
)

type BatchResult struct {
    rows *sqlx.Rows
}

func (m *MySQLRepo) GetNextBatch(b *BatchResult) ([]model.EpisodeComment, error) {
    var episodeComments []model.EpisodeComment
    for i := 0; i < 1000 && b.rows.Next(); i++ {
        var episodeComment model.EpisodeComment
        err := b.rows.StructScan(&episodeComment)
        if err != nil {
            return nil, err
        }
        episodeComments = append(episodeComments, episodeComment)
    }
    return episodeComments, nil
}
func (m *MySQLRepo) FetchAllEpisodeComments() (*BatchResult, error) {
    rows, err := m.db.Queryx("SELECT * FROM episode_comment")
    if err != nil {
        return nil, err
    }
    return &BatchResult{
        rows: rows,
    }, nil
}

func (svc *ImportService) ImportLegacyComments(ctx context.Context) error {
    batchResult, err := svc.legacyCommentsRepo.FetchAllEpisodeComments()
    var wg sync.WaitGroup
    processor := func() {
        comments, err := svc.legacyCommentsRepo.GetNextBatch(batchResult)
        if err != nil {
            svc.logger.Error(err)
        }
        for len(comments) > 0 {
            comments, err = svc.legacyCommentsRepo.GetNextBatch(batchResult)
            if err != nil {
                svc.logger.Error(err)
            }
            svc.logger.Info("batch", "completed 1000")
        }
        wg.Done()
    }

    for i := 0; i < 20; i++ {
        // Add must happen before the goroutine starts, otherwise Done can race with Wait.
        wg.Add(1)
        go processor()
    }

    wg.Wait()

    return err
}

Solution

  • sqlx.Next and sqlx.StructScan (that is, the Next and StructScan methods of *sqlx.Rows) are not safe for concurrent use.

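    If you throw together a simple unit test for your code and run it with the race detector (go test -race), it reports a data race on an unexported field of the "database/sql".Rows struct, which sqlx.Rows wraps. The trace below comes from a test that used the lib/pq driver, but the field in question belongs to database/sql, so the same applies to the MySQL driver: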

    Write at 0x00c00000e080 by goroutine 22:
      github.com/lib/pq.(*rows).Next()
          /Users/blackgreen/go/pkg/mod/github.com/lib/pq@<version>/conn.go:1464 +0x8ec
      ...
    
    Previous read at 0x00c00000e080 by goroutine 20:
      database/sql.(*Rows).Scan()
          /usr/local/go/src/database/sql/sql.go:3041 +0x2fa
      ...
    
    

    Looking up the field that the race detector complains about in the database/sql source shows that the restriction on concurrent use is documented there:

        // lastcols is only used in Scan, Next, and NextResultSet which are expected
        // not to be called concurrently.
        lastcols []driver.Value
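
One way to keep the workers without sharing the rows is to let a single goroutine do all the Next/Scan calls and hand finished batches to the workers over a channel. The following is only a minimal sketch of that idea, not code from sqlx: the names comment, scanner, fakeRows, produce and processAll are hypothetical, and fakeRows is an in-memory stand-in so the example runs without a database. With the real code, a small wrapper around *sqlx.Rows that calls Next and StructScan would play the role of scanner.

```go
package main

import (
	"fmt"
	"sync"
)

// comment is a hypothetical stand-in for model.EpisodeComment.
type comment struct{ ID int }

// scanner abstracts the two calls that must stay on one goroutine.
// It is an assumption of this sketch, not an sqlx type.
type scanner interface {
	Next() bool
	Scan(c *comment) error
}

// fakeRows is an in-memory scanner that yields IDs 1..n.
type fakeRows struct{ i, n int }

func (f *fakeRows) Next() bool            { f.i++; return f.i <= f.n }
func (f *fakeRows) Scan(c *comment) error { c.ID = f.i; return nil }

// produce reads every row on the calling goroutine and sends batches on out.
// It closes out when done so the workers' range loops terminate.
func produce(rows scanner, batchSize int, out chan<- []comment) error {
	defer close(out)
	batch := make([]comment, 0, batchSize)
	for rows.Next() {
		var c comment
		if err := rows.Scan(&c); err != nil {
			return err
		}
		batch = append(batch, c)
		if len(batch) == batchSize {
			out <- batch
			batch = make([]comment, 0, batchSize) // fresh slice; the worker owns the old one
		}
	}
	if len(batch) > 0 {
		out <- batch // final, possibly short, batch
	}
	return nil
}

// processAll fans batches out to the given number of workers and
// returns how many rows were handled in total.
func processAll(rows scanner, batchSize, workers int) (int, error) {
	batches := make(chan []comment)
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := range batches {
				// Real per-batch work would go here; the sketch only counts rows.
				mu.Lock()
				total += len(b)
				mu.Unlock()
			}
		}()
	}
	err := produce(rows, batchSize, batches) // only this goroutine touches rows
	wg.Wait()
	return total, err
}

func main() {
	total, err := processAll(&fakeRows{n: 2500}, 1000, 4)
	fmt.Println(total, err)
}
```

With a real *sqlx.Rows you would also want to check rows.Err() after the loop and close the rows when finished. Note that this only parallelizes the processing of each batch; reading from a single result set stays sequential either way.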