documentprocessor

Abstract

The documentprocessor is a pipeline framework written by Morten Tvenning. It exists because, when moving from FAST to Solr, there was a lot of value in carrying the existing FAST pipeline steps over to the new Solr platform.
Most parts of the documentprocessor behave the same as the FAST pipeline framework, but I needed a few workarounds, so there is a converter script.

There are far fewer paths and less configuration in this implementation of the pipeline, so it should be easier to maintain overall under Solr than under FAST.

Using multiple processors is not yet fully supported (there is no document processing distributor), but you can alternate between two in the code if you want to.

Download code

Check out the code from our Subversion repository:

svn co http://sesat.no/svn/sesat-documentprocessor/trunk/ documentprocessor

This code is open sourced under the LGPLv3 license.

Overview

  • what you can't/shouldn't use from the FAST pipeline
  • rewriting the pipeline-config.xml
  • converting old pipeline steps
  • writing new pipeline steps
  • setting up and testing

What you can't/shouldn't use from the pipeline config

  • tokenization: the tokenization in solr is done by the type definition in schema.xml
  • DocInit, RowSetXSLT, FASTXMLReader: have no use, since you define what you feed in by the function you call on the xmlrpc interface
  • FixmlGenerator: no need, we are not making fixml anymore, html is fed straight into Solr
  • RTSOutput: no need, same as above
  • Vectorizer: write your own if you actually need the vectors (Solr has vectors, based on TF/IDF, after you feed the documents)
  • DateTimeNormalizer: no need, all dates need to be added in the datetime.date format
  • DateTimeSelector: no need, <insert reason here>

What you need to rewrite in the pipeline steps

Keeping unicode throughout the pipeline is a priority, and you will get errors if there are str() casts in the pipeline steps. String casts are OK as long as the result is not written back to the document as str. Just use unicode for less headache.
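
As a hedged illustration, a small made-up step in the style of the default processor further down; the "title" field and the document.Get accessor are assumptions for this example:

from DocumentProcessor import ProcessorStatus
import Processor

class TitleCleaner(Processor.Processor):
    # Hypothetical example step; "title" is a made-up field name.
    def __init__(self, name=""):
        Processor.Processor.__init__(self, name)
    def ConfigurationChanged(self, arguments):
        self._logger = self.getContainer().getLogger()
    def Process(self, uri, document):
        if not document.HasValue("title"):
            return ProcessorStatus.OK_NoChange
        # document.Get is assumed here, alongside HasValue/Set from the default example.
        value = document.Get("title")
        # Avoid str(value): it breaks on non-ASCII content and puts a str back
        # into the document. Keep the value as unicode all the way through:
        document.Set("title", unicode(value).strip())
        return ProcessorStatus.OK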

Rewriting the pipeline-config.xml

It is just as easy to rewrite the pipeline-config from scratch. But given that you don't want to do that:

  • remove all the pipeline config for the steps above in the "Can't use" section
  • remove all proprietary pipeline config (pipeline steps owned by FAST)
  • remove all steps not needed any longer when moving to Solr
    • field copying is easier done with the copyField option in schema.xml
  • remove the references to those not in use any more in the pipelines/pipeline section of pipeline-config.xml

Converting old pipeline steps

There is a utility that rewrites the logging and imports of FAST pipeline steps to work with the documentprocessor:

cd documentprocessor/processorcode
python FASTtoSSP-utils/fastpipeline-code-fixer/pipelinecodefixer.py <your-pipeline-step>

This will output what is rewritten in the pipeline step, overwrite <your-pipeline-step> and add <your-pipeline-step>._FAST_BACKUP, which contains the original pipeline step.

Writing new pipeline steps

In documentprocessor/code/processors there is a ProcessorExample.py that should more or less show everything you are allowed to do.

All interaction with the document is the same as in the FAST pipeline.
Fetching config files is a bit different, but the cString option still works nicely (you get a normal unicode representation of the file back when asking for fileAsString).
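
A minimal sketch of fetching a config file from ConfigurationChanged, assuming the getFileAsString container call described under "Document Processors" below (the stopwords.txt filename is made up):

    # inside a Processor subclass
    def ConfigurationChanged(self, arguments):
        self._logger = self.getContainer().getLogger()
        # The file is looked up in the configDir(s) given with -c or in the
        # settings file, and comes back as a normal unicode string.
        data = self.getContainer().getFileAsString("stopwords.txt")
        self._stopwords = set(data.split())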

<insert more info here>

Setting up and testing

documentprocessor runs as a daemon and the executable is ProcessorDaemon.py

Setup

python ProcessorDaemon.py 
usage ProcessorDaemon: daemon process for a given set of pipelines
needs a solr server address, solr schema.xml and pipeline configuration file
processes documents with one or more pipelines and sets up a queue of documents to be added to solr

options: (required)
-S   : solr http address
-C   : solr schema path
-P   : pipeline configuration file
-B   : the processors (pipeline step) directory path
options: (optional)
-s   : a python settings file that contains the configuration, e.g. a django-style settings.py; normal commandline args
     : will overwrite the settings in the file if also set
-c   : configfile paths read by the pipelinesteps (default is config/), separated by ;
-x   : add xmlrpc support, requires -H and -p
-H   : hostname of documentprocessor (xmlrpc)
-p   : port to the documentprocessor server (xmlrpc)
-d   : turn off daemonize, will also ignore the logfile that is given and log to stdout
-b   : batch size for commits to solr
-o   : ontheflyadds, add documents to solr straight away instead of waiting for the batch size before flushing

-l   : loglevel 1-5 where 5 is debug and 1 is fatal only
-L   : logfile to log to

For testing purposes -d is convenient: it forces the pipeline to run in commandline mode, logging everything to stdout and not daemonizing.

Example run as daemon:

python ProcessorDaemon.py -S http://127.0.0.1:8080/solr -C ../../solr-testing/yellow-solr/conf/schema.xml -P yellow-pipelineconf/yellow-documentprocessor-config.xml -c yellow-pipelineconf/ConfigFiles/ -H 127.0.0.1 -p 8500 -l 5 -L test1.log -b 10000 -x

Example run with settings file: (also discussed below)

python ProcessorDaemon.py -s settingsfiles/yellowsettings.py
  • -s specifies a settings file to use (can define all the options below)
  • -S solr instance at localhost port 8080
  • -C reference to the schema.xml for the solr instance (needed to check values after document processing, so that fields not defined can be logged and type errors can be raised)
  • -P the pipeline configuration for this document processor
  • -c the folder (or folders) containing configuration files the pipeline steps might request
  • -H hostname that the xmlrpc interface should answer to
  • -p port the xmlrpc interface should answer to
  • -l debug logging
  • -L logfile to use
  • -b the amount of documents to queue up before adding the documents to solr
  • -x add xmlrpc interface to the daemon

Settings file configuration

The settings file is optional but convenient, and can be stored and moved around.

$ cat settingsfiles/mysettings.py        

xmlrpchost      = "solr1.someserver.com"
xmlrpcport      = 40001
usexmlrpc       = True
pipelineconfig  = "<path>/solr-documentprocessor/white-pipelineconf/white-documentprocessor-config.xml"
solrschema      = "<solrpath>/solr-white/solr/conf/schema.xml"
configdirs      = ["<path>/solr-documentprocessor/white-pipelineconf/ConfigFiles/"]
processorpath   = ["<path>/solr-documentprocessor/documentprocessor/processors"]
solrhost        = "http://solr1.someserver.com:8000/solr"
daemonize       = True
batchsize       = 10000
ontheflyadds    = False
loglevel        = 4
logfile         = "<path>/solr-documentprocessor/logs/procserver_white.log"

The settings file is read in first (before the other commandline parameters), which means that you can overwrite anything in the settings file on the commandline at startup.
So running "python ProcessorDaemon.py -s settingsfiles/mysettings.py -l 5 -d" will log debug and neither daemonize nor use the logfile set in mysettings.py.

Shutting down

Since the pipeline keeps a queue of documents that it might not yet have flushed and committed to solr, there is a commandline script to take the server down cleanly.

usage:

python shutdownpipeline.py 
usage shutdownpipeline.py
required options:
-s : settings file for the pipeline

It takes the same settings file that was used to start the processor. It only sends the shutdown signal over the xmlrpc interface and waits for the documentprocessor to shut itself down.

XMLRPC interface

These are the functions registered on the xmlrpc interface:

  • hasPipeline: checks if the pipeline exists. Parameters: str pipeline name
  • addDocumentByDict: the default add function; dicts are easily serializable. Parameters: dict documentasdict, str pipeline name
  • isAlive: answers true if the xmlrpc interface works
  • addDocument: if you import Document.Document from the documentprocessor code, an instance of it can be sent to the xmlrpc interface through this function. Parameters: Document.Document document, str pipeline name (optional)
  • deleteDocumentById: deletes a document from the solr index. Parameters: str id
  • addDocumentByTupleList: add by tuple list, a list of [(field, value)]. Parameters: list list-of-tuples, str pipeline name
  • flush: tells the pipeline to add all queued documents to solr (and commit them if manualcommit is not set)
  • commit: tells the pipeline to tell solr to commit the changes (start indexing). This needs to be done after all documents are fed if manualcommit is set (the default for the xmlrpc interface)
  • shutdown: sends the flush and commit signals, waits for the cleanup to finish, and stops itself. Used to keep everything intact when shutting down

Messages returned by addDocument functions and deleteDocument

  • "DELETE_OK"
  • "DELETE_FAILED <some message here>"
  • "ADDED_OK"
  • "ADDED_FAILED <some message here>"

Document Processors

The document processors are plug-in scripts run on the document.

The default document processor looks like this:

"""
logging statements should be written with the self.getName() as the first argument always!
self._logger.error("%s: some message" % self.getName())

"""
from DocumentProcessor import ProcessorStatus
import Processor

class AttributeAdd(Processor.Processor):
    def __init__(self, name=""):
        Processor.Processor.__init__(self, name) 
        self.name = name 
    def ConfigurationChanged(self, arguments):
        self._logger = self.getContainer().getLogger()
        self._logger.info("%s: ConfigurationChanged OK" % self.getName())
        self._param1 = self.getParameter("some-value-in-processor-conf")
    def Process(self, uri, document):
        self._logger.info("%s: getting this doc id in: %s" % (self.getName(), uri))
        if document.HasValue(self._param1):
            document.Set("test1", "has param :%s" % self._param1)
            return ProcessorStatus.OK
        return ProcessorStatus.OK_NoChange

The document is of type Document.Document and supports a number of functions (most of which existed in the FAST pipeline).
The interface for Document.Document is better documented by the pydoc following the code.
Each Processor inherits from Processor, which lives "inside" a container, and the container has logger information and file information:

  • getFileAsString(filename) gets files that are inside the configDir (set in the settings.py file or given on the commandline; there can be more than one configDir)
  • getLogger() returns the log4j logger for the documentprocessor
  • ProcessDocument(pipelinename, document, addtoSolr=True) is used for running pipelines and gives the possibility to run subpipelines from the Processors
  • PushDocumentToSolr(document) adds a document to the queue (this can be used if more than one document should be put into the index)

Normally only getFileAsString and getLogger should be used. The others are only available and might contain bugs when called from the Processor.
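
For the rarer case where a single incoming document should produce several solr documents, here is a hedged sketch of PushDocumentToSolr from inside Process; the "aliases" field, the document.Get accessor and the Document(id, dict) constructor are assumptions for this example (the constructor follows the test examples further down):

    # inside a Processor subclass; assumes "from Document import Document"
    def Process(self, uri, document):
        if not document.HasValue("aliases"):
            return ProcessorStatus.OK_NoChange
        for i, alias in enumerate(document.Get("aliases")):
            docid = "%s-alias-%d" % (uri, i)
            extra = Document(docid, {'id': docid, 'name': alias})
            # queue the extra document for the same solr add/commit cycle
            self.getContainer().PushDocumentToSolr(extra)
        return ProcessorStatus.OK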

The pipeline config

<config>
  <processors>
    <processor name="WhitePhrasedNameMaker">
      <load module="processors.PhrasedNameMaker" class="PhrasedNameMaker" />
      <description>
        find the different permutations of phrased name searches
      </description>
      <config>
        <param name="fornavnfield" value="wpfornavn" type="str" />
        <param name="mellomnavnfield" value="wpmellomnavn" type="str" />
        <param name="etternavnfield" value="wpetternavn" type="str" />
        <param name="allnamesfields" value="wpphrasednameslist" type="str" />
      </config>
    </processor>
    <processor name="StringSplitter" type="general" hidden="0">
      <load module="processors.StringSplitter" class="StringSplitter" />
      <config>
        <param name="fields" value="mobiltelefon telefon" type="str" />
        <param name="separator" value=" " type="str" />
      </config>
      <description><![CDATA[This processor takes in a list of fields, splits each field by the separator and sets the field to that list]]></description>
      <inputs>
      </inputs>
    </processor>
  </processors>

  <pipelines>
    <pipeline name="white" default="0">
      <description><![CDATA[white pipeline configuration]]></description>
      <priority>0</priority>
      <processor name="WhitePhrasedNameMaker" />
      <processor name="StringSplitter" />
    </pipeline>
  </pipelines>
</config>

Super small pipeline configuration.
The processors:

  • you can add multiple processors inside the <processors> tags. The sequence they are in makes no difference
  • processor name="WhitePhrasedNameMaker" is the name of that instance, which is used below in the pipeline config
  • load module="processors.PhrasedNameMaker" class="PhrasedNameMaker" means: in the folder processors, the file PhrasedNameMaker.py and the class PhrasedNameMaker
  • you can have multiple processors referring to the same class
  • params have a name, a value and a type. This is rewritten to a hashmap where the value is cast to the python type (so you need to specify python types: str, int, float etc.)

The pipeline:

  • name="white" is the name of the pipeline, when you process document with the documentprocessor, you need to specify which pipeline to use
  • the processor name= is the sequence of processors to use, so first run WhitePhrasedNameMaker and then StringSplitter

Using the xmlrpc interface

In the client, a python example:

import xmlrpclib

server = "http://somehost"
port = 8500
xmlrpc = xmlrpclib.Server("%s:%s" % (server, port))
if xmlrpc.isAlive():
   print "isAlive() returned True for xmlprc connection, connection good"

docdict = {'id': 1, 'testfield': 4032, 'testfield2': "mystring"}

message = xmlrpc.addDocumentByDict(docdict, "some-pipeline")
print "%s returned this message: %s" % (docdict['id'], message)


Testing

Given an svn checkout of the documentprocessor folder or an unpacked tar, the paths below are relative to that as the base.

Core files

The core files are located under documentprocessor/processorcode and their tests are located under tests/.
The test coverage there should be about 90%; a few of the classes are hard to test since they need a working solr instance to talk to. That is documented in the test files.

Processors

The processors are located under documentprocessor/processors and documentprocessor/default_processors
The tests for the processors are located under processor_tests

When making a new processor, testing it should be really simple. The processor tests follow a common template.
So assume that a new processor AttributeUpperCaser.py with the processor class AttributeUpperCaser has been made and placed in the documentprocessor/processors/ folder. To test it, add a new file in processor_tests/ and copy the content of one of the other processor tests into it.

cd processor-tests/
touch testAttributeUpperCaser.py
cat testAttributeCopy.py >> testAttributeUpperCaser.py

Or make the file and copy the content in your IDE of choice

Then open the testAttributeUpperCaser.py file and do a find-and-replace for AttributeCopy --> AttributeUpperCaser.
The tests should then run and fail for all but two tests.

Then configure the parameters that AttributeUpperCaser.py gets as input. This is done with the pdnormal class variable. So if AttributeUpperCaser has two self.GetParameter() calls, for "input" and "output", put these two into the pdnormal dictionary and remove the others. Then almost all the tests should pass, except for testProcessorProcessing and testProcessorProcessingValidateOutput.

pdnormal = {
            "input": "myinputfield",
            "output":"myoutputfield",
        }

__getTestDocument returns a document that should be processed, and __getTestDocumentDoNothing is used by the last test to check that the processor returns the right OK_NoChange flag.
Fill the two documents with content.

    def __getTestDocument(self):
        return  Document(self.id, {
            'id': 1,
            'myinputfield':"inputvalue",
                })

    def __getTestDocumentDoNothing(self):
        return Document(self.id, {
            'id': 1,
                })

Then go into the testProcessorProcessingValidateOutput and add a self.failUnless for doc.HasValue("output")
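
With the example pdnormal above, that assertion could look roughly like this ("myoutputfield" is the value of the "output" parameter; the surrounding test code comes from the copied template):

        # inside testProcessorProcessingValidateOutput, after the document has been processed
        self.failUnless(doc.HasValue("myoutputfield"))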

Go to the commandline and do python testAttributeUpperCaser.py

This will test:

  • the import works
  • setting up the processor and adding its container and parameters
  • that the processor actually processes something when asked to process the test document
  • that the processor processes the empty documents and returns the value for nothing processed
  • that the processor processes the documents and does the right thing

A few of the default_processor tests are bigger and a good starting point for implementing more complex tests.

 © 2007-2009 Schibsted ASA