Search code examples

Oozie date time start

I have a custom Flume source, configured in my Flume configuration file, that extracts data from a Facebook page every hour.

Is there a way to align the extraction schedule with the start time of my Oozie coordinator?

For example, if I set my coordinator to start at 01/01/2015 12:00 AM, I want Flume to start extracting at that same time.

This is my custom source:

public class FacebookPageFansCitySource extends AbstractPollableSource
    private String accessToken;
    private String pageId;
    private int refreshInterval;

    private FacebookClient facebookClient;

    private volatile long lastPoll = 0;

    protected void doConfigure(Context context) throws FlumeException
        this.accessToken = context.getString(FacebookSourceConstants.ACCESS_TOKEN_KEY);
        this.pageId = context.getString(FacebookSourceConstants.PAGE_ID_KEY);
        this.refreshInterval = context.getInteger(FacebookSourceConstants.REFRESH_INTERVAL_KEY);

        facebookClient = new DefaultFacebookClient(accessToken, Version.VERSION_2_2);

    protected void doStart() throws FlumeException

    protected void doStop() throws FlumeException

    protected Status doProcess() throws EventDeliveryException
        Status status = Status.BACKOFF;

        if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - lastPoll) > refreshInterval)
            lastPoll = System.currentTimeMillis();

                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");


                Date date = new Date(lastPoll);

                String dateFormatted = simpleDateFormat.format(date);

                final Map<String, String> headers = new HashMap<String, String>();

                headers.put("timestamp", String.valueOf(date.getTime()));

                Insight insight = getInsight(pageId, "page_fans_city");

                if (insight != null)
                    final List<Event> events = new ArrayList<Event>();

                    ChannelProcessor channelProcessor = getChannelProcessor();

                    List<JsonObject> values = insight.getValues();

                    for (JsonObject value : values)
                        String referenceDate = simpleDateFormat.format(DateUtils.toDateFromLongFormat(value.getString("end_time")));

                        JsonObject jsonObjectValue = value.getJsonObject("value");

                        for (Iterator<?> keys = jsonObjectValue.keys(); keys.hasNext(); )
                            String key = (String);
                            Long count = jsonObjectValue.getLong(key);

                            JsonObject jsonObject = new JsonObject();

                            jsonObject.put("reference_date", referenceDate);
                            jsonObject.put("city", key);
                            jsonObject.put("count", count);
                            jsonObject.put("poll_time", dateFormatted);

                            Event event = EventBuilder.withBody(jsonObject.toString().getBytes(), headers);



                status = Status.READY;
            catch (Exception e)
                Logger.getLogger(FacebookPageFansCitySource.class.getName()).log(Level.SEVERE, null, e);

        return status;

    private Insight getInsight(String objectId, String metric)
        TimeZone timeZone = TimeZone.getTimeZone("UTC");

        Calendar calendar = Calendar.getInstance(timeZone);

        calendar.add(Calendar.DAY_OF_MONTH, -4);

        Parameter parameterSince = Parameter.with("since", calendar.getTime());

        calendar.add(Calendar.DAY_OF_MONTH, 1);

        Parameter parameterUntil = Parameter.with("until", calendar.getTime());

        Connection<Insight> responseListInsight = facebookClient.fetchConnection(objectId + "/insights/" + metric, Insight.class, parameterSince, parameterUntil);

        if (responseListInsight != null && !responseListInsight.getData().isEmpty())
            return responseListInsight.getData().get(0);
            return null;

Thanks for the help.


  • How about creating a Java action and defining a workflow property that takes the coordinator's current (nominal) time, for example `${coord:nominalTime()}`? Then pass that property to your action as a parameter.