I have 5 GB of data serialized with Apache Thrift and a .thrift file describing the format of the data. I have tried using thriftpy and the official thrift package, but I can't wrap my head around how to open the files.
The data is the expanded dataset from http://www.iesl.cs.umass.edu/data/wiki-links
A description of the data format can be found here: https://code.google.com/p/wiki-link/wiki/ExpandedDataset
The Scala setup can be found in the ThriftSerializerFactory.scala file. Since the naming of most things is consistent across the Thrift libraries, you can more or less model your Python code after the Scala example:
    package edu.umass.cs.iesl.wikilink.expanded.process

    import org.apache.thrift.protocol.TBinaryProtocol
    import org.apache.thrift.transport.TIOStreamTransport
    import java.io.File
    import java.io.BufferedOutputStream
    import java.io.FileOutputStream
    import java.io.BufferedInputStream
    import java.io.FileInputStream
    import java.util.zip.{GZIPOutputStream, GZIPInputStream}

    object ThriftSerializerFactory {

      def getWriter(f: File) = {
        val stream = new BufferedOutputStream(new GZIPOutputStream(new FileOutputStream(f)), 2048)
        val protocol = new TBinaryProtocol(new TIOStreamTransport(stream))
        (stream, protocol)
      }

      def getReader(f: File) = {
        val stream = new BufferedInputStream(new GZIPInputStream(new FileInputStream(f)), 2048)
        val protocol = new TBinaryProtocol(new TIOStreamTransport(stream))
        (stream, protocol)
      }
    }
You basically set up a stream transport and the binary protocol. If you leave the data compressed, you will have to add the gzip piece to the puzzle, but once the data are decompressed this should not be needed anymore.
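A rough Python counterpart of getReader might look like the sketch below. It assumes the official thrift package, whose TTransport.TFileObjectTransport wraps an ordinary file object. The helper names open_stream and get_reader are made up for illustration, and the gzip check simply looks at the first two bytes of the file.

```python
import gzip


def open_stream(path):
    """Open a data file for binary reading, gunzipping it if needed."""
    with open(path, "rb") as probe:
        is_gzip = probe.read(2) == b"\x1f\x8b"  # gzip magic number
    return gzip.open(path, "rb") if is_gzip else open(path, "rb")


def get_reader(path):
    """Hypothetical Python analogue of getReader: returns (stream, protocol)."""
    # Imported here so open_stream stays usable without the thrift package.
    from thrift.protocol import TBinaryProtocol
    from thrift.transport import TTransport

    stream = open_stream(path)
    # Both gzip.open and open return buffered file objects, so no extra
    # buffering layer is added around the transport.
    transport = TTransport.TFileObjectTransport(stream)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    return stream, protocol
```

As in the Scala version, the stream is returned alongside the protocol so the caller can close it once reading is finished.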
The code in WikiLinkItemIterator.scala shows how to read the data files using the factory class above.
    class PerFileWebpageIterator(f: File) extends Iterator[WikiLinkItem] {
      var done = false
      val (stream, proto) = ThriftSerializerFactory.getReader(f)
      private var _next: Option[WikiLinkItem] = getNext()

      private def getNext(): Option[WikiLinkItem] = try {
        Some(WikiLinkItem.decode(proto))
      } catch {
        case _: TTransportException => {done = true; stream.close(); None}
      }

      def hasNext(): Boolean = !done && (_next != None || {_next = getNext(); _next != None})

      def next(): WikiLinkItem = if (hasNext()) _next match {
        case Some(wli) => {_next = None; wli}
        case None => {throw new Exception("Next on empty iterator.")}
      } else throw new Exception("Next on empty iterator.")
    }
Steps to implement in Python:
- Create a WikiLinkItem instance.
- Call instance.read(proto) to read one record of data.
Repeat both steps until the end of the file is reached.
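The read loop could be sketched in Python roughly as follows, given a protocol object proto set up over the data file. The function name iter_records is invented for illustration. The sketch assumes the official thrift package signals end of input with EOFError from the transport, which is my understanding of its readAll behaviour; other exception types can be passed through eof_errors if your setup raises something else, such as TTransportException.

```python
def iter_records(proto, record_cls, eof_errors=(EOFError,)):
    """Yield records decoded from proto until the input is exhausted.

    record_cls is the generated struct class (for example WikiLinkItem).
    eof_errors lists the exception types treated as end of input.
    """
    while True:
        item = record_cls()  # fresh instance for every record
        try:
            item.read(proto)  # decodes exactly one record from the stream
        except eof_errors:
            return  # no more data: stop iterating
        yield item
```

With classes generated by `thrift --gen py` from the .thrift file (the generated module name depends on the namespace declared there), usage would be along the lines of `for item in iter_records(proto, WikiLinkItem): ...`. Note that a file truncated in the middle of a record also ends the loop silently in this sketch.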