My application processes a very large volume of real-time data (more than 200 million records a day), and I need to aggregate it in real time to keep reporting performant. The data arrives and is therefore processed in random order by several threads on the server.
I use MariaDB 10.5.6 with the InnoDB storage engine.
Do you know if a trigger is thread-safe, i.e. whether a data race can happen at all? In other words, when 1000 updates (increments only) happen to the same columns of a single row within a second, from 10 connections, will the data stay consistent, with the same result as if the values had been summed by a single connection in series?
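To illustrate, the increments are single-statement updates of this shape (table and column names here are just placeholders):

```sql
-- each of the 10 connections issues statements like this concurrently
UPDATE reporting SET cnt = cnt + 1, amount = amount + 5 WHERE id = 42;
```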
Do you know how row-level locking works, and whether it is automatic or can be enforced manually?
Sharing some of your relevant bookmarks would also be appreciated, because I did not find anything succinct and useful on Google.
UPDATE
I added an AFTER INSERT trigger that created a new record in the reporting table if the record did not exist, and then updated the columns with an update statement: update table set field=value+delta where condition.
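(For reference, the create-or-update step can also be written as a single atomic statement in MariaDB, assuming the reporting table has a suitable unique key; table and column names below are placeholders:)

```sql
-- one round trip; InnoDB serializes concurrent executions via row locks
INSERT INTO reporting (id, cnt) VALUES (42, 5)
ON DUPLICATE KEY UPDATE cnt = cnt + VALUES(cnt);
```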
The database did not like it, and the application (Java, Hibernate) that was sending the data could not bear it either and started to throw exceptions:
Caused by: org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
What I found is that the trigger was not thread-safe, in the sense that the database threw different errors for concurrent updates to the same row, as shown above.
I tried to introduce row-level locking, but it did not work at all; I believe the lock was ignored, or the rows did not get locked at all:
$ grep "ExecStart=" /usr/lib/systemd/system/mariadb.service
ExecStart=/usr/sbin/mariadbd --autocommit=0 --transaction-isolation=read-committed $MYSQLD_OPTS $_WSREP_NEW_CLUSTER $_WSREP_START_POSITION
SELECT what FROM tables WHERE conditions FOR UPDATE
using primary keys.

I also tried an equivalent solution with table-level locking, persisting the data with a single thread, but it could not cope with the data volume I had.
The solution I went for is thread-level segregation of feed processing from persistence: several threads process the incoming data feed and create entity objects for another set of threads to save in the database. This lets me experiment and find the optimal number of threads per area on my platform; currently I am testing with 8 threads processing the incoming feed and creating entity objects for 4 threads that are responsible for saving them in the database. For the persister threads, I introduced segregation and custom locking of sets of entities in the application layer, to ensure that no two threads try to update the same row at the same time. This seems to work; now I just need to find the right number of threads for both areas.
This is the consumer class that produces the backlog for DB writers
protected abstract Map<String, Set<ENTITY>> breakDownToBatchesForPersistance(Collection<ENTITY> localBacklog);

private void saveEntitiesInBatches(IDefaultEntityDAO<ENTITY> dao, Collection<ENTITY> localBacklog) {
    for (Map.Entry<String, Set<ENTITY>> entry : breakDownToBatchesForPersistance(localBacklog).entrySet()) {
        persister.saveAll(dao, entry.getKey(), entry.getValue());
    }
}
This is the backlog for the DB writers
private LinkedBlockingQueue<Key> keys;
private Map<Key, Set> backlog;
public <ENTITY> void saveAll(IDefaultEntityDAO<ENTITY> dao, String bucket, Set<ENTITY> entitySet) {
    Key<ENTITY> key = Key.get(dao, bucket);
    synchronized (key) {
        synchronized (backlog) {
            if (backlog.containsKey(key)) {
                // merge with the batch already waiting for this key
                backlog.get(key).addAll(entitySet);
            } else {
                backlog.put(key, entitySet);
                try {
                    keys.put(key);
                } catch (InterruptedException ex) {
                    // restore the interrupt flag instead of swallowing the exception
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
This is the core of the DB writer
private void processDBBatchUpdate(Key key) {
    synchronized (key) {
        Set set;
        synchronized (backlog) {
            set = backlog.remove(key);
        }
        key.getDao().saveAll(set);
    }
}
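The enqueue and drain paths above can be sketched together in a minimal, self-contained form; here String buckets stand in for Key objects, a list stands in for the DAO, and the per-key lock is simplified away because the single backlog lock suffices in the sketch (all names are illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified model of the coalescing backlog: entities for the same bucket
// are merged while they wait, so each bucket is queued at most once and is
// saved by at most one writer at a time.
class BacklogSketch {
    private final LinkedBlockingQueue<String> keys = new LinkedBlockingQueue<>();
    private final Map<String, Set<String>> backlog = new HashMap<>();
    final List<Set<String>> saved = new ArrayList<>(); // stands in for dao.saveAll

    public void saveAll(String bucket, Set<String> entitySet) {
        synchronized (backlog) {
            Set<String> pending = backlog.get(bucket);
            if (pending != null) {
                pending.addAll(entitySet);          // coalesce with queued work
            } else {
                backlog.put(bucket, new HashSet<>(entitySet));
                keys.add(bucket);                   // one queue entry per pending bucket
            }
        }
    }

    public void drainOne() {
        String bucket;
        try {
            bucket = keys.take();                   // a real writer thread blocks here
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return;
        }
        Set<String> set;
        synchronized (backlog) {
            set = backlog.remove(bucket);           // claim the whole batch for this bucket
        }
        saved.add(set);
    }
}
```

Two saves to the same bucket before a drain produce a single queue entry and one merged batch, which is what keeps two writers from ever updating the same rows concurrently.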
This is the Key class for locking
private IDefaultEntityDAO<ENTITY> dao;
private String bucket;

private static Map<IDefaultEntityDAO, Map<Object, Key>> keys = new HashMap<>();

private Key(IDefaultEntityDAO dao, String bucket) {
    this.dao = dao;
    this.bucket = bucket;
}

public static synchronized <ENTITY> Key<ENTITY> get(IDefaultEntityDAO<ENTITY> dao, String bucket) {
    if (!keys.containsKey(dao)) {
        keys.put(dao, new HashMap<>());
    }
    if (!keys.get(dao).containsKey(bucket)) {
        keys.get(dao).put(bucket, new Key(dao, bucket));
    }
    return keys.get(dao).get(bucket);
}