I'm trying to query HealthKit for heart rate values and steps in the time interval defined by an HKWorkoutEvent, so I can fill a custom local model that stores several variables. The model is defined below.
struct SGWorkoutEvent: Identifiable {
    let id = UUID()
    let type: HKWorkoutEventType
    let splitActiveDurationQuantity: HKQuantity?
    let splitDistanceQuantity: HKQuantity?
    let totalDistanceQuantity: HKQuantity?
    let splitMeasuringSystem: HKUnit
    let steps: HKQuantity?
    let heartRate: HKQuantity?
}
All properties except steps and heartRate can be extracted from an HKWorkoutEvent. However, I am trying to build a Combine pipeline that creates an array of publishers to query heart rate and steps in parallel and also passes the workout event along, so that in the sink I receive a 3-element tuple with these values and can populate the model above. What I currently have is below.
// Extract the workout's segments (defined automatically by an Apple Watch)
let workoutSegments = (workout.workoutEvents ?? []).filter({ $0.type == .segment })

// For each of the workout segments defined above create an HKStatisticsQuery that starts on the interval's
// beginning and ends on the interval's end so the HealthKit query is properly defined to be
// executed between that interval.
let segmentsWorkoutPublisher = Publishers.MergeMany(workoutSegments.map({ $0.dateInterval }).map({
    healthStore.statistic(for: HKQuantityType.quantityType(forIdentifier: HKQuantityTypeIdentifier.heartRate)!, with: .discreteAverage, from: $0.start, to: $0.end)
}))
.assertNoFailure()

// Do the same logic as above in `segmentsWorkoutPublisher` but for steps
let stepsPublisher = Publishers.MergeMany(workoutSegments.map({ $0.dateInterval }).map({
    healthStore.statistic(for: HKObjectType.quantityType(forIdentifier: HKQuantityTypeIdentifier.stepCount)!, with: .cumulativeSum, from: $0.start, to: $0.end)
}))
.assertNoFailure()

Publishers.Zip3(workoutSegments.publisher, stepsPublisher, segmentsWorkoutPublisher)
    .receive(on: DispatchQueue.main)
    .sink(receiveValue: { pace, steps, hrs in
        let d = SGWorkoutEvent(type: pace.type,
                               splitActiveDurationQuantity: pace.splitDuration,
                               splitDistanceQuantity: pace.splitDistance,
                               totalDistanceQuantity: pace.totalDistanceQuantity,
                               splitMeasuringSystem: pace.splitMeasuringSystem,
                               steps: steps.sumQuantity(),
                               heartRate: hrs.averageQuantity())
        self.paces.append(d)
    })
    .store(in: &bag)
HKHealthStore.statistic(for:...) is nothing but a Combine wrapper for HKStatisticsQuery, defined in an HKHealthStore extension; see below.
public func statistic(for type: HKQuantityType, with options: HKStatisticsOptions, from startDate: Date, to endDate: Date, _ limit: Int = HKObjectQueryNoLimit) -> AnyPublisher<HKStatistics, Error> {
    let subject = PassthroughSubject<HKStatistics, Error>()
    let predicate = HKStatisticsQuery.predicateForSamples(withStart: startDate, end: endDate, options: [.strictEndDate, .strictStartDate])
    let query = HKStatisticsQuery(quantityType: type, quantitySamplePredicate: predicate, options: options, completionHandler: { (query, statistics, error) in
        guard error == nil else {
            hkCombineLogger.error("Error fetching statistics \(error!.localizedDescription)")
            return
        }
        subject.send(statistics!)
        subject.send(completion: .finished)
    })
    self.execute(query)
    return subject.eraseToAnyPublisher()
}
What I am seeing is some sort of race condition where the steps and heart rate results do not come back at the same time. As a result I see values that don't make sense: one 1K split of 5' shows 200 steps while another of the same duration shows 700 steps. Both intervals should show a value around 150, so I am probably not using the correct Combine operator.
The behavior I would expect is for each 3-item tuple emitted by Publishers.Zip3 to hold the results of queries for the same interval, in order (1st interval, 2nd interval, ...), rather than this non-reproducible race condition.
To give more context, I think this is akin to having a model with temperature, humidity and chance of rain for different timestamps, querying three different API endpoints to retrieve the three values, and merging them into the model.
There is a lot to unpack here, but I'll give it a shot. Let's start with your HKHealthStore.statistic function. You want to run a (presumably asynchronous) query and publish a sequence with a single result that then ends. This really seems like an ideal case for a Future. I don't have any experience (at all) with HealthKit and I can't guarantee this will compile, but a transformation might look something like this:
public func statistic(
    for type: HKQuantityType,
    with options: HKStatisticsOptions,
    from startDate: Date,
    to endDate: Date,
    _ limit: Int = HKObjectQueryNoLimit) -> AnyPublisher<HKStatistics, Error> {
    let future = Future<HKStatistics, Error> { fulfillPromise in
        let predicate = HKStatisticsQuery.predicateForSamples(withStart: startDate, end: endDate, options: [.strictEndDate, .strictStartDate])
        let query = HKStatisticsQuery(quantityType: type, quantitySamplePredicate: predicate, options: options, completionHandler: { (query, statistics, error) in
            // Fail the future and stop here if the query reported an error.
            guard error == nil else {
                hkCombineLogger.error("Error fetching statistics \(error!.localizedDescription)")
                fulfillPromise(.failure(error!))
                return
            }
            fulfillPromise(.success(statistics!))
        })
        self.execute(query)
    }
    return future.eraseToAnyPublisher()
}
So now we have a "one shot" publisher that runs a query and fires when it has a value.
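To illustrate the "one shot" behaviour without HealthKit, here is a minimal sketch. `fetchAverage` is a hypothetical callback-based function standing in for the statistics query (it is not a real API); the only point is that the resulting publisher emits a single value and then finishes.

```swift
import Combine
import Foundation

// Hypothetical callback-based API standing in for an HKStatisticsQuery.
func fetchAverage(completion: @escaping (Result<Double, Error>) -> Void) {
    DispatchQueue.global().async { completion(.success(72.0)) }
}

// Wrap the callback in a Future: it delivers one result, then completes.
func averagePublisher() -> AnyPublisher<Double, Error> {
    Future<Double, Error> { promise in
        fetchAverage(completion: promise)
    }
    .eraseToAnyPublisher()
}

var events: [String] = []
let futureDone = DispatchSemaphore(value: 0)

let futureCancellable = averagePublisher().sink(
    receiveCompletion: { completion in
        if case .finished = completion { events.append("finished") }
        futureDone.signal()
    },
    receiveValue: { value in
        events.append("value \(value)")
    }
)

// Block until the completion has been delivered, then inspect the events.
futureDone.wait()
print(events)
```

Because the promise is handed straight to the callback, both the success and the failure path end the publisher, which is what the PassthroughSubject version was missing on errors.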
Now let's look at your segmentsWorkoutPublisher (and by extension stepsPublisher).
In working with Combine, you should be VERY wary if you find yourself using the Publishers.<SomeOperatorType> constructors. In my experience it's rarely the right thing to do. (Having said that, using Zip3 later seems OK to me.)
In this case you are creating a sequence of Publishers (your Futures). But you're really not interested in a sequence of Publishers. You're interested in a sequence of the values those Publishers produce. In a sense you want to "unwrap" each Publisher (by waiting for its value) and send those results down the sequence. This is exactly what flatMap is for! Let's do something like this:
let segmentsWorkoutPublisher =
    workoutSegments
        .publisher // `workoutSegments` is an array, so turn it into a publisher first
        .map { $0.dateInterval }
        .flatMap {
            healthStore.statistic(for: HKQuantityType.quantityType(forIdentifier: HKQuantityTypeIdentifier.heartRate)!, with: .discreteAverage, from: $0.start, to: $0.end)
        }
        .assertNoFailure()
This generates a sequence of publishers, but then waits for each one to emit a value and sends the values farther along.
And stepsPublisher would change in a similar way.
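One thing I'd double-check, though: as far as I know, a plain flatMap subscribes to every inner publisher as soon as it arrives and forwards values in the order they come back, so on its own it may not keep results lined up with the segments. A minimal sketch of one way around that is to limit flatMap to one inner publisher at a time with `maxPublishers`. Here `mockQuery` is a hypothetical stand-in for the HealthKit call, deliberately made slower for earlier inputs so that unordered results would arrive reversed.

```swift
import Combine
import Foundation

// Hypothetical stand-in for a query: earlier indices take longer to finish.
func mockQuery(_ index: Int) -> AnyPublisher<Int, Never> {
    Future<Int, Never> { promise in
        let delay = Double(3 - index) * 0.1
        DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
            promise(.success(index))
        }
    }
    .eraseToAnyPublisher()
}

var ordered: [Int] = []
let orderedDone = DispatchSemaphore(value: 0)

let orderedCancellable = [0, 1, 2].publisher
    // Only one inner publisher is active at a time, so the next query
    // starts after the previous one has completed.
    .flatMap(maxPublishers: .max(1)) { mockQuery($0) }
    .collect()
    .sink { values in
        ordered = values
        orderedDone.signal()
    }

orderedDone.wait()
print(ordered) // [0, 1, 2]
```

The trade-off is that the queries now run one after another rather than in parallel.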
I think that will get you where you need to go. As part of looking at this, I created a Playground where I reworked your example with simplified types. Next time you run into a problem like this you might try something similar: filter away the superfluous details and try to create a simpler example. If you can put your code together in a Playground that compiles without too much trouble, it will be easier to answer questions. The Playground:
import Foundation
import Combine
import PlaygroundSupport

enum MockEventType : CaseIterable {
    case segment
    case notSegment
}

struct MockSegment {
    let type : MockEventType
    let dateInterval : DateInterval = DateInterval.init(start: Date.now, duration: 3600)
}

func statistic() -> AnyPublisher<Float, Never> {
    let future = Future<Float, Never>() { fulfillPromise in
        DispatchQueue.global(qos: .background).async {
            sleep(UInt32.random(in: 1...3))
            fulfillPromise(.success(Float.random(in: 100.0...150.0)))
        }
    }
    return future
        .eraseToAnyPublisher()
}

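// Generate an endless stream of mock events.
// share() lets every subscriber below see the same random events;
// without it, the map closure runs separately for each subscriber.
let rawWorkouts = Timer.publish(every: 1.0, on: .current, in: .common)
    .autoconnect()
    .map { _ in MockSegment(type: MockEventType.allCases.randomElement()!) }
    .share()

let workoutSegments = rawWorkouts.filter { $0.type == .segment }
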
let dateIntervals =
    workoutSegments
        .map { $0.dateInterval }

let segmentsWorkoutPublisher =
    dateIntervals
        .flatMap { _ in statistic() }
        .assertNoFailure()

let stepsPublisher =
    dateIntervals
        .flatMap { _ in statistic() }
        .assertNoFailure()

var bag = Set<AnyCancellable>()

Publishers.Zip3(workoutSegments, stepsPublisher, segmentsWorkoutPublisher)
    .receive(on: DispatchQueue.main)
    .sink(receiveValue: { pace, steps, hrs in
        print("pace: \(pace) steps: \(steps), hrs: \(hrs)")
    })
    .store(in: &bag)

PlaygroundSupport.PlaygroundPage.current.needsIndefiniteExecution = true
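
As a variation on the Playground above (a different technique from the one I described, so treat it as a sketch), you could also zip the queries per segment inside the flatMap rather than zipping three independent streams at the end. That way each tuple is built from queries for the same segment, whatever order the results arrive in. `mockStatistic` is a hypothetical stand-in whose result encodes the segment index, so the pairing can be checked.

```swift
import Combine
import Foundation

// Hypothetical mock query: yields `base + index` after a random short delay.
func mockStatistic(base: Int, index: Int) -> AnyPublisher<Int, Never> {
    Future<Int, Never> { promise in
        let delay = Double.random(in: 0.01...0.1)
        DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
            promise(.success(base + index))
        }
    }
    .eraseToAnyPublisher()
}

var rows: [(segment: Int, steps: Int, heartRate: Int)] = []
let rowsDone = DispatchSemaphore(value: 0)

let rowsCancellable = [0, 1, 2].publisher
    .flatMap { segment in
        // Both queries for one segment run in parallel and are paired
        // with that segment before anything is sent downstream.
        Publishers.Zip3(
            Just(segment),
            mockStatistic(base: 100, index: segment),
            mockStatistic(base: 200, index: segment)
        )
    }
    .collect()
    .sink { tuples in
        // Tuples may arrive in any order, so sort by segment afterwards.
        rows = tuples
            .map { (segment: $0.0, steps: $0.1, heartRate: $0.2) }
            .sorted { $0.segment < $1.segment }
        rowsDone.signal()
    }

rowsDone.wait()
for row in rows {
    print("segment: \(row.segment) steps: \(row.steps) hrs: \(row.heartRate)")
}
```

In the real code the two mock calls would be the heart rate and step count statistic publishers for the segment's date interval.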