Using pyparsing for parsing filter expressions


I'm currently trying to write a parser (using pyparsing) for filter strings that can then be applied to a (pandas) DataFrame to filter data. After much trial and error I have it working for all kinds of example strings, but I am having trouble extending it further.

First, here is my current code (that should be working if you just copy paste, at least for my Python 3.11.9 and pyparsing 3.1.2):

import pyparsing as pp

# Define the components of the grammar
field_name = pp.Word(pp.alphas + "_", pp.alphanums + "_")
action = pp.one_of("include exclude")
sub_action = pp.one_of("equals contains starts_with ends_with greater_than not_equals not_contains not_starts_with not_ends_with empty not_empty less_than less_than_or_equal_to greater_than_or_equal_to between regex in_list not_in_list")

# Custom regex pattern parser that handles regex ending at the first space
def regex_pattern():
    def parse_regex(t):
        # Join tokens to form the regex pattern
        return ''.join(t[0])
    return pp.Regex(r'[^ ]+')("regex").setParseAction(parse_regex)

# Define value as either a quoted string, a regex pattern, or a simple word with allowed characters
quoted_string = pp.QuotedString('"')
unquoted_value = pp.Word(pp.alphanums + "_-;, ") | pp.Regex(r'[^/]+')

value = pp.Optional(quoted_string | regex_pattern() | unquoted_value)("value")

slash = pp.Suppress("/")
filter_expr = pp.Group(field_name("field") + slash + action("action") + slash + sub_action("sub_action") + pp.Optional(slash + value, default=""))

# Define logical operators
and_op = pp.one_of("AND and")
or_op = pp.one_of("OR or")
not_op = pp.one_of("NOT not")

# Define the overall expression using infix notation
expression = pp.infixNotation(filter_expr,
                           [
                               (not_op, 1, pp.opAssoc.RIGHT),
                               (and_op, 2, pp.opAssoc.LEFT),
                               (or_op, 2, pp.opAssoc.LEFT)
                           ])

# List of test filters
test_filters = [
    "order_type/exclude/contains/STOP ORDER AND order_validity/exclude/contains/GOOD FOR DAY",
    "order_status/include/regex/^New$ AND order_id/include/equals/123;124;125",
    "order_id/include/equals/123;124;125",
    "order_id/include/equals/125 OR currency/include/equals/EUR",    
    "trade_amount/include/greater_than/1500 AND currency/include/equals/USD",    
    "trade_amount/include/between/1200-2000 AND currency/include/in_list/USD,EUR",
    "order_status/include/starts_with/New;Filled OR order_status/include/ends_with/ed",
    "order_status/exclude/empty AND filter_code/include/not_empty",    
    "order_status/include/regex/^New$",
    "order_status/include/regex/^New$ OR order_status/include/regex/^Changed$",
    "order_status/include/contains/New;Changed"
]

# Loop over test filters, parse each, and display the results
for test_string in test_filters:
    print(f"Testing filter: {test_string}")
    try:
        parse_result = expression.parse_string(test_string, parseAll=True).asList()[0]
        print(f"Parsed result: {parse_result}")
    except Exception as e:
        print(f"Error with filter: {test_string}")
        print(e)
    print("\n")

Now, if you run the code, you'll notice that all the test strings parse just fine, except the first element of the list, "order_type/exclude/contains/STOP ORDER AND order_validity/exclude/contains/GOOD FOR DAY".

The problem (as far as I can tell) is that the empty space between "STOP" and "ORDER" is being recognized as the end of the "value" part of that part of the group, and then it breaks.

What I've tried is to use SkipTo to just skip to the next logical operator after the sub_action part is done, but that didn't work. Also, I wasn't sure how extensible that is, because in theory it should even be possible to have many chained expressions (e.g. part1 AND part2 OR part3), where each part consists of the 3-4 elements (field_name, action, sub_action and the optional value).

I've also tried extending the unquoted_value to also include empty spaces, but that changed nothing, either.

I've also looked at some of the examples over at https://github.com/pyparsing/pyparsing/tree/master/examples, but I couldn't really see anything that was similar to my use case. (Maybe once my code is working properly, it could be added as an example there, not sure how useful my case is to others).


Solution

  • Rather than defining a term that includes the spaces, it is better to define a term that parses individual words, so that it can detect and stop at a word that shouldn't be included (such as one of the logical operator words). I did this and then wrapped it in a Combine that a) allows whitespace between the words (adjacent=False), and b) joins them back together with single spaces (join_string=" ").

    I made these changes in your parser, and things look like they will work better for you:

    # I used CaselessKeywords so that you get a repeatable return value, 
    #  regardless of the input
    and_op = pp.CaselessKeyword("and")
    or_op = pp.CaselessKeyword("or")
    not_op = pp.CaselessKeyword("not")
    
    # define an expression for any logical operator, to be used when
    # defining words that unquoted_value should not include
    any_operator = and_op | or_op | not_op
    
    value_chars = pp.alphanums + "_-;,"
    # an unquoted_value is one or more words that are not operators
    unquoted_value = pp.Combine(pp.OneOrMore(pp.Word(value_chars), stop_on=any_operator), join_string=" ", adjacent=False)
    # can also be written as
    # unquoted_value = pp.Combine(pp.Word(value_chars)[1, ...:any_operator], join_string=" ", adjacent=False)
    
    # regex_pattern() can be replaced by this regex
    single_word = pp.Regex(r"\S+")
    value = (quoted_string | unquoted_value | single_word)("value")
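
To see the word-by-word approach in isolation, here is a minimal sketch that exercises only unquoted_value. It assumes the underscore belongs in value_chars (as in the question's original character set); the sample strings and the variable names stop_order and good_for_day are just for illustration.

```python
import pyparsing as pp

# Operator keywords, defined the same way as in the snippet above
and_op = pp.CaselessKeyword("and")
or_op = pp.CaselessKeyword("or")
not_op = pp.CaselessKeyword("not")
any_operator = and_op | or_op | not_op

# Assumption: underscore is allowed, matching the question's character set
value_chars = pp.alphanums + "_-;,"
unquoted_value = pp.Combine(
    pp.OneOrMore(pp.Word(value_chars), stop_on=any_operator),
    join_string=" ",
    adjacent=False,
)

# Parsing stops before the operator keyword, so only the value is consumed.
# "ORDER" starts with "OR" but is not a standalone keyword, so it is kept.
stop_order = unquoted_value.parse_string("STOP ORDER AND order_validity")[0]
print(stop_order)

# Runs of whitespace between words are collapsed to single spaces
good_for_day = unquoted_value.parse_string("GOOD   FOR  DAY")[0]
print(good_for_day)
```

Because the operators are keywords rather than plain literals, a word that merely begins with an operator's letters is still treated as part of the value.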
    

    Lastly, your testing loop looks a lot like the loops I used to write in many of these StackOverflow question responses. I wrote them so many times that I finally added a ParserElement method run_tests, which you can call like this to replace your test loop:

    expression.run_tests(test_filters)
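
For completeness, here is a sketch of how the pieces might fit together with the rest of the grammar from the question. It is an assembly under assumptions rather than a definitive version: it uses the pyparsing 3.x names infix_notation and OpAssoc in place of infixNotation and opAssoc, includes the underscore in value_chars, and tests only a few of the question's filter strings. run_tests returns a (success, results) pair, which is handy if you want to check the outcome in a script instead of reading the printed report.

```python
import pyparsing as pp

# Grammar components taken from the question
field_name = pp.Word(pp.alphas + "_", pp.alphanums + "_")
action = pp.one_of("include exclude")
sub_action = pp.one_of(
    "equals contains starts_with ends_with greater_than not_equals "
    "not_contains not_starts_with not_ends_with empty not_empty less_than "
    "less_than_or_equal_to greater_than_or_equal_to between regex in_list "
    "not_in_list"
)

# Logical operators as caseless keywords (always returned in lowercase)
and_op = pp.CaselessKeyword("and")
or_op = pp.CaselessKeyword("or")
not_op = pp.CaselessKeyword("not")
any_operator = and_op | or_op | not_op

# Values: quoted string, one or more non-operator words, or any single word
quoted_string = pp.QuotedString('"')
value_chars = pp.alphanums + "_-;,"  # assumption: underscore is allowed
unquoted_value = pp.Combine(
    pp.OneOrMore(pp.Word(value_chars), stop_on=any_operator),
    join_string=" ",
    adjacent=False,
)
single_word = pp.Regex(r"\S+")
value = (quoted_string | unquoted_value | single_word)("value")

slash = pp.Suppress("/")
filter_expr = pp.Group(
    field_name("field") + slash + action("action") + slash
    + sub_action("sub_action") + pp.Optional(slash + value, default="")
)

expression = pp.infix_notation(
    filter_expr,
    [
        (not_op, 1, pp.OpAssoc.RIGHT),
        (and_op, 2, pp.OpAssoc.LEFT),
        (or_op, 2, pp.OpAssoc.LEFT),
    ],
)

# A few of the question's filters, including the one that used to fail
test_filters = [
    "order_type/exclude/contains/STOP ORDER AND order_validity/exclude/contains/GOOD FOR DAY",
    "order_status/include/regex/^New$ AND order_id/include/equals/123;124;125",
    "trade_amount/include/between/1200-2000 AND currency/include/in_list/USD,EUR",
    "order_status/exclude/empty AND filter_code/include/not_empty",
]

# print_results=False suppresses the report; inspect the return value instead
success, results = expression.run_tests(test_filters, print_results=False)
print("all tests passed:", success)

# Each entry in results is a (test_string, parse_result) pair
first_parsed = results[0][1].as_list()[0]
print(first_parsed)
```

Leaving print_results at its default of True prints each test string followed by its parsed tokens, or the error location when a string fails to parse.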