Search code examples
pythonpycharmtradingziplinequantopian

Is there a way to create a pipeline locally using zipline?


I have set up zipline locally on PyCharm. The simulations work, moreover, I have access to premium data from quandl (which automatically updated when I entered my API key). However, now my question is, how do I make a pipeline locally using zipline.


Solution

  • Zipline's documentation is challenging. Zipline.io (as of 2021-0405) is also down. Fortunately, Blueshift has documentation and sample code that shows how to make a pipeline that can be run locally:

    • Blueshift sample pipeline code is here. (Pipelines library here.)
    • Zipline documentation can be accessed from MLTrading (archive documentation here) since though challenging it is still useful.
    • Full code of the pipeline sample code from Blueshift, but modified to run locally through PyCharm, is below the line. Please note as I'm sure you're already aware, the strategy is a bad strategy and you shouldn't trade on it. It does show local instantiations of pipelines though.

    """
        Title: Classic (Pedersen) time-series momentum (equal weights)
        Description: This strategy uses past returns and go long (short)
                    the positive (negative) n-percentile
        Style tags: Momentum
        Asset class: Equities, Futures, ETFs, Currencies
        Dataset: All
    """
    
    """
    Sources:
    Overall Algorithm here:
    https://github.com/QuantInsti/blueshift-demo-strategies/blob/master/factors/time_series_momentum.py
    
    Custom (Ave Vol Filter, Period Returns) Functions Here: 
    https://github.com/QuantInsti/blueshift-demo-strategies/blob/master/library/pipelines/pipelines.py
    """
    
    import numpy as np
    
    from zipline.pipeline import CustomFilter, CustomFactor, Pipeline
    from zipline.pipeline.data import EquityPricing
    from zipline.api import (
        order_target_percent,
        schedule_function,
        date_rules,
        time_rules,
        attach_pipeline,
        pipeline_output,
    )
    
    
    def average_volume_filter(lookback, amount):
        """
           Returns a custom filter object for volume-based filtering.
    
           Args:
               lookback (int): lookback window size
               amount (int): amount to filter (high-pass)
    
           Returns:
               A custom filter object
    
           Examples::
    
               # from library.pipelines.pipelines import average_volume_filter
    
               pipe = Pipeline()
               volume_filter = average_volume_filter(200, 1000000)
               pipe.set_screen(volume_filter)
        """
    
        class AvgDailyDollarVolumeTraded(CustomFilter):
            inputs = [EquityPricing.close, EquityPricing.volume]
    
            def compute(self, today, assets, out, close_price, volume):
                dollar_volume = np.mean(close_price * volume, axis=0)
                high_volume = dollar_volume > amount
                out[:] = high_volume
    
        return AvgDailyDollarVolumeTraded(window_length=lookback)
    
    
    def period_returns(lookback):
        """
           Returns a custom factor object for computing simple returns over
           period.
    
           Args:
               lookback (int): lookback window size
    
           Returns:
               A custom factor object.
    
           Examples::
    
               # from library.pipelines.pipelines import period_returns
               pipe = Pipeline()
               momentum = period_returns(200)
               pipe.add(momentum,'momentum')
        """
    
        class SignalPeriodReturns(CustomFactor):
            inputs = [EquityPricing.close]
    
            def compute(self, today, assets, out, close_price):
                start_price = close_price[0]
                end_price = close_price[-1]
                returns = end_price / start_price - 1
                out[:] = returns
    
        return SignalPeriodReturns(window_length=lookback)
    
    
    def initialize(context):
        '''
            A function to define things to do at the start of the strategy
        '''
        # The context variables can be accessed by other methods
        context.params = {'lookback': 12,
                          'percentile': 0.1,
                          'min_volume': 1E7
                          }
    
        # Call rebalance function on the first trading day of each month
        schedule_function(strategy, date_rules.month_start(),
                          time_rules.market_close(minutes=1))
    
        # Set up the pipe-lines for strategies
        attach_pipeline(make_strategy_pipeline(context),
                        name='strategy_pipeline')
    
    
    def strategy(context, data):
        generate_signals(context, data)
        rebalance(context, data)
    
    
    def make_strategy_pipeline(context):
        pipe = Pipeline()
    
        # get the strategy parameters
        lookback = context.params['lookback'] * 21
        v = context.params['min_volume']
    
        # Set the volume filter
        volume_filter = average_volume_filter(lookback, v)
    
        # compute past returns
        momentum = period_returns(lookback)
        pipe.add(momentum, 'momentum')
        pipe.set_screen(volume_filter)
    
        return pipe
    
    
    def generate_signals(context, data):
        try:
            pipeline_results = pipeline_output('strategy_pipeline')
        except:
            context.long_securities = []
            context.short_securities = []
            return
    
        p = context.params['percentile']
        momentum = pipeline_results
    
        long_candidates = momentum[momentum > 0].dropna().sort_values('momentum')
        short_candidates = momentum[momentum < 0].dropna().sort_values('momentum')
    
        n_long = len(long_candidates)
        n_short = len(short_candidates)
        n = int(min(n_long, n_short) * p)
    
        if n == 0:
            print("{}, no signals".format(data.current_dt))
            context.long_securities = []
            context.short_securities = []
    
        context.long_securities = long_candidates.index[-n:]
        context.short_securities = short_candidates.index[:n]
    
    
    def rebalance(context, data):
        # weighing function
        n = len(context.long_securities)
        if n < 1:
            return
    
        weight = 0.5 / n
    
        # square off old positions if any
        for security in context.portfolio.positions:
            if security not in context.long_securities and \
                    security not in context.short_securities:
                order_target_percent(security, 0)
    
        # Place orders for the new portfolio
        for security in context.long_securities:
            order_target_percent(security, weight)
        for security in context.short_securities:
            order_target_percent(security, -weight)