Package weka.knowledgeflow
Class StepManagerImpl
java.lang.Object
weka.knowledgeflow.StepManagerImpl
- All Implemented Interfaces:
StepManager
Concrete implementation of the StepManager interface. Has a number of
methods, beyond those aimed at Step implementations, that are useful for
applications that manipulate Steps and their connections.
- Version:
- $Revision: $
- Author:
- Mark Hall (mhall{[at]}pentaho{[dot]}com)
-
Field Summary
Fields inherited from interface weka.knowledgeflow.StepManager
CON_AUX_DATA_BATCH_ASSOCIATION_RULES, CON_AUX_DATA_CHART_DATA_POINT, CON_AUX_DATA_CHART_LEGEND, CON_AUX_DATA_CHART_MAX, CON_AUX_DATA_CHART_MIN, CON_AUX_DATA_CLASS_ATTRIBUTE, CON_AUX_DATA_ENVIRONMENT_PROPERTIES, CON_AUX_DATA_ENVIRONMENT_RESULTS, CON_AUX_DATA_ENVIRONMENT_VARIABLES, CON_AUX_DATA_GRAPH_TITLE, CON_AUX_DATA_GRAPH_TYPE, CON_AUX_DATA_INCREMENTAL_STREAM_END, CON_AUX_DATA_INSTANCE, CON_AUX_DATA_IS_INCREMENTAL, CON_AUX_DATA_LABEL, CON_AUX_DATA_MAX_SET_NUM, CON_AUX_DATA_PRIMARY_PAYLOAD_NOT_THREAD_SAFE, CON_AUX_DATA_SET_NUM, CON_AUX_DATA_TEST_INSTANCE, CON_AUX_DATA_TESTSET, CON_AUX_DATA_TEXT_TITLE, CON_AUX_DATA_TRAININGSET, CON_BATCH_ASSOCIATOR, CON_BATCH_CLASSIFIER, CON_BATCH_CLUSTERER, CON_CHART, CON_DATASET, CON_ENVIRONMENT, CON_GRAPH, CON_IMAGE, CON_INCREMENTAL_CLASSIFIER, CON_INCREMENTAL_CLUSTERER, CON_INFO, CON_INSTANCE, CON_JOB_FAILURE, CON_JOB_SUCCESS, CON_TESTSET, CON_TEXT, CON_THRESHOLD_DATA, CON_TRAININGSET, CON_VISUALIZABLE_ERROR
-
Constructor Summary
StepManagerImpl(Step step): Constructor.
-
Method Summary
void addIncomingConnection(String connectionName, StepManagerImpl step): Add an incoming connection of the specified type (consisting of the connection type and the associated source step) to this step.
boolean addOutgoingConnection(String connectionName, StepManagerImpl step): Add an outgoing connection of the specified type (consisting of the connection type and the associated target step) to this step.
boolean addOutgoingConnection(String connectionName, StepManagerImpl step, boolean force): Add an outgoing connection of the specified type (consisting of the connection type and the associated target step) to this step, optionally forcing it.
void addStepOutputListener(StepOutputListener listener, String outputConnectionName): Register a non-step third party to receive data from the managed step for the specified outgoing connection type.
void clearAllConnections(): Clear all connections to/from the step managed by this manager.
void clearAllStepOutputListeners(): Clear all registered StepOutputListeners.
void clearStepOutputListeners(String outputConnectionName): Clear all the StepOutputListeners that are registered to receive the supplied connection type.
void disconnectStep(Step toDisconnect): Remove the supplied step from all connections (both incoming and outgoing, of all types) of the step managed by this manager.
void disconnectStepWithConnection(Step toDisconnect, String connType): Disconnect the supplied step under the associated connection type from both the incoming and outgoing connections of the step managed by this manager.
environmentSubstitute(String source): Substitute the values of environment variables in the given string.
StepManager findStepInFlow(String stepNameToFind): Find a named step in the current flow.
void finished(): Finished all processing.
getExecutionEnvironment(): Get the execution environment the managed step is running in.
getIncomingConnectedStepsOfConnectionType(String connectionName): Get a list of steps providing incoming connections of the specified type.
getIncomingConnectedStepWithName(String stepName): Get a named step connected to this step with an incoming connection.
getIncomingConnections(): Get the map of upstream (incoming connections) connected steps.
Instances getIncomingStructureForConnectionType(String connectionName): Attempt to get the incoming structure (as a header-only set of instances) for the named incoming connection type.
Instances getIncomingStructureForConnectionType(String connectionName, Environment env): Attempt to retrieve the structure (as a header-only set of instances) for the named incoming connection type.
Instances getIncomingStructureFromStep(StepManager sourceStep, String connectionName): Attempt to get the incoming structure (as a header-only set of instances) from the given managed step for the given connection type.
getInfoStep(): Returns a reference to the step being managed if it has one or more outgoing CON_INFO connections.
getInfoStep(Class stepClass): Returns a reference to the step being managed if it has one or more outgoing CON_INFO connections and the managed step is of the supplied class.
getLog(): Get the log to use.
getLoggingLevel(): Get the logging level in use.
getManagedStep(): Get the step managed by this manager.
getName(): Get the name of the Step being managed.
getOutgoingConnectedStepsOfConnectionType(String connectionName): Get a list of downstream steps connected to this step with the given connection type.
getOutgoingConnectedStepWithName(String stepName): Get a named step connected to this step with an outgoing connection.
getOutgoingConnections(): Get the map of downstream (outgoing connections) connected steps.
getSettings(): Get the current knowledge flow settings.
boolean getStepMustRunSingleThreaded(): Get whether the managed step must run single-threaded.
getStepOutgoingConnectionTypes(): Used by the rendering routine in LayoutPanel to ensure that connections downstream from a deleted connection get rendered in grey rather than red.
getStepProperty(String name): Get a named property for this step.
getStepVisual(): Get the step visual in use (if running in a visual environment).
void interrupted(): Finished processing due to a stop being requested.
boolean isStepBusy(): Returns true if, at the current time, the managed step is busy with processing.
boolean isStepFinished(): Return true if the current step is finished.
boolean isStopRequested(): Return true if a stop has been requested by the runtime environment.
boolean isStreamFinished(Data data): Returns true if this data object marks the end of an incremental stream.
void log(String message, LoggingLevel level): Log a message at the supplied logging level.
void logBasic(String message): Log a message at the basic logging level.
void logDebug(String message): Log a message at the debugging logging level.
void logDetailed(String message): Log a message at the detailed logging level.
void logError(String message, Throwable cause): Log an error.
void logLow(String message): Log a message at the low logging level.
void logWarning(String message): Log a warning message.
int numIncomingConnections(): Get the number of incoming connections to the managed step.
int numIncomingConnectionsOfType(String connectionName): Get the number of incoming connections to the managed step of a given type.
int numOutgoingConnections(): Get the number of outgoing connections from the managed step.
int numOutgoingConnectionsOfType(String connectionName): Get the number of outgoing connections from the managed step of a given type.
void outputData(String outgoingConnectionName, String stepName, Data data): Outputs the supplied Data object to the named Step.
void outputData(String outgoingConnectionName, Data data): Output a Data object to all downstream Steps that are connected with the supplied connection name.
void outputData(Data... data): Output one or more Data objects to all relevant steps.
void processing(): Started processing.
void removeIncomingConnection(String connectionName, StepManagerImpl step): Remove an incoming connection of the specified type to this step.
void removeOutgoingConnection(String connectionName, StepManagerImpl step): Remove an outgoing connection of the specified type from this step.
void removeStepOutputListener(StepOutputListener listener, String outputConnectionName): De-register a non-step third party from receiving data from the managed step.
void setLog(log): Set the log to use.
void setLoggingLevel(LoggingLevel newLevel): Set the logging level to use.
void setManagedStep(Step step): Set the step managed by this manager.
void setStepIsResourceIntensive(boolean resourceIntensive): Set whether the managed step is resource (CPU/memory) intensive.
void setStepMustRunSingleThreaded(boolean mustRunSingleThreaded): Set whether the managed step must run single-threaded.
void setStepProperty(String name, Object value): Set a property for this step.
void setStepVisual(StepVisual visual): Set the step visual to use when running in a graphical environment.
void setStopRequested(boolean stopRequested): Set the status of the stop-requested flag.
void statusMessage(String message): Output a status message to the status area of the log.
boolean stepIsResourceIntensive(): Get whether the managed step is resource (CPU/memory) intensive.
stepStatusMessagePrefix(): Gets a prefix for the step managed by this manager.
void throughputFinished(Data... data): Clients can use this to indicate that throughput measuring is finished (i.e. the stream being processed has ended).
void throughputUpdateEnd(): Clients can use this to record a stop point for streaming throughput measuring.
void throughputUpdateStart(): Clients can use this to record a start point for streaming throughput measuring.
-
Constructor Details
-
StepManagerImpl
public StepManagerImpl(Step step) Constructor.
- Parameters:
step
- the Step to manage
-
-
Method Details
-
getName
Get the name of the Step being managed.
- Specified by:
getName
in interface StepManager
- Returns:
- the name of the Step being managed
-
getManagedStep
Get the step managed by this manager.
- Specified by:
getManagedStep
in interface StepManager
- Returns:
- the step managed by this manager
-
setManagedStep
Set the step managed by this manager.
- Parameters:
step
- the step to manage
-
setStepIsResourceIntensive
public void setStepIsResourceIntensive(boolean resourceIntensive) Set whether the managed step is resource (CPU/memory) intensive.
- Specified by:
setStepIsResourceIntensive
in interface StepManager
- Parameters:
resourceIntensive
- true if the managed step is resource intensive
-
stepIsResourceIntensive
public boolean stepIsResourceIntensive() Get whether the managed step is resource (CPU/memory) intensive.
- Specified by:
stepIsResourceIntensive
in interface StepManager
- Returns:
- true if the step is resource intensive
-
setStepMustRunSingleThreaded
public void setStepMustRunSingleThreaded(boolean mustRunSingleThreaded) Set whether the managed step must run single-threaded, that is, in an executor service with one worker thread. This effectively prevents more than one copy of the step from executing at any one time.
- Specified by:
setStepMustRunSingleThreaded
in interface StepManager
- Parameters:
mustRunSingleThreaded
- true if the managed step must run single-threaded
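Running a step "in an executor service with one worker thread" can be illustrated with java.util.concurrent. This is a minimal sketch, not the Knowledge Flow scheduler: SingleThreadedStepDemo and peakConcurrency are hypothetical names, and each execution of a step is assumed to be modelled as a Runnable. It measures how many copies of a simulated step run at the same moment with a single-thread executor versus a small pool.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo, not Weka code: a step flagged "must run single-threaded" is
// submitted to an executor with exactly one worker thread, so copies never overlap.
class SingleThreadedStepDemo {

    // Submits `copies` executions of a simulated step and returns the peak number
    // of copies that were running at the same moment.
    static int peakConcurrency(boolean mustRunSingleThreaded, int copies) {
        ExecutorService executor = mustRunSingleThreaded
            ? Executors.newSingleThreadExecutor()  // one worker thread
            : Executors.newFixedThreadPool(4);     // several worker threads
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < copies; i++) {
            executor.submit(() -> {
                // Record the highest number of simultaneously running copies.
                peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(20);  // simulate some processing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("single-threaded peak: " + peakConcurrency(true, 5));
        System.out.println("pooled peak: " + peakConcurrency(false, 5));
    }
}
```

With the single-thread executor the peak is always 1; with the pool it is typically higher, though the exact value depends on scheduling.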
-
getStepMustRunSingleThreaded
public boolean getStepMustRunSingleThreaded() Get whether the managed step must run single-threaded, that is, in an executor service with one worker thread. This effectively prevents more than one copy of the step from executing at any one time.
- Specified by:
getStepMustRunSingleThreaded
in interface StepManager
- Returns:
- true if the managed step must run single-threaded
-
getStepVisual
Get the step visual in use (if running in a visual environment).
- Returns:
- the step visual in use
-
setStepVisual
Set the step visual to use when running in a graphical environment.
- Parameters:
visual
- the step visual to use
-
setStepProperty
Set a property for this step.
- Parameters:
name
- the name of the property
value
- the value of the property
-
getStepProperty
Get a named property for this step.
- Parameters:
name
- the name of the property to get
- Returns:
- the value of the property or null if the property is not set
-
getExecutionEnvironment
Get the execution environment the managed step is running in.
- Specified by:
getExecutionEnvironment
in interface StepManager
- Returns:
- the execution environment
-
getSettings
Get the current knowledge flow settings.
- Specified by:
getSettings
in interface StepManager
- Returns:
- the current knowledge flow settings
- Throws:
IllegalStateException
- if there is no execution environment available
-
getLoggingLevel
Get the logging level in use.
- Specified by:
getLoggingLevel
in interface StepManager
- Returns:
- the logging level in use
-
setLoggingLevel
Set the logging level to use.
- Parameters:
newLevel
- the level to use
-
getLog
Get the log to use.
- Specified by:
getLog
in interface StepManager
- Returns:
- the log in use or null if no log has been set
-
setLog
Set the log to use.
- Parameters:
log
- the log to use
-
isStepBusy
public boolean isStepBusy() Returns true if, at the current time, the managed step is busy with processing.
- Specified by:
isStepBusy
in interface StepManager
- Returns:
- true if the managed step is busy with processing
-
isStopRequested
public boolean isStopRequested() Return true if a stop has been requested by the runtime environment.
- Specified by:
isStopRequested
in interface StepManager
- Returns:
- true if a stop has been requested
-
isStepFinished
public boolean isStepFinished() Return true if the current step is finished.
- Specified by:
isStepFinished
in interface StepManager
- Returns:
- true if the current step is finished
-
setStopRequested
public void setStopRequested(boolean stopRequested) Set the status of the stop-requested flag.
- Parameters:
stopRequested
- true if a stop has been requested
-
processing
public void processing() Started processing. Sets the busy flag to true.
- Specified by:
processing
in interface StepManager
-
finished
public void finished() Finished all processing. Sets the busy flag to false and prints a finished message to the status area of the log.
- Specified by:
finished
in interface StepManager
-
interrupted
public void interrupted() Finished processing due to a stop being requested. Sets the busy flag to false.
- Specified by:
interrupted
in interface StepManager
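The busy and stop-requested flags described for processing(), finished(), interrupted() and setStopRequested() can be modelled in a few lines. This is a hypothetical sketch, not the Knowledge Flow implementation: StepLifecycleSketch, its statusArea list and the run() loop are illustrative names, and the "Finished." message text is an assumption. It shows a step checking for a stop request between units of work and ending through either finished() or interrupted().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the busy/stop flags described for
// processing(), finished(), interrupted() and setStopRequested(); not Weka's code.
class StepLifecycleSketch {
    private volatile boolean busy;
    private volatile boolean stopRequested;
    final List<String> statusArea = new ArrayList<>();  // stand-in for the status area of the log

    void processing() { busy = true; }            // started processing
    void finished() {                             // finished all processing
        busy = false;
        statusArea.add("Finished.");              // assumed message text
    }
    void interrupted() { busy = false; }          // stopped early because a stop was requested
    void setStopRequested(boolean stop) { stopRequested = stop; }
    boolean isStopRequested() { return stopRequested; }
    boolean isStepBusy() { return busy; }

    // A typical processing loop: check for a stop request between units of work.
    // stopAfter simulates the runtime requesting a stop after that many units (-1 = never).
    int run(int units, int stopAfter) {
        processing();
        int done = 0;
        for (int i = 0; i < units; i++) {
            if (isStopRequested()) {
                interrupted();
                return done;
            }
            done++;
            if (done == stopAfter) {
                setStopRequested(true);
            }
        }
        finished();
        return done;
    }

    public static void main(String[] args) {
        StepLifecycleSketch full = new StepLifecycleSketch();
        System.out.println(full.run(10, -1) + " units, busy=" + full.isStepBusy() + ", status=" + full.statusArea);
        StepLifecycleSketch stopped = new StepLifecycleSketch();
        System.out.println(stopped.run(10, 3) + " units, busy=" + stopped.isStepBusy() + ", status=" + stopped.statusArea);
    }
}
```

In both paths the busy flag ends up false; only the normal path writes a finished message to the status area, mirroring the descriptions above.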
-
isStreamFinished
Returns true if this data object marks the end of an incremental stream. Note that this does not check whether the data object is actually an incremental one; it only checks whether the CON_AUX_DATA_INCREMENTAL_STREAM_END flag is set to true.
- Specified by:
isStreamFinished
in interface StepManager
- Parameters:
data
- the data element to check
- Returns:
- true if the data element is flagged as end of stream
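The check described above amounts to reading one boolean flag from the data object's payload. A minimal sketch, assuming a Data object can be modelled as a connection type plus a key/value payload map; StreamEndSketch, its nested Data class and the key string "incremental_stream_end" are hypothetical stand-ins, not Weka's classes or constant values.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Data object: a connection type plus a map of payload
// elements. The key string below is an assumption, not Weka's constant value.
class StreamEndSketch {
    static final String INCREMENTAL_STREAM_END = "incremental_stream_end";  // assumed key

    static class Data {
        final String connectionType;
        final Map<String, Object> payload = new HashMap<>();
        Data(String connectionType) { this.connectionType = connectionType; }
        void setPayloadElement(String key, Object value) { payload.put(key, value); }
        Object getPayloadElement(String key, Object defaultValue) {
            return payload.getOrDefault(key, defaultValue);
        }
    }

    // Mirrors the documented behaviour: only the flag is inspected; the connection
    // type is NOT checked, so any Data object carrying the flag counts as end of stream.
    static boolean isStreamFinished(Data data) {
        return Boolean.TRUE.equals(data.getPayloadElement(INCREMENTAL_STREAM_END, false));
    }

    public static void main(String[] args) {
        Data instance = new Data("instance");
        Data last = new Data("instance");
        last.setPayloadElement(INCREMENTAL_STREAM_END, true);
        System.out.println(isStreamFinished(instance));  // false
        System.out.println(isStreamFinished(last));      // true
    }
}
```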
-
throughputUpdateStart
public void throughputUpdateStart() Clients can use this to record a start point for streaming throughput measuring.
- Specified by:
throughputUpdateStart
in interface StepManager
-
throughputUpdateEnd
public void throughputUpdateEnd() Clients can use this to record a stop point for streaming throughput measuring.
- Specified by:
throughputUpdateEnd
in interface StepManager
-
throughputFinished
Clients can use this to indicate that throughput measuring is finished (i.e. the stream being processed has ended). Final throughput information is printed to the log and the status area.
- Specified by:
throughputFinished
in interface StepManager
- Parameters:
data
- one or more Data events (with the appropriate connection type set) to pass on to downstream connected steps. These carry any final data and inform the downstream step(s) that the stream has ended.
- Throws:
WekaException
- if a problem occurs
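The start/end/finished pattern of the throughput methods can be sketched as a small meter that accumulates elapsed time per processed item. This is a hypothetical illustration, not StepManagerImpl's bookkeeping: ThroughputSketch is an invented name, the clock is injected so the arithmetic is deterministic, and the Data arguments of throughputFinished (final data and end-of-stream markers for downstream steps) are omitted.

```java
import java.util.function.LongSupplier;

// Hypothetical throughput meter illustrating the start/end/finished pattern; the
// real StepManagerImpl bookkeeping and log output may differ.
class ThroughputSketch {
    private final LongSupplier clockNanos;  // injected clock, e.g. System::nanoTime
    private long startNanos;
    private long totalNanos;
    private long count;

    ThroughputSketch(LongSupplier clockNanos) { this.clockNanos = clockNanos; }

    // Record a start point before processing one item of the stream.
    void throughputUpdateStart() { startNanos = clockNanos.getAsLong(); }

    // Record a stop point after processing the item.
    void throughputUpdateEnd() {
        totalNanos += clockNanos.getAsLong() - startNanos;
        count++;
    }

    // Called once the stream has ended; returns items processed per second.
    double throughputFinished() {
        return totalNanos == 0 ? 0.0 : count / (totalNanos / 1e9);
    }

    public static void main(String[] args) {
        long[] fakeTime = {0};
        ThroughputSketch meter = new ThroughputSketch(() -> fakeTime[0]);
        for (int i = 0; i < 4; i++) {
            meter.throughputUpdateStart();
            fakeTime[0] += 250_000_000L;  // pretend each instance takes 0.25 s
            meter.throughputUpdateEnd();
        }
        System.out.println(meter.throughputFinished() + " instances/sec");  // 4.0 instances/sec
    }
}
```

In real use the clock would be System::nanoTime; injecting it keeps the example reproducible.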
-
disconnectStepWithConnection
Disconnect the supplied step under the associated connection type from both the incoming and outgoing connections for the step managed by this manager. Does nothing if this step does not have any connections to the supplied step, or does not have connections to the supplied step of the required type.
- Parameters:
toDisconnect
- the step to disconnect
connType
- the connection type to disconnect
-
disconnectStep
Remove the supplied step from all connections (both incoming and outgoing, of all types) of the step managed by this manager. Does nothing if this step does not have any connections to the supplied step.
- Parameters:
toDisconnect
- the step to disconnect
-
clearAllConnections
public void clearAllConnections() Clear all connections to/from the step managed by this manager. Also makes sure that all directly connected upstream and downstream steps remove their respective outgoing and incoming connections to this step.
-
addIncomingConnection
Add an incoming connection of the specified type (consisting of the connection type and the associated source step) to this step.
- Parameters:
connectionName
- the name of the type of connection to add
step
- the source step component that is connecting with the given connection type
-
removeIncomingConnection
Remove an incoming connection of the specified type to this step.
- Parameters:
connectionName
- the name of the type of connection to remove
step
- the source step component associated with the given connection type
-
addOutgoingConnection
Add an outgoing connection of the specified type (consisting of the connection type and the associated target step) to this step. The connection is only made if the target step will accept the connection type at this time.
- Parameters:
connectionName
- the name of the type of connection to add
step
- the target step component that is receiving the given connection type
- Returns:
- true if the connection was successful
-
addOutgoingConnection
Add an outgoing connection of the specified type (consisting of the connection type and the associated target step) to this step. Unless force is true, the connection is only made if the target step will accept the connection type at this time.
- Parameters:
connectionName
- the name of the type of connection to add
step
- the target step component that is receiving the given connection type
force
- whether to force the connection, even if the target step says it can't accept the connection at the present time
- Returns:
- true if the connection was successful
-
removeOutgoingConnection
Remove an outgoing connection of the specified type from this step.
- Parameters:
connectionName
- the name of the type of connection to remove
step
- the target step component associated with the given connection type
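The connection-management methods above can be illustrated with a small, self-contained model. This is a sketch under simplifying assumptions, not StepManagerImpl's implementation: target steps are hypothetical Node objects whose willingness to accept a connection is a fixed set of type names, and the connection-type strings are illustrative. It shows the force flag overriding a refusal, disconnecting under a single type versus under all types, and counting connections of one type.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of outgoing-connection bookkeeping: connection type -> target steps.
// Targets are simple Node objects and "acceptance" is a fixed set of type names.
class ConnectionSketch {
    static class Node {
        final String name;
        final Set<String> acceptedTypes;  // types this target will currently accept
        Node(String name, Set<String> acceptedTypes) {
            this.name = name;
            this.acceptedTypes = acceptedTypes;
        }
    }

    final Map<String, List<Node>> outgoing = new LinkedHashMap<>();

    boolean addOutgoingConnection(String type, Node target, boolean force) {
        if (!force && !target.acceptedTypes.contains(type)) {
            return false;  // target refuses this connection type right now
        }
        outgoing.computeIfAbsent(type, k -> new ArrayList<>()).add(target);
        return true;
    }

    void removeOutgoingConnection(String type, Node target) {
        List<Node> targets = outgoing.get(type);
        if (targets != null) {
            targets.remove(target);
            if (targets.isEmpty()) {
                outgoing.remove(type);  // drop empty connection types
            }
        }
    }

    // Remove the target under one connection type only (no-op if absent).
    void disconnectStepWithConnection(Node target, String type) {
        removeOutgoingConnection(type, target);
    }

    // Remove the target under every connection type.
    void disconnectStep(Node target) {
        for (String type : new ArrayList<>(outgoing.keySet())) {
            removeOutgoingConnection(type, target);
        }
    }

    int numOutgoingConnectionsOfType(String type) {
        List<Node> targets = outgoing.get(type);
        return targets == null ? 0 : targets.size();
    }

    public static void main(String[] args) {
        ConnectionSketch loader = new ConnectionSketch();
        Node viewer = new Node("TextViewer", Set.of("dataSet", "text"));
        System.out.println(loader.addOutgoingConnection("dataSet", viewer, false));   // true
        System.out.println(loader.addOutgoingConnection("instance", viewer, false));  // false
        System.out.println(loader.addOutgoingConnection("instance", viewer, true));   // true (forced)
        loader.disconnectStepWithConnection(viewer, "instance");
        System.out.println(loader.numOutgoingConnectionsOfType("dataSet"));  // 1
        loader.disconnectStep(viewer);
        System.out.println(loader.outgoing.isEmpty());  // true
    }
}
```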
-
getIncomingConnectedStepsOfConnectionType
Get a list of steps providing incoming connections of the specified type.
- Specified by:
getIncomingConnectedStepsOfConnectionType
in interface StepManager
- Parameters:
connectionName
- the type of connection being received by this step
- Returns:
- a list of connected steps
-
getOutgoingConnectedStepsOfConnectionType
Description copied from interface: StepManager
Get a list of downstream steps connected to this step with the given connection type.
- Specified by:
getOutgoingConnectedStepsOfConnectionType
in interface StepManager
- Parameters:
connectionName
- the name of the outgoing connection
- Returns:
- a list of downstream steps connected to this one with the named connection type
-
getIncomingConnectedStepWithName
Get a named step connected to this step with an incoming connection.
- Specified by:
getIncomingConnectedStepWithName
in interface StepManager
- Parameters:
stepName
- the name of the step to look for
- Returns:
- the connected step
-
getOutgoingConnectedStepWithName
Get a named step connected to this step with an outgoing connection.
- Specified by:
getOutgoingConnectedStepWithName
in interface StepManager
- Parameters:
stepName
- the name of the step to look for
- Returns:
- the connected step
-
getOutgoingConnections
Get the map of downstream (outgoing connections) connected steps.
- Specified by:
getOutgoingConnections
in interface StepManager
- Returns:
- the map of downstream connected steps
-
getIncomingConnections
Get the map of upstream (incoming connections) connected steps.
- Specified by:
getIncomingConnections
in interface StepManager
- Returns:
- the map of upstream connected steps
-
addStepOutputListener
Register a non-step third party to receive data from the managed step for the specified outgoing connection type. Output listeners are not serialized into the JSON flow when flows are saved.
- Parameters:
listener
- the output listener to register
outputConnectionName
- the name of the connection type
-
removeStepOutputListener
De-register a non-step third party from receiving data from the managed step.
- Parameters:
listener
- the output listener to de-register
outputConnectionName
- the name of the connection type the listener is registered against
-
clearAllStepOutputListeners
public void clearAllStepOutputListeners() Clear all registered StepOutputListeners.
-
clearStepOutputListeners
Clear all the StepOutputListeners that are registered to receive the supplied connection type.
- Parameters:
outputConnectionName
- type of the connection to clear the listeners for
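The listener methods above describe a registry keyed by outgoing connection type. A minimal sketch of that bookkeeping, assuming listeners only need the connection type and a payload; OutputListenerSketch, its Listener interface and notifyListeners are hypothetical stand-ins, not Weka's StepOutputListener API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical registry of output listeners keyed by connection type; the Listener
// interface is a stand-in, not Weka's StepOutputListener.
class OutputListenerSketch {
    interface Listener {
        void dataFromStep(String connectionType, Object payload);
    }

    private final Map<String, List<Listener>> listeners = new HashMap<>();

    void addStepOutputListener(Listener l, String type) {
        listeners.computeIfAbsent(type, k -> new ArrayList<>()).add(l);
    }

    void removeStepOutputListener(Listener l, String type) {
        List<Listener> registered = listeners.get(type);
        if (registered != null) {
            registered.remove(l);
        }
    }

    void clearStepOutputListeners(String type) { listeners.remove(type); }

    void clearAllStepOutputListeners() { listeners.clear(); }

    // Called whenever the step outputs data of the given connection type.
    void notifyListeners(String type, Object payload) {
        for (Listener l : listeners.getOrDefault(type, List.of())) {
            l.dataFromStep(type, payload);
        }
    }

    public static void main(String[] args) {
        OutputListenerSketch step = new OutputListenerSketch();
        List<String> received = new ArrayList<>();
        Listener collector = (type, payload) -> received.add(type + ":" + payload);
        step.addStepOutputListener(collector, "text");
        step.notifyListeners("text", "hello");       // delivered
        step.notifyListeners("dataSet", "ignored");  // no listener for this type
        step.clearStepOutputListeners("text");
        step.notifyListeners("text", "dropped");     // listener was cleared
        System.out.println(received);                // [text:hello]
    }
}
```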
-
outputData
Output a Data object to all downstream connected Steps that are connected with the supplied connection name. Sets the connection type on the supplied Data object to the supplied connection name. Also notifies any registered StepOutputListeners.
- Specified by:
outputData
in interface StepManager
- Parameters:
outgoingConnectionName
- the type of the outgoing connection to send data to
data
- a single Data object to send
- Throws:
WekaException
-
outputData
Output one or more Data objects to all relevant steps. Populates the source in each Data object on behalf of the client; however, the client must have set the connection type in each Data object to be output so that the StepManager knows which connected steps to send the data to. Also notifies any registered StepOutputListeners. Note that the downstream step(s)' processIncoming() method is called in a separate thread for batch connections. Furthermore, if multiple Data objects are supplied via the varargs argument and a target step will receive more than one of them, they are passed to that step sequentially within the same thread of execution.
- Specified by:
outputData
in interface StepManager
- Parameters:
data
- one or more Data objects to be sent
- Throws:
WekaException
- if a problem occurs
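The routing rule for outputData(Data...) (each item carries its own connection type, and a target receiving several items gets them one after another in the same thread) can be sketched as follows. This is a hypothetical model, not Weka's dispatch code: OutputRoutingSketch, Data and Target are invented stand-ins, a cached thread pool stands in for the runtime's executor, and the batch/incremental distinction is ignored (every target is treated as a batch connection). The record requires Java 16 or later.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of outputData(Data...) routing: items are grouped per target
// step, and each target receives its items sequentially inside one task.
class OutputRoutingSketch {
    record Data(String connectionType, String payload) {}

    static class Target {
        final String name;
        final List<String> received = Collections.synchronizedList(new ArrayList<>());
        Target(String name) { this.name = name; }
        void processIncoming(Data d) { received.add(d.connectionType() + ":" + d.payload()); }
    }

    // connection type -> downstream targets
    final Map<String, List<Target>> outgoing = new LinkedHashMap<>();

    void outputData(Data... data) {
        // Group items by target, preserving the order in which they were supplied.
        Map<Target, List<Data>> perTarget = new LinkedHashMap<>();
        for (Data d : data) {
            for (Target t : outgoing.getOrDefault(d.connectionType(), List.of())) {
                perTarget.computeIfAbsent(t, k -> new ArrayList<>()).add(d);
            }
        }
        ExecutorService pool = Executors.newCachedThreadPool();
        // One task (thread) per target; that target's items are delivered in order within it.
        perTarget.forEach((t, items) -> pool.submit(() -> items.forEach(t::processIncoming)));
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        OutputRoutingSketch evaluator = new OutputRoutingSketch();
        Target viewer = new Target("viewer");
        evaluator.outgoing.put("text", List.of(viewer));
        evaluator.outgoing.put("chart", List.of(viewer));
        evaluator.outputData(new Data("text", "summary"), new Data("chart", "roc"), new Data("graph", "tree"));
        System.out.println(viewer.received);  // [text:summary, chart:roc]
    }
}
```

The "graph" item is dropped because no step is connected with that type.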
-
outputData
public void outputData(String outgoingConnectionName, String stepName, Data data) throws WekaException Outputs the supplied Data object to the named Step. Does nothing if the named step is not connected immediately downstream of this Step. Sets the supplied connection name on the Data object. Also notifies any StepOutputListeners.
- Specified by:
outputData
in interface StepManager
- Parameters:
outgoingConnectionName
- the name of the outgoing connection
stepName
- the name of the step to send the data to
data
- the data to send
- Throws:
WekaException
-
getStepOutgoingConnectionTypes
Used by the rendering routine in LayoutPanel to ensure that connections downstream from a deleted connection get rendered in grey rather than red.
- Returns:
- a list of outgoing connection types that the managed step can produce (adjusted to take into account any upstream broken connections)
-
numIncomingConnections
public int numIncomingConnections() Get the number of incoming connections to the managed step.
- Specified by:
numIncomingConnections
in interface StepManager
- Returns:
- the number of incoming connections
-
numIncomingConnectionsOfType
Get the number of incoming connections to the managed step of a given type.
- Specified by:
numIncomingConnectionsOfType
in interface StepManager
- Parameters:
connectionName
- the name of the connection type
- Returns:
- the number of incoming connections of this type
-
numOutgoingConnections
public int numOutgoingConnections() Get the number of outgoing connections from the managed step.
- Specified by:
numOutgoingConnections
in interface StepManager
- Returns:
- the number of outgoing connections
-
numOutgoingConnectionsOfType
Get the number of outgoing connections from the managed step of a given type.
- Specified by:
numOutgoingConnectionsOfType
in interface StepManager
- Parameters:
connectionName
- the name of the connection type
- Returns:
- the number of outgoing connections of this type
-
getIncomingStructureForConnectionType
Attempt to get the incoming structure (as a header-only set of instances) for the named incoming connection type. Assumes that there is only one incoming connection of the named type. If there are zero or more than one, null is returned.
- Specified by:
getIncomingStructureForConnectionType
in interface StepManager
- Parameters:
connectionName
- the name of the incoming connection to get the structure for
- Returns:
- the structure as a header-only set of instances or null if there are zero or more than one upstream connected steps producing the named connection, or if the upstream step can't tell us the structure, or if the upstream step can't represent the structure of the connection type as a set of instances.
- Throws:
WekaException
- if a problem occurs
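The "exactly one upstream step" rule above can be shown with a small model. This is a hypothetical sketch: IncomingStructureSketch and Upstream are invented names, a header is represented as a list of attribute names rather than a weka.core.Instances object, and the connection-type strings are illustrative.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the "exactly one upstream step" rule. A header is modelled as
// a list of attribute names instead of a weka.core.Instances object.
class IncomingStructureSketch {
    interface Upstream {
        // Returns the output structure for a connection type, or null if unknown.
        List<String> outputStructure(String connectionType);
    }

    // connection type -> upstream steps providing that connection
    final Map<String, List<Upstream>> incoming = new HashMap<>();

    List<String> getIncomingStructureForConnectionType(String connectionType) {
        List<Upstream> sources = incoming.getOrDefault(connectionType, List.of());
        if (sources.size() != 1) {
            return null;  // zero or several upstream steps: the structure is ambiguous
        }
        return sources.get(0).outputStructure(connectionType);  // may itself be null
    }

    public static void main(String[] args) {
        IncomingStructureSketch classifier = new IncomingStructureSketch();
        Upstream loader = type -> List.of("sepallength", "petalwidth", "class");
        classifier.incoming.put("trainingSet", List.of(loader));
        System.out.println(classifier.getIncomingStructureForConnectionType("trainingSet"));
        classifier.incoming.put("testSet", List.of(loader, loader));
        System.out.println(classifier.getIncomingStructureForConnectionType("testSet"));  // null
    }
}
```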
-
getIncomingStructureForConnectionType
public Instances getIncomingStructureForConnectionType(String connectionName, Environment env) throws WekaException Attempt to retrieve the structure (as a header-only set of instances) for the named incoming connection type. Assumes that there is only one step connected with the supplied incoming connection type.
- Specified by:
getIncomingStructureForConnectionType
in interface StepManager
- Parameters:
connectionName
- the type of the incoming connection to get the structure for
env
- the Environment to use
- Returns:
- the structure of the data for the specified incoming connection, or null if the structure can't be determined (or represented as an Instances object)
- Throws:
WekaException
- if a problem occurs
-
getIncomingStructureFromStep
public Instances getIncomingStructureFromStep(StepManager sourceStep, String connectionName) throws WekaException Attempt to get the incoming structure (as a header-only set of instances) from the given managed step for the given connection type.
- Specified by:
getIncomingStructureFromStep
in interface StepManager
- Parameters:
sourceStep
- the step manager managing the source step
connectionName
- the name of the connection to attempt to get the structure for
- Returns:
- the structure as a header-only set of instances, or null if the source step can't determine this at present or if it can't be represented as a set of instances.
- Throws:
WekaException
- if a problem occurs
-
logLow
Log a message at the low logging level.
- Specified by:
logLow
in interface StepManager
- Parameters:
message
- the message to log
-
logBasic
Log a message at the basic logging level.
- Specified by:
logBasic
in interface StepManager
- Parameters:
message
- the message to log
-
logDetailed
Log a message at the detailed logging level.
- Specified by:
logDetailed
in interface StepManager
- Parameters:
message
- the message to log
-
logDebug
Log a message at the debugging logging level.
- Specified by:
logDebug
in interface StepManager
- Parameters:
message
- the message to log
-
logWarning
Log a warning message.
- Specified by:
logWarning
in interface StepManager
- Parameters:
message
- the message to log
-
logError
Log an error.
- Specified by:
logError
in interface StepManager
- Parameters:
message
- the message to log
cause
- the optional Throwable to log
-
statusMessage
Output a status message to the status area of the log.
- Specified by:
statusMessage
in interface StepManager
- Parameters:
message
- the message to output
-
log
Log a message at the supplied logging level.
- Specified by:
log
in interface StepManager
- Parameters:
message
- the message to write
level
- the level for the message
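The level-specific logging methods can all be expressed through the general log(message, level) call plus a threshold check. This is a hypothetical sketch, not Weka's LoggingLevel semantics: LoggingSketch and its Level enum are invented, and both the ordering LOW < BASIC < DETAILED < DEBUGGING and the rule that warnings and errors are always recorded are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical level-filtered logger. The ordering of levels and the rule that
// warnings and errors are always recorded are assumptions, not Weka's semantics.
class LoggingSketch {
    enum Level { LOW, BASIC, DETAILED, DEBUGGING }  // least to most verbose

    private Level current = Level.BASIC;
    final List<String> lines = new ArrayList<>();

    void setLoggingLevel(Level newLevel) { current = newLevel; }

    // Record only messages whose level is no more verbose than the configured one.
    void log(String message, Level level) {
        if (level.compareTo(current) <= 0) {
            lines.add("[" + level + "] " + message);
        }
    }

    void logLow(String m) { log(m, Level.LOW); }
    void logBasic(String m) { log(m, Level.BASIC); }
    void logDetailed(String m) { log(m, Level.DETAILED); }
    void logDebug(String m) { log(m, Level.DEBUGGING); }
    void logWarning(String m) { lines.add("[WARNING] " + m); }
    void logError(String m, Throwable cause) {  // cause is optional and may be null
        lines.add("[ERROR] " + m + (cause != null ? " (" + cause.getMessage() + ")" : ""));
    }

    public static void main(String[] args) {
        LoggingSketch log = new LoggingSketch();
        log.logBasic("Loading data");      // recorded at BASIC
        log.logDebug("Buffer size 1024");  // suppressed at BASIC
        log.logError("Failed", new IllegalStateException("no header"));
        log.lines.forEach(System.out::println);
        // [BASIC] Loading data
        // [ERROR] Failed (no header)
    }
}
```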
-
environmentSubstitute
Substitute the values of environment variables in the given string.
- Specified by:
environmentSubstitute
in interface StepManager
- Parameters:
source
- the source string to substitute in
- Returns:
- the source string with all known environment variables resolved
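The documented result ("all known environment variables resolved") suggests that unknown variables are left in place. A minimal sketch of that behaviour with java.util.regex, assuming the ${NAME} variable syntax; EnvSubstituteSketch is a hypothetical name, the variables come from a plain Map, and Weka's own Environment class may differ in detail.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical ${VAR} substitution: known variables are replaced, unknown ones are
// left untouched. Weka's own Environment class may differ in detail.
class EnvSubstituteSketch {
    private static final Pattern VAR = Pattern.compile("\\$\\{([^}]+)\\}");

    static String environmentSubstitute(String source, Map<String, String> env) {
        Matcher m = VAR.matcher(source);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = env.get(m.group(1));
            // Keep the original ${...} token when the variable is unknown;
            // quoteReplacement stops '$' and '\' from being treated as group references.
            m.appendReplacement(out, Matcher.quoteReplacement(value != null ? value : m.group()));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> env = Map.of("user.home", "/home/user");
        System.out.println(environmentSubstitute("${user.home}/data/${dataset}.arff", env));
        // /home/user/data/${dataset}.arff
    }
}
```

Matcher.appendReplacement with a StringBuilder requires Java 9 or later.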
-
getInfoStep
Returns a reference to the step being managed if it has one or more outgoing CON_INFO connections and the managed step is of the supplied class.
- Specified by:
getInfoStep
in interface StepManager
- Parameters:
stepClass
- the expected class of the step
- Returns:
- the step being managed if outgoing CON_INFO connections are present and the step is of the supplied class
- Throws:
WekaException
- if there are no outgoing CON_INFO connections or the managed step is the wrong type
-
getInfoStep
Returns a reference to the step being managed if it has one or more outgoing CON_INFO connections.
- Specified by:
getInfoStep
in interface StepManager
- Returns:
- the step being managed if outgoing CON_INFO connections are present
- Throws:
WekaException
- if there are no outgoing CON_INFO connections
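The two getInfoStep variants perform two checks: at least one outgoing CON_INFO connection, and (for the class-taking variant) that the managed step is of the expected class. A hypothetical sketch of those checks: InfoStepSketch, its Step subclasses and the "info" type string are invented stand-ins, IllegalStateException replaces WekaException, and the generic Class<T> parameter is a type-safe variation on the raw Class parameter documented above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the getInfoStep() checks; the Step classes and the "info"
// type string are stand-ins, and IllegalStateException replaces WekaException.
class InfoStepSketch {
    static class Step {}
    static class ClassifierStep extends Step {}
    static class ClustererStep extends Step {}

    static final String CON_INFO = "info";  // assumed connection-type name
    final Step managed;
    final Map<String, List<String>> outgoing = new HashMap<>();  // type -> downstream step names

    InfoStepSketch(Step managed) { this.managed = managed; }

    Step getInfoStep() {
        if (outgoing.getOrDefault(CON_INFO, List.of()).isEmpty()) {
            throw new IllegalStateException("No outgoing info connections");
        }
        return managed;
    }

    <T extends Step> T getInfoStep(Class<T> stepClass) {
        Step s = getInfoStep();  // first check: info connections present
        if (!stepClass.isInstance(s)) {  // second check: expected class
            throw new IllegalStateException("Managed step is not a " + stepClass.getSimpleName());
        }
        return stepClass.cast(s);
    }

    public static void main(String[] args) {
        InfoStepSketch mgr = new InfoStepSketch(new ClassifierStep());
        mgr.outgoing.put(CON_INFO, List.of("DownstreamStep"));
        ClassifierStep c = mgr.getInfoStep(ClassifierStep.class);  // succeeds
        System.out.println(c.getClass().getSimpleName());          // ClassifierStep
        try {
            mgr.getInfoStep(ClustererStep.class);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());  // Managed step is not a ClustererStep
        }
    }
}
```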
-
findStepInFlow
Finds a named step in the current flow. Returns null if the named step is not present in the flow.
- Specified by:
findStepInFlow
in interface StepManager
- Parameters:
stepNameToFind
- the name of the step to find
- Returns:
- the StepManager of the named step, or null if the step does not exist in the current flow.
-
stepStatusMessagePrefix
Gets a prefix for the step managed by this manager. Used to uniquely identify steps in the status area of the log.
- Returns:
- a unique prefix for the step managed by this manager
-