OFAC compliance with ElasticSearch

19 01 2015

The mission: To create a search engine capable of detecting possible matches between entries in our contact entry form and the OFAC Specially Designated Nationals (SDN) List.

The background: I was hired to develop highly compliant software that tracks and facilitates the movement of funds between the U.S. and Israel. Recently, the U.S. government has taken a hard line against corporations moving US dollars offshore on behalf of US citizens. Compliance demands that customers wishing to move funds be checked against the OFAC SDN list, as well as the new consolidated non-SDN list, which contains smaller lists such as the Palestinian Legislative Council (NS-PLC) list.

OFAC does not provide an API for querying this list, but has launched a search page alongside their publication (and regular updates) of plain text files. The search page description provides adequate details regarding their matching algorithms and basic expectations, those being character and word matching, and phonetics. Phonetics allows for basic spelling mistakes or discrepancies, particularly when dealing with non-English names.

Given our development phase and low starting volume, I developed the front-end first and accessed a free API available online. Our testing, however, showed that its fuzzy logic was not sufficient for our purposes, and there were other possible concerns such as alias matching, list updates, and the API being closed source. A cup of coffee and some Google/Stack Overflow searches later, I was sold on ElasticSearch.

Getting started: I put my frugality aside and decided to test development on a new AWS instance. I was happy to find that Bitnami had just the machine image I needed to get up and running in a snap.

For some reason, Pageant wasn’t working for me, so I loaded my private key directly into PuTTY and was able to log in with the “bitnami” user.

Going a bit out of development order, let’s first create our script to download the text files:

vi /opt/bitnami/elasticsearch/scripts/fetch_ofac.sh

#!/bin/sh
# Download the OFAC SDN and consolidated (non-SDN) CSV files into the ES scripts folder.
cd /opt/bitnami/elasticsearch/scripts/ || exit 1
# SDN list: primary records, addresses, aliases, comments
wget http://www.treasury.gov/ofac/downloads/sdn.csv -O sdn_prim.csv
wget http://www.treasury.gov/ofac/downloads/add.csv -O sdn_add.csv
wget http://www.treasury.gov/ofac/downloads/alt.csv -O sdn_alt.csv
wget http://www.treasury.gov/ofac/downloads/sdn_comments.csv -O sdn_comments.csv
# Consolidated non-SDN list: same four file types
wget http://www.treasury.gov/ofac/downloads/consolidated/cons_prim.csv -O cons_prim.csv
wget http://www.treasury.gov/ofac/downloads/consolidated/cons_add.csv -O cons_add.csv
wget http://www.treasury.gov/ofac/downloads/consolidated/cons_alt.csv -O cons_alt.csv
wget http://www.treasury.gov/ofac/downloads/consolidated/cons_comments.csv -O cons_comments.csv
echo "Done fetching OFAC files"

We are downloading the files right into the ES scripts folder, and renaming some so we get a consistent “list_type.ext” file name.
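The renaming convention can be spelled out as a small helper. This is only an illustrative sketch: `local_name` is a hypothetical function that is not part of the script above, and it assumes the eight Treasury file names listed there.

```shell
#!/bin/sh
# Hypothetical helper (not part of the original script): map a Treasury
# download URL to the local "list_type.csv" name used in this post.
local_name() {
    base=${1##*/}                 # strip everything up to the last "/"
    case "$base" in
        sdn.csv)         echo "sdn_prim.csv" ;;   # primary SDN records
        add.csv|alt.csv) echo "sdn_$base" ;;      # SDN addresses and aliases
        *)               echo "$base" ;;          # already in list_type form
    esac
}
```

For example, `local_name http://www.treasury.gov/ofac/downloads/alt.csv` prints `sdn_alt.csv`, while the consolidated files keep their names. The consistent suffix matters because the river file patterns later in this post match on the type part of the name (prim, alt, add, comments).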

Configuring ElasticSearch

The first step was installing the plugins I needed, namely the CSV-river plugin and the phonetic plugin. The first lets us run our download script and index our CSV files, repeating as often as we need. The latter lets us conduct searches using our choice of the most popular “sounds-like” algorithms, not just Soundex as used by the OFAC search tool.

cd /opt/bitnami/elasticsearch/
sudo bin/plugin -install elasticsearch/elasticsearch-analysis-phonetic/2.4.1
sudo bin/plugin -install river-csv -url https://github.com/AgileWorksOrg/elasticsearch-river-csv/releases/download/2.1.1/elasticsearch-river-csv-2.1.1.zip

Note: at the time of this writing, elasticsearch-river-csv-2.1.1 did not support parent-child relationships. Luckily, I found an old fork of the code that did, and it did not break my setup, so I later ran:

sudo /opt/bitnami/elasticsearch/bin/plugin -uninstall river-csv
sudo /opt/bitnami/elasticsearch/bin/plugin -install river-csv -url https://github.com/Trax-air/elasticsearch-river-csv/releases/download/2.1.1/elasticsearch-river-csv-2.1.1-SNAPSHOT.zip

To modify the ES settings and make phonetic search the default, I ran:
sudo vi /opt/bitnami/elasticsearch/config/elasticsearch.yml

Bitnami had the “network.publish_host” setting configured to some IP, so I just commented it out to automatically derive that setting. Then I added to the end of the file:

index.analysis.analyzer.default.type: custom
index.analysis.analyzer.default.tokenizer: standard
index.analysis.analyzer.default.filter: [standard, lowercase, my_metaphone]
index.analysis.filter.my_metaphone.type: phonetic
index.analysis.filter.my_metaphone.encoder: doublemetaphone
index.analysis.filter.my_metaphone.replace: false

which is probably the same as this YAML, but I wasn’t sure if the YAML was getting picked up, and figured I’d best stick to the conventions in the file.

index :
  analysis :
    analyzer :
      default :
        type : custom
        tokenizer : standard
        filter : [standard, lowercase, my_metaphone]
    filter :
      my_metaphone :
        type : phonetic
        encoder : doublemetaphone
        replace : false

Restart the server using
sudo /opt/bitnami/ctlscript.sh restart

or just ES with
sudo /opt/bitnami/elasticsearch/scripts/ctl.sh stop
sudo /opt/bitnami/elasticsearch/scripts/ctl.sh start

Since the SDN and non-SDN lists have the same format and do not repeat unique IDs, I was able to load each list into the same type.
My mappings are:

curl -XPOST 'http://localhost:9200/ofac' -d '{
    "settings" : {
        "number_of_shards" : 1,
        "number_of_replicas" : 0
    },
    "mappings" : {
        "prim" : {
            "properties" : {
                "ent_num" : {"type" : "integer"},
                "SDN_Name" : {
                    "type" : "string",
                    "fields" : {
                        "raw" : {"type" : "string", "index" : "not_analyzed"}
                    }
                },
                "SDN_Type" : {"type" : "string", "index" : "not_analyzed"}
            }
        },
        "alt" : {
            "_parent" : {
                "type" : "prim"
            },
            "properties" : {
                "ent_num" : {"type" : "integer", "index" : "not_analyzed"},
                "alt_num" : {"type" : "integer"},
                "alt_type" : {"type" : "string"},
                "alt_name" : {
                    "type" : "string",
                    "fields" : {
                        "raw" : {"type" : "string", "index" : "not_analyzed"}
                    }
                },
                "alt_remarks" : {"type" : "string"}
            }
        },
        "add" : {
            "_parent" : {
                "type" : "prim"
            },
            "properties" : {
                "ent_num" : {"type" : "integer", "index" : "not_analyzed"},
                "add_num" : {"type" : "integer"},
                "address" : {"type" : "string"},
                "citystatezip" : {"type" : "string"},
                "country" : {"type" : "string"},
                "add_remarks" : {"type" : "string"}
            }
        },
        "comments" : {
            "_parent" : {
                "type" : "prim"
            },
            "properties" : {
                "ent_num" : {"type" : "integer", "index" : "not_analyzed"},
                "comment" : {"type" : "string"}
            }
        }
    }
}'

A few points here on the mappings:

  1. I’m using a single index “ofac”, and the different files get their own types: prim (primary record), alt (aliases), add (address details), comments
  2. All types besides prim have prim set as their parent. This tells ES that we will specify a parent ID for every one of those documents when the forked version of CSV-River indexes them.
  3. The prim.SDN_Name field will have a sub-field called “raw” that has the non-tokenized non-analyzed name data. We will use this for our Aggregation grouping later.
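Since I wasn’t sure the analyzer settings were being picked up, one way to check is to ask the new index how it analyzes a name. The sketch below assumes ES is listening on localhost:9200 and that, when no analyzer is named, the index-level `_analyze` endpoint falls back to the index’s default analyzer. `ES_URL`, `CURL`, and `show_tokens` are names made up for this example.

```shell
#!/bin/sh
# Assumed defaults; override them to point at another host or to stub out curl.
ES_URL="${ES_URL:-http://localhost:9200}"
CURL="${CURL:-curl}"

# Print the tokens the "ofac" index produces for the given text.
# With replace: false, the output should list each lowercased word
# alongside its phonetic code(s).
show_tokens() {
    $CURL -s -XGET "$ES_URL/ofac/_analyze?pretty" -d "$1"
}
```

Running `show_tokens "Mohammed"` and then `show_tokens "Mohamed"` and comparing the phonetic tokens is a quick sanity check that misspellings will land on the same codes.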

I then set up my CSV-Rivers to download and grab the files. Since the River renames the files after it indexes them, further downloads will not cause collisions.

curl -XPUT localhost:9200/_river/prim/_meta -d '
{
    "type" : "csv",
    "csv_file" : {
        "folder" : "/opt/bitnami/elasticsearch/scripts",
        "filename_pattern" : ".*prim\\.csv$",
        "poll" : "6h",
        "fields" : [ "ent_num","SDN_Name","SDN_Type","Program","Title","Call_Sign","Vess_type","Tonnage","GRT","Vess_flag","Vess_owner","Remarks" ],
        "first_line_is_header" : "false",
        "field_id" : "ent_num",
        "script_before_all" : "/opt/bitnami/elasticsearch/scripts/fetch_ofac.sh"
    },
    "index" : {
        "index" : "ofac",
        "type" : "prim"
    }
}'
curl -XPUT localhost:9200/_river/alt/_meta -d '
{
    "type" : "csv",
    "csv_file" : {
        "folder" : "/opt/bitnami/elasticsearch/scripts",
        "filename_pattern" : ".*alt\\.csv$",
        "poll" : "6h",
        "fields" : [ "ent_num","alt_num","alt_type","alt_name","alt_remarks" ],
        "first_line_is_header" : "false",
        "field_id" : "alt_num",
        "field_parent" : "ent_num"
    },
    "index" : {
        "index" : "ofac",
        "type" : "alt"
    }
}'
curl -XPUT localhost:9200/_river/add/_meta -d '
{
    "type" : "csv",
    "csv_file" : {
        "folder" : "/opt/bitnami/elasticsearch/scripts",
        "filename_pattern" : ".*add\\.csv$",
        "poll" : "6h",
        "fields" : [ "ent_num","add_num","address","citystatezip","country","add_remarks" ],
        "first_line_is_header" : "false",
        "field_id" : "add_num",
        "field_parent" : "ent_num"
    },
    "index" : {
        "index" : "ofac",
        "type" : "add"
    }
}'
curl -XPUT localhost:9200/_river/comments/_meta -d '
{
    "type" : "csv",
    "csv_file" : {
        "folder" : "/opt/bitnami/elasticsearch/scripts",
        "filename_pattern" : ".*comments\\.csv$",
        "poll" : "6h",
        "fields" : [ "ent_num","comment" ],
        "first_line_is_header" : "false",
        "field_parent" : "ent_num"
    },
    "index" : {
        "index" : "ofac",
        "type" : "comments"
    }
}'

For better or worse, I think the script_before_all hook is asynchronous (a strange choice for a “before” hook), which means the downloads usually complete after the river has already indexed whatever files were in the folder. In practice a new download may not be indexed until the following poll, but whether my data is refreshed every 6 or every 12 hours doesn’t make a big difference for our needs.
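If a half-downloaded file ever became a problem with an asynchronous hook, one option would be to download under a temporary name and rename only on success, so that the river’s filename_pattern never matches an incomplete file. This is a sketch, not what the fetch script above does: `fetch_atomic`, the `DOWNLOAD` variable, and the `.part` suffix are all assumptions for illustration.

```shell
#!/bin/sh
# Assumed downloader invocation: "<cmd> <output-file> <url>". Overridable for testing.
DOWNLOAD="${DOWNLOAD:-wget -q -O}"

# Download URL to DEST via DEST.part; the final name appears only on success.
fetch_atomic() {
    url=$1
    dest=$2
    tmp="$dest.part"              # does not match patterns ending in \.csv$
    if $DOWNLOAD "$tmp" "$url"; then
        mv "$tmp" "$dest"         # a rename within one directory is atomic
    else
        rm -f "$tmp"              # discard the incomplete download
        return 1
    fi
}
```

Each wget line in the fetch script could then become a call such as `fetch_atomic http://www.treasury.gov/ofac/downloads/sdn.csv sdn_prim.csv`.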

The Search

There is much of the Query DSL that I didn’t learn in my crash course in ElasticSearch, but the docs and examples online eventually got me to the correct result. The most challenging part was returning the alias data along with the matching results. The latest stable release, which I was using, was version 1.4. I believe 1.5 will have some JOIN functionality, but for now I had to choose between recreating the index with the aliases, addresses, and comments nested in the primary data, or using aggregations to pull out the information I needed. I chose aggregations, and the result looked like this:

{
  "query": {
    "filtered": {
      "filter": {
        "bool": {
          "must_not": {
            "term": {
              "SDN_Type": "individual"
            }
          }
        }
      },
      "query": {
        "bool": {
          "should": [
            {
              "match": {
                "prim.SDN_Name": {
                  "query": "cubanator",
                  "operator": "and"
                }
              }
            },
            {
              "has_child": {
                "type": "alt",
                "query": {
                  "match": {
                    "alt.alt_name": {
                      "query": "cubanator",
                      "operator": "and"
                    }
                  }
                }
              }
            }
          ],
          "minimum_number_should_match": "65%"
        }
      }
    }
  },
  "aggs": {
    "fullmatches": {
      "terms": {
        "field": "prim.SDN_Name.raw",
        "size": "10"
      },
      "aggs": {
        "aliases": {
          "children": {
            "type": "alt"
          },
          "aggs": {
            "names": {
              "top_hits": {
                "size": "10"
              }
            }
          }
        }
      }
    }
  }
}

The filter is set dynamically to keep either only individuals (“must”) or only non-individuals (“must_not”), depending on our needs; the example above uses “must_not”, so individuals are excluded. Then we run our tokenized, phonetic matching against the names and aliases. Finally, we also return a top_hits aggregation of at most 10 matching alias records per name match.
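The must/must_not switch is the only part of the query that changes between the two kinds of search. As a rough sketch for testing from the command line, the hypothetical `build_query` function below prints the query portion of the body for either case (aggregations omitted for brevity). It interpolates the name directly into the JSON, so it assumes the name contains no quotes or backslashes.

```shell
#!/bin/sh
# Hypothetical helper: print the search body for an "individual" or "company" lookup.
build_query() {
    mode=$1
    name=$2
    case "$mode" in
        individual) clause="must" ;;       # keep only SDN_Type = individual
        company)    clause="must_not" ;;   # drop SDN_Type = individual
        *) echo "usage: build_query individual|company NAME" >&2; return 2 ;;
    esac
    cat <<EOF
{
  "query": {
    "filtered": {
      "filter": { "bool": { "$clause": { "term": { "SDN_Type": "individual" } } } },
      "query": {
        "bool": {
          "should": [
            { "match": { "prim.SDN_Name": { "query": "$name", "operator": "and" } } },
            { "has_child": { "type": "alt", "query":
                { "match": { "alt.alt_name": { "query": "$name", "operator": "and" } } } } }
          ],
          "minimum_number_should_match": "65%"
        }
      }
    }
  }
}
EOF
}
```

Assuming ES is on localhost, the body can be piped straight into a search: `build_query company "cubanator" | curl -s -XPOST 'http://localhost:9200/ofac/prim/_search?pretty' -d @-`.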

Our PHP code using the official ElasticSearch client looks like:

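// Build the search string from whichever name fields were submitted,
// skipping missing ones so PHP does not raise undefined-index notices.
$parts = array();
foreach (array('last_name', 'first_name', 'name') as $key) {
	if (isset($_REQUEST[$key])) $parts[] = $_REQUEST[$key];
}
$q = strtolower(trim(implode(' ', $parts)));
if(empty($q)) exit('[]');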

require 'vendor/autoload.php';

$params = array();
$params['hosts'] = array ( 'YOUR IP HERE');
$client = new Elasticsearch\Client($params);

$searchParams = array();
$searchParams['index'] = 'ofac';
$searchParams['type']  = 'prim';
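// Company searches exclude individuals; every other search keeps only individuals.
$filterClause = (isset($_REQUEST['api']) && $_REQUEST['api']=='companySearch') ? 'must_not' : 'must';
$searchParams['body']['query']['filtered']['filter']['bool'][$filterClause]['term']['prim.SDN_Type'] = 'individual';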
$searchParams['body']['query']['filtered']['query']['bool']['should'][0]['match']['prim.SDN_Name']=array('operator'=>'and', 'query'=>$q);
$searchParams['body']['query']['filtered']['query']['bool']['should'][1]['has_child'] = 
	array('type'=>'alt', 'query'=>
		array('match'=>
			array('alt.alt_name'=>
				array('operator'=>'and', 'query'=>$q) 
			)
		)
	);
$searchParams['body']['query']['filtered']['query']['bool']['minimum_number_should_match'] = '65%';

$searchParams['body']['aggs']['fullmatches'] = 
	array('terms' =>
			array('field' => "prim.SDN_Name.raw",
				"size"=> "10"
			),
		  'aggs' =>
			array('aliases' => array(
				'children' => array('type'=>'alt'),
				'aggs' => array(
					'names' => array('top_hits'=> array("size"=> "10")),
					//'top_hit' => array('max' => array('script' => '_score'))
				)
			  )
			)
	);


$retDoc = $client->search($searchParams);

if($retDoc['hits']['total'] > 0) {
	
	if(count($retDoc['aggregations']['fullmatches']['buckets'])>0){
		$aliases=array();
		foreach($retDoc['aggregations']['fullmatches']['buckets'] as $i=>$b){
			if($b['aliases']['doc_count'] > 0) {
				$prim_aliases = array();
				foreach($b['aliases']['names']['hits']['hits'] as $j=> $alt) {
					$prim_aliases[] = $alt['_source']['alt_name'];
				}
				$aliases[$b['key']] = implode(';',$prim_aliases);
			}
		}

	}
	
	$json = array();
	foreach($retDoc['hits']['hits'] as $i=>$result) {
		$s = str_replace('-0- ','',$result['_source']);
		$json[] = array('ofac_sdn_ent_num'	=> $result['_id'],
						'ofac_sdn_name' 	=> $s['SDN_Name'],
						'ofac_sdn_type'		=> $s['SDN_Type'],
						'ofac_sdn_program'	=> $s['Program'],
						'ofac_sdn_title'	=> $s['Title'],
						'ofac_sdn_call_sign'=> $s["Call_Sign"],
						'ofac_sdn_vess_type'=> $s["Vess_type"],
						'ofac_sdn_tonnage'	=> $s["Tonnage"],
						'ofac_sdn_grt'		=> $s["GRT"],
						'ofac_sdn_vess_flag'=> $s["Vess_flag"],
						'ofac_sdn_vess_owner'=>$s["Vess_owner"],
						'ofac_sdn_remarks'	=> trim($s["Remarks"].(isset($aliases[$s['SDN_Name']])?'     AKAs: '.$aliases[$s['SDN_Name']]:'')),
						'type'				=> 'Ofac'.($result['_type']=='prim' ? "Sdn":"Alt")
				);
	}
	exit(json_encode($json));
}		
else
	exit('[]');

We set up our client, pass in the query, run through the aliases to build an easy local lookup, and return the same response structure as the free API, so my front-end was never the wiser.
