Split Filter-Map Operator
Like the Split Filter operator, the Split Filter-Map operator extends a traffic splitter with the filter-map transformation. The operator can selectively filter which entries enter which sinks while also applying a mapping function. The following example extends the filter-map example. We will implement a dataflow that detects whether an input is a valid addition or subtraction statement, computes the result, and sends it to the right sink.
 
Prerequisites
This guide uses a local Fluvio cluster. If you need to install it, please follow the instructions here.
Transformation
The filter-map transformation is placed in the sink section.
sinks:
  - type: topic
    id: (...)
    transforms:
      - operator: filter-map
        run: |
          (... filter map function ...)
    (... more topics ...)
The function should take the input and return a Result<Option<...>>. Returning Ok(Some(...)) passes the mapped value on to the sink, and returning Ok(None) filters the input out. The following are our transformations. The code is shortened for brevity; the full example is below. Compared to the filter-map example, the dosubtraction topic only swaps in a different regex and calculation.
sinks:
  - type: topic
    id: doaddition
    transforms: 
      - operator: filter-map 
        dependencies:
          - name: regex
            version: "1"
        run: |
          fn do_addition(input: String) -> Result<Option<String> > {
            (...)
          }
  - type: topic
    id: dosubtraction
    transforms: 
      - operator: filter-map 
        dependencies:
          - name: regex
            version: "1"
        run: |
          fn do_substraction(input: String) -> Result<Option<String> > {
            let re = regex::Regex::new(r"^(\d+)-(\d+)=$").unwrap();
            if let Some(num) = re.captures(&input) {
                (...)
                return Ok(Some(format!("{}{}",input,(a-b))));
            } else{
                return Ok(None);
            }
          }
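The Some/None contract can also be tried outside a dataflow. The following is a minimal sketch, not the example's implementation. It assumes a stand-in `Result` alias, because the inline functions use a single-parameter `Result` without declaring it. It replaces the `regex` crate with standard-library string methods so it compiles with no dependencies. The names `parse_operand` and `do_subtraction_sketch` are hypothetical. Unlike the example, the sketch accepts only ASCII digits and returns Ok(None) instead of panicking when an operand does not fit in an i32.

```rust
// Stand-in for the single-parameter `Result` used by the inline functions
// (an assumption so that this sketch compiles on its own).
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

// Hypothetical helper: parse a run of ASCII digits into an i32.
// Returns None for empty strings, signs, non-digits, or overflow.
fn parse_operand(s: &str) -> Option<i32> {
    if s.is_empty() || !s.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    s.parse().ok()
}

// Filter-map contract: Ok(Some(..)) emits a record, Ok(None) drops it.
fn do_subtraction_sketch(input: String) -> Result<Option<String>> {
    // Accept only the shape "<digits>-<digits>=".
    let parsed = input
        .strip_suffix('=')
        .and_then(|body| body.split_once('-'))
        .and_then(|(lhs, rhs)| Some((parse_operand(lhs)?, parse_operand(rhs)?)));
    // Map the matched operands to "<input><result>"; anything else is filtered.
    Ok(parsed.map(|(a, b)| format!("{}{}", input, a - b)))
}
```

Returning Ok(None) for an oversized operand treats it like any other invalid statement. That is a design choice of this sketch; the example's `.parse().unwrap()` would panic on such input instead.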
Running the Example
Copy and paste the following config and save it as dataflow.yaml.
# dataflow.yaml
apiVersion: 0.5.0
meta:
  name: split-filter-map-example
  version: 0.1.0
  namespace: examples
config:
  converter: raw
topics:
  sentences:
    schema:
      value:
        type: string
  doaddition:
    schema:
      value:
        type: string
  dosubtraction:
    schema:
      value:
        type: string
services:
  filter-map-service:
    sources:
      - type: topic
        id: sentences
    sinks:
      - type: topic
        id: doaddition
        transforms: 
          - operator: filter-map 
            dependencies:
              - name: regex
                version: "1"
            run: |
              fn do_addition(input: String) -> Result<Option<String> > {
                let re = regex::Regex::new(r"^(\d+)\+(\d+)=$").unwrap();
                if let Some(num) = re.captures(&input) {
                    let a: i32 = num.get(1).unwrap().as_str().parse().unwrap();
                    let b: i32 = num.get(2).unwrap().as_str().parse().unwrap();
                    return Ok(Some(format!("{}{}",input,(a+b))));
                } else{
                    return Ok(None);
                }
              }
      - type: topic
        id: dosubtraction
        transforms: 
          - operator: filter-map 
            dependencies:
              - name: regex
                version: "1"
            run: |
              fn do_substraction(input: String) -> Result<Option<String> > {
                let re = regex::Regex::new(r"^(\d+)-(\d+)=$").unwrap();
                if let Some(num) = re.captures(&input) {
                    let a: i32 = num.get(1).unwrap().as_str().parse().unwrap();
                    let b: i32 = num.get(2).unwrap().as_str().parse().unwrap();
                    return Ok(Some(format!("{}{}",input,(a-b))));
                } else{
                    return Ok(None);
                }
              }
To run the example:
$ sdf run
Produce sentences to the sentences topic:
$ echo "Hello world" | fluvio produce sentences
$ echo "9999+1=" | fluvio produce sentences
$ echo "9999-1=" | fluvio produce sentences
Consume topic doaddition to retrieve the result in another terminal:
$ fluvio consume doaddition -Bd
9999+1=10000
Consume the other topic, dosubtraction:
$ fluvio consume dosubtraction -Bd
9999-1=9998
We can see that the first entry, Hello world, is discarded, while the other two are sent to the right topic with the respective calculation applied.
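The routing observed here can be mimicked in plain Rust. This is a minimal sketch under the assumption that every record from the source is offered to every sink, and that each sink keeps only what its own filter-map function returns. The names `eval` and `route` and the `Result` alias are hypothetical stand-ins for illustration, not SDF APIs, and the parsing uses the standard library rather than the `regex` crate.

```rust
// Stand-in for the single-parameter `Result` used by the inline functions
// (an assumption so that this sketch compiles on its own).
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

// Hypothetical helper: evaluate "<digits><op><digits>=" for one operator.
fn eval(input: &str, op: char) -> Option<String> {
    let (lhs, rhs) = input.strip_suffix('=')?.split_once(op)?;
    let is_digits = |s: &str| !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit());
    if !is_digits(lhs) || !is_digits(rhs) {
        return None;
    }
    let (a, b): (i32, i32) = (lhs.parse().ok()?, rhs.parse().ok()?);
    let value = if op == '+' { a.checked_add(b)? } else { a.checked_sub(b)? };
    Some(format!("{}{}", input, value))
}

fn do_addition(input: String) -> Result<Option<String>> {
    Ok(eval(&input, '+'))
}

fn do_subtraction(input: String) -> Result<Option<String>> {
    Ok(eval(&input, '-'))
}

// Offer each record to both sinks; a sink stores the record only when
// its filter-map function returns Some.
fn route(records: &[&str]) -> Result<(Vec<String>, Vec<String>)> {
    let mut doaddition = Vec::new();
    let mut dosubtraction = Vec::new();
    for record in records {
        if let Some(out) = do_addition(record.to_string())? {
            doaddition.push(out);
        }
        if let Some(out) = do_subtraction(record.to_string())? {
            dosubtraction.push(out);
        }
    }
    Ok((doaddition, dosubtraction))
}
```

Calling `route(&["Hello world", "9999+1=", "9999-1="])` yields one entry per sink vector and drops the first record from both, matching the consumer output above.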
Cleanup
Exit the sdf terminal and clean up. The --force flag removes the topics:
$ sdf clean --force
Conclusion
In this example, we covered how to split traffic with the filter-map operator.