Search code examples
chroniclechronicle-queuechronicle-wire

Chronicle Queue JSON


I'm currently trying to marshal a certain POJO as JSON and write it to an output queue. This is the run down of what I'm trying to do:

object OrderExecutor {

    const val OUT_PATH = "${PATH}/order-executor/out"

    @JvmStatic
    fun main(args: Array<String>) {
        ChronicleQueue.single("${PATH}/out").use { input ->
            SingleChronicleQueueBuilder.builder(Path(OUT_PATH), WireType.JSON).build().use { output->
                val executor = object: AbstractProcessor(){
                    val consumer = output.acquireAppender().methodWriter(ExchangeOrderConsumer::class.java)
                    val executor = OrderExecutor(consumer)
                    override val inputReader: MethodReader = input.createTailer().methodReader(executor)
                }
                executor.run()
            }
        }
    }
}

but am getting this error when appending to the file:

Caused by: java.lang.UnsupportedOperationException: todo
at net.openhft.chronicle.wire.TextWire$TextValueOut.int128forBinding(TextWire.java:1476)
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueStore.intForBinding(SingleChronicleQueueStore.java:126)
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueStore.writeMarshallable(SingleChronicleQueueStore.java:302)
at net.openhft.chronicle.wire.TextWire$TextValueOut.marshallable(TextWire.java:1907)
at net.openhft.chronicle.wire.ValueOut.typedMarshallable(ValueOut.java:440)
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder.createStore(SingleChronicleQueueBuilder.java:265)
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueue$StoreSupplier.acquire(SingleChronicleQueue.java:966)
at net.openhft.chronicle.queue.impl.WireStorePool.acquire(WireStorePool.java:53)
at net.openhft.chronicle.queue.impl.single.StoreAppender.setCycle2(StoreAppender.java:269)
at net.openhft.chronicle.queue.impl.single.StoreAppender.setWireIfNull(StoreAppender.java:432)
at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:389)
at net.openhft.chronicle.queue.impl.single.StoreAppender.acquireWritingDocument(StoreAppender.java:412)
at me.oms.order.ExchangeOrderConsumerJsonMethodWriter.newExchangeOrderSingle(ExchangeOrderConsumerJsonMethodWriter.java:49)
at me.oms.order.process.OrderExecutor.created(OrderExecutor.kt:23)
at me.oms.order.process.OrderExecutorMethodReader.readOneCall(OrderExecutorMethodReader.java:64)

I then noticed in the docs that text based formats including JSON are not supported. What is the most elegant way to achieve this? Do I have to use a lower level API?


Solution

  • The WireType here is the format used by the Queue itself, and the queue uses features JSON/TextWire doesn't support such as thread-safe 128-bit values.

    There isn't an elegant way of writing JSON into the queue, however, you don't need to. You can convert it to JSON when you read it instead should you need, however in this example you don't as you are also using a method reader.