From d8bf90404e448ac960142aa13adf412ead8b4660 Mon Sep 17 00:00:00 2001 From: Pavel Sakun Date: Mon, 8 Aug 2016 16:49:49 -0400 Subject: [PATCH] PDI-8731 - Cassandra SSTable Output (bulkloader) step does not work with Cassandra 1.1.x Resolve cassandra transitively to get runtime dependencies Add support for cassandra 2.x Split writer into CQL2 and CQL3 specific ones Try preventing JVM from exit in case issues with cassandra config Fixes for CQL2 writer: - read partitioner from config, use Murmur3Partitioner by default - store date as longs [PDI-8731][Tests] - Added IT and made some cosmetic changes --- ivy.xml | 32 +-- .../cassandrasstableoutput/SSTableOutput.java | 169 +++++++++----- .../SSTableOutputDialog.java | 150 +++++++++++-- .../SSTableOutputMeta.java | 109 ++++++--- .../messages/messages_en_US.properties | 11 +- .../writer/AbstractSSTableWriter.java | 105 +++++++++ .../CQL2SSTableWriter.java} | 208 +++++++----------- .../writer/CQL3SSTableWriter.java | 95 ++++++++ .../writer/SSTableWriterBuilder.java | 156 +++++++++++++ .../writer/messages/messages_en_US.properties | 1 + .../SSTableOutputIT.java | 139 ++++++++++++ .../SSTableOutputMetaTest.java | 117 ++++++++++ .../SSTableOutputTest.java | 60 +++++ .../cassandrasstableoutput/cassandra.yaml | 87 ++++++++ .../writer/AbstractSSTableWriterTest.java | 79 +++++++ .../writer/CQL2SSTableWriterTest.java | 193 ++++++++++++++++ .../writer/CQL3SSTableWriterTest.java | 144 ++++++++++++ .../writer/SSTableWriterBuilderTest.java | 128 +++++++++++ .../di/trans/steps/mock/StepMockHelper.java | 156 +++++++++++++ 19 files changed, 1893 insertions(+), 246 deletions(-) create mode 100644 src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/AbstractSSTableWriter.java rename src/org/pentaho/di/trans/steps/cassandrasstableoutput/{SSTableWriter.java => writer/CQL2SSTableWriter.java} (57%) create mode 100644 src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/CQL3SSTableWriter.java create mode 100644 
src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/SSTableWriterBuilder.java create mode 100644 src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/messages/messages_en_US.properties create mode 100644 test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputIT.java create mode 100644 test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputMetaTest.java create mode 100644 test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputTest.java create mode 100644 test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/cassandra.yaml create mode 100644 test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/AbstractSSTableWriterTest.java create mode 100644 test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/CQL2SSTableWriterTest.java create mode 100644 test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/CQL3SSTableWriterTest.java create mode 100644 test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/SSTableWriterBuilderTest.java create mode 100644 test-src/org/pentaho/di/trans/steps/mock/StepMockHelper.java diff --git a/ivy.xml b/ivy.xml index 40ce1c7e3..b1bbfd119 100644 --- a/ivy.xml +++ b/ivy.xml @@ -16,32 +16,32 @@ + + + + + + + - - - - - - - - + + + - - - + + + + + - + diff --git a/src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutput.java b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutput.java index 1168c1128..dedb2dd7c 100644 --- a/src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutput.java +++ b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutput.java @@ -22,11 +22,15 @@ package org.pentaho.di.trans.steps.cassandrasstableoutput; +import java.io.File; +import java.net.URI; +import java.security.Permission; import java.util.HashMap; import java.util.Map; import org.pentaho.di.core.Const; import org.pentaho.di.core.exception.KettleException; +import 
org.pentaho.di.core.logging.LogChannelInterface; import org.pentaho.di.core.row.RowMetaInterface; import org.pentaho.di.i18n.BaseMessages; import org.pentaho.di.trans.Trans; @@ -36,81 +40,74 @@ import org.pentaho.di.trans.step.StepInterface; import org.pentaho.di.trans.step.StepMeta; import org.pentaho.di.trans.step.StepMetaInterface; +import org.pentaho.di.trans.steps.cassandrasstableoutput.writer.AbstractSSTableWriter; +import org.pentaho.di.trans.steps.cassandrasstableoutput.writer.SSTableWriterBuilder; /** * Output step for writing Cassandra SSTables (sorted-string tables). - * + * * @author Rob Turner (robert{[at]}robertturner{[dot]}com{[dot]}au) * @author Mark Hall (mhall{[at]}pentaho{[dot]}com) */ public class SSTableOutput extends BaseStep implements StepInterface { - - protected SSTableOutputMeta m_meta; - protected SSTableOutputData m_data; - - public SSTableOutput( StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta, - Trans trans ) { - - super( stepMeta, stepDataInterface, copyNr, transMeta, trans ); - } - + private static final SecurityManager sm = System.getSecurityManager(); /** The number of rows seen so far for this batch */ protected int rowsSeen; - - /** The directory to output to */ - protected String directory; - - /** The keyspace to use */ - protected String keyspace; - - /** The name of the column family (table) to write to */ - protected String columnFamily; - - /** The key field used to determine unique keys (IDs) for rows */ - protected String keyField; - - /** Size (MB) of write buffer */ - protected String bufferSize; - /** Writes the SSTable output */ - protected SSTableWriter writer; - + protected AbstractSSTableWriter writer; /** Used to determine input fields */ protected RowMetaInterface inputMetadata; - /** List of field names (optimization) */ private String[] fieldNames; - /** List of field indices (optimization) */ private int[] fieldValueIndices; - private void initialize( StepMetaInterface 
smi, StepDataInterface sdi ) throws Exception { + public SSTableOutput( StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta, + Trans trans ) { + + super( stepMeta, stepDataInterface, copyNr, transMeta, trans ); + } + + private void initialize( SSTableOutputMeta smi ) throws Exception { first = false; rowsSeen = 0; - m_meta = (SSTableOutputMeta) smi; - m_data = (SSTableOutputData) sdi; inputMetadata = getInputRowMeta(); - String yamlPath = environmentSubstitute( m_meta.getYamlPath() ); + String yamlPath = environmentSubstitute( smi.getYamlPath() ); + String directory = environmentSubstitute( smi.getDirectory() ); + String keyspace = environmentSubstitute( smi.getCassandraKeyspace() ); + String columnFamily = environmentSubstitute( smi.getColumnFamilyName() ); + String keyField = environmentSubstitute( smi.getKeyField() ); + String bufferSize = environmentSubstitute( smi.getBufferSize() ); + if ( Const.isEmpty( yamlPath ) ) { throw new Exception( BaseMessages.getString( SSTableOutputMeta.PKG, "SSTableOutput.Error.NoPathToYAML" ) ); } logBasic( BaseMessages.getString( SSTableOutputMeta.PKG, "SSTableOutput.Message.YAMLPath", yamlPath ) ); - System.setProperty( "cassandra.config", "file:" + yamlPath ); + File outputDir; + if ( Const.isEmpty( directory ) ) { + outputDir = new File( System.getProperty( "java.io.tmpdir" ) ); + } else { + outputDir = new File( new URI( directory ) ); + } + + if ( !outputDir.exists() ) { + if ( !outputDir.mkdirs() ) { + throw new KettleException( BaseMessages.getString( SSTableOutputMeta.PKG, + "SSTableOutput.Error.OutputDirDoesntExist" ) ); + } + } - directory = environmentSubstitute( m_meta.getDirectory() ); - keyspace = environmentSubstitute( m_meta.getCassandraKeyspace() ); - columnFamily = environmentSubstitute( m_meta.getColumnFamilyName() ); - keyField = environmentSubstitute( m_meta.getKeyField() ); - bufferSize = environmentSubstitute( m_meta.getBufferSize() ); if ( Const.isEmpty( columnFamily ) ) 
{ throw new KettleException( BaseMessages.getString( SSTableOutputMeta.PKG, "SSTableOutput.Error.NoColumnFamilySpecified" ) ); } + if ( Const.isEmpty( keyField ) ) { throw new KettleException( BaseMessages.getString( SSTableOutputMeta.PKG, "SSTableOutput.Error.NoKeySpecified" ) ); } + // what are the fields? where are they? fieldNames = inputMetadata.getFieldNames(); fieldValueIndices = new int[fieldNames.length]; @@ -121,13 +118,41 @@ private void initialize( StepMetaInterface smi, StepDataInterface sdi ) throws E if ( writer != null ) { writer.close(); } - writer = new SSTableWriter(); - writer.setDirectory( directory ); - writer.setKeyspace( keyspace ); - writer.setColumnFamily( columnFamily ); - writer.setKeyField( keyField ); - writer.setBufferSize( Integer.parseInt( bufferSize ) ); - writer.init(); + + SSTableWriterBuilder builder = + new SSTableWriterBuilder().withConfig( yamlPath ).withDirectory( outputDir.getAbsolutePath() ).withKeyspace( + keyspace ).withColumnFamily( columnFamily ).withRowMeta( getInputRowMeta() ).withKeyField( keyField ) + .withCqlVersion( smi.getUseCQL3() ? 3 : 2 ); + try { + builder.withBufferSize( Integer.parseInt( bufferSize ) ); + } catch ( NumberFormatException nfe ) { + logBasic( BaseMessages.getString( SSTableOutputMeta.PKG, "SSTableOutput.Message.DefaultBufferSize" ) ); + } + + writer = builder.build(); + try { + disableSystemExit( sm, log ); + writer.init(); + } catch ( Exception e ) { + throw new RuntimeException( BaseMessages.getString( SSTableOutputMeta.PKG, "SSTableOutput.Error.InvalidConfig" ), + e ); + } finally { + // Restore original security manager if needed + if ( System.getSecurityManager() != sm ) { + System.setSecurityManager( sm ); + } + } + } + + void disableSystemExit( SecurityManager sm, LogChannelInterface log) { + // Workaround JVM exit caused by org.apache.cassandra.config.DatabaseDescriptor in case of any issue with + // cassandra config. 
Do this by preventing JVM from exit for writer initialization time or give user a clue at + // least. + try { + System.setSecurityManager( new NoSystemExitDelegatingSecurityManager( sm ) ); + } catch ( SecurityException se ) { + log.logError( BaseMessages.getString( SSTableOutputMeta.PKG, "SSTableOutput.Error.JVMExitProtection" ), se ); + } } @Override @@ -136,7 +161,18 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws if ( isStopped() ) { return false; } + Object[] r = getRow(); + + if ( first ) { + try { + initialize( (SSTableOutputMeta) smi ); + } catch ( Exception e ) { + throw new KettleException( + BaseMessages.getString( SSTableOutputMeta.PKG, "SSTableOutput.Error.WriterInitFailed" ), e ); + } + } + try { if ( r == null ) { // no more output - clean up/close connections @@ -144,24 +180,23 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws closeWriter(); return false; } - if ( first ) { - initialize( smi, sdi ); - } // create record Map record = new HashMap(); for ( int i = 0; i < fieldNames.length; i++ ) { Object value = r[fieldValueIndices[i]]; - if ( SSTableWriter.isNull( value ) ) { + if ( value == null || "".equals( value ) ) { continue; } record.put( fieldNames[i], value ); } // write it writer.processRow( record ); + incrementLinesWritten(); } catch ( Exception e ) { logError( BaseMessages.getString( SSTableOutputMeta.PKG, "SSTableOutput.Error.FailedToProcessRow" ), e ); // single error row - found it! 
putError( getInputRowMeta(), r, 1L, e.getMessage(), null, "ERR_SSTABLE_OUTPUT_01" ); + incrementLinesRejected(); } // error will occur after adding it @@ -187,4 +222,34 @@ public void closeWriter() { } } } + + private class JVMShutdownAttemptedException extends SecurityException { + } + + private class NoSystemExitDelegatingSecurityManager extends SecurityManager { + private SecurityManager delegate; + + NoSystemExitDelegatingSecurityManager( SecurityManager delegate ) { + this.delegate = delegate; + } + + @Override + public void checkPermission( Permission perm ) { + if ( delegate != null ) { + delegate.checkPermission( perm ); + } + } + + @Override + public void checkPermission( Permission perm, Object context ) { + if ( delegate != null ) { + delegate.checkPermission( perm, context ); + } + } + + @Override + public void checkExit( int status ) { + throw new JVMShutdownAttemptedException(); + } + } } diff --git a/src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputDialog.java b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputDialog.java index c7cb07491..345adb6e5 100644 --- a/src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputDialog.java +++ b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputDialog.java @@ -35,6 +35,7 @@ import org.eclipse.swt.layout.FormData; import org.eclipse.swt.layout.FormLayout; import org.eclipse.swt.widgets.Button; +import org.eclipse.swt.widgets.DirectoryDialog; import org.eclipse.swt.widgets.Display; import org.eclipse.swt.widgets.Event; import org.eclipse.swt.widgets.FileDialog; @@ -51,12 +52,15 @@ import org.pentaho.di.trans.step.BaseStepMeta; import org.pentaho.di.trans.step.StepDialogInterface; import org.pentaho.di.trans.step.StepMeta; +import org.pentaho.di.ui.core.dialog.EnterSelectionDialog; import org.pentaho.di.ui.core.widget.TextVar; import org.pentaho.di.ui.trans.step.BaseStepDialog; +import java.io.File; + /** * Dialog class for the SSTableOutput step - * + 
* * @author Rob Turner (robert{[at]}robertturner{[dot]}com{[dot]}au) * @author Mark Hall (mhall{[at]}pentaho{[dot]}com) */ @@ -93,6 +97,9 @@ public class SSTableOutputDialog extends BaseStepDialog implements StepDialogInt private Button m_getFieldsBut; + private Label m_useCQL3Lab; + private Button m_useCQL3Check; + public SSTableOutputDialog( Shell parent, Object in, TransMeta tr, String name ) { super( parent, (BaseStepMeta) in, tr, name ); @@ -187,9 +194,12 @@ public void widgetSelected( SelectionEvent e ) { filterNames[1] = BaseMessages.getString( PKG, "System.FileType.AllFiles" ); dialog.setFilterExtensions( extensions ); + dialog.setFilterNames( filterNames ); if ( dialog.open() != null ) { - m_yamlText.setText( dialog.getFilterPath() + System.getProperty( "file.separator" ) + dialog.getFileName() ); + String path = dialog.getFilterPath() + System.getProperty( "file.separator" ) + dialog.getFileName(); + path = new File( path ).toURI().toString(); + m_yamlText.setText( path ); } } } ); @@ -229,21 +239,12 @@ public void modifyText( ModifyEvent e ) { m_directoryBut.addSelectionListener( new SelectionAdapter() { @Override public void widgetSelected( SelectionEvent e ) { - FileDialog dialog = new FileDialog( shell, SWT.OPEN ); - String[] extensions = null; - String[] filterNames = null; - - extensions = new String[1]; - filterNames = new String[1]; - - extensions[0] = "*"; - filterNames[0] = BaseMessages.getString( PKG, "System.FileType.AllFiles" ); - - dialog.setFilterExtensions( extensions ); + DirectoryDialog dialog = new DirectoryDialog( shell, SWT.OPEN ); if ( dialog.open() != null ) { - m_directoryText.setText( dialog.getFilterPath() + System.getProperty( "file.separator" ) - + dialog.getFileName() ); + String path = dialog.getFilterPath(); + path = new File( path ).toURI().toString(); + m_directoryText.setText( path ); } } } ); @@ -331,7 +332,11 @@ public void modifyText( ModifyEvent e ) { m_getFieldsBut.addSelectionListener( new SelectionAdapter() { 
@Override public void widgetSelected( SelectionEvent e ) { - setupFieldsCombo(); + if ( m_useCQL3Check.getSelection() ) { + showEnterSelectionDialog(); + } else { + setupFieldsCombo(); + } } } ); @@ -348,13 +353,44 @@ public void modifyText( ModifyEvent e ) { fd.left = new FormAttachment( middle, 0 ); m_keyFieldCombo.setLayoutData( fd ); + m_useCQL3Lab = new Label( shell, SWT.RIGHT ); + props.setLook( m_useCQL3Lab ); + m_useCQL3Lab.setText( BaseMessages.getString( PKG, "SSTableOutputDialog.UseCQL3.Label" ) ); //$NON-NLS-1$ + fd = new FormData(); + fd.left = new FormAttachment( 0, 0 ); + fd.top = new FormAttachment( m_keyFieldCombo, margin ); + fd.right = new FormAttachment( middle, -margin ); + m_useCQL3Lab.setLayoutData( fd ); + + m_useCQL3Check = new Button( shell, SWT.CHECK ); + props.setLook( m_useCQL3Check ); + fd = new FormData(); + fd.right = new FormAttachment( 100, 0 ); + fd.top = new FormAttachment( m_keyFieldCombo, margin ); + fd.left = new FormAttachment( middle, 0 ); + m_useCQL3Check.setLayoutData( fd ); + + m_useCQL3Check.addSelectionListener( new SelectionAdapter() { + @Override + public void widgetSelected( SelectionEvent e ) { + if ( m_useCQL3Check.getSelection() ) { + m_getFieldsBut.setText( BaseMessages.getString( PKG, "SSTableOutputDialog.SelectFields.Button" ) ); //$NON-NLS-1$ + m_keyFieldLab.setText( BaseMessages.getString( PKG, "SSTableOutputDialog.KeyFields.Label" ) ); //$NON-NLS-1$ + } else { + m_getFieldsBut.setText( " " //$NON-NLS-1$ + + BaseMessages.getString( PKG, "SSTableOutputDialog.GetFields.Button" ) + " " ); //$NON-NLS-1$ //$NON-NLS-2$ + m_keyFieldLab.setText( BaseMessages.getString( PKG, "SSTableOutputDialog.KeyField.Label" ) ); //$NON-NLS-1$ + } + } + } ); + // buffer size m_bufferSizeLab = new Label( shell, SWT.RIGHT ); props.setLook( m_bufferSizeLab ); m_bufferSizeLab.setText( BaseMessages.getString( PKG, "SSTableOutputDialog.BufferSize.Label" ) ); fd = new FormData(); fd.left = new FormAttachment( 0, 0 ); - fd.top = new 
FormAttachment( m_keyFieldCombo, margin ); + fd.top = new FormAttachment( m_useCQL3Check, margin ); fd.right = new FormAttachment( middle, -margin ); m_bufferSizeLab.setLayoutData( fd ); @@ -368,7 +404,7 @@ public void modifyText( ModifyEvent e ) { m_bufferSizeText.addModifyListener( lsMod ); fd = new FormData(); fd.right = new FormAttachment( 100, 0 ); - fd.top = new FormAttachment( m_keyFieldCombo, margin ); + fd.top = new FormAttachment( m_useCQL3Check, margin ); fd.left = new FormAttachment( middle, 0 ); m_bufferSizeText.setLayoutData( fd ); @@ -470,6 +506,7 @@ protected void ok() { m_currentMeta.setColumnFamilyName( m_columnFamilyText.getText() ); m_currentMeta.setKeyField( m_keyFieldCombo.getText() ); m_currentMeta.setBufferSize( m_bufferSizeText.getText() ); + m_currentMeta.setUseCQL3( m_useCQL3Check.getSelection() ); if ( !m_originalMeta.equals( m_currentMeta ) ) { m_currentMeta.setChanged(); @@ -511,5 +548,82 @@ protected void getData() { if ( !Const.isEmpty( m_currentMeta.getBufferSize() ) ) { m_bufferSizeText.setText( m_currentMeta.getBufferSize() ); } + + m_useCQL3Check.setSelection( m_currentMeta.getUseCQL3() ); + + if ( m_useCQL3Check.getSelection() ) { + m_keyFieldLab.setText( BaseMessages.getString( PKG, "SSTableOutputDialog.KeyFields.Label" ) ); //$NON-NLS-1$ + m_getFieldsBut.setText( BaseMessages.getString( PKG, "SSTableOutputDialog.GetFields.Button" ) ); //$NON-NLS-1$ + } else { + m_keyFieldLab.setText( BaseMessages.getString( PKG, "SSTableOutputDialog.KeyField.Label" ) ); //$NON-NLS-1$ + m_getFieldsBut.setText( BaseMessages.getString( PKG, "SSTableOutputDialog.GetFields.Button" ) ); //$NON-NLS-1$ + } + } + + protected void showEnterSelectionDialog() { + StepMeta stepMeta = transMeta.findStep( stepname ); + + String[] choices = null; + if ( stepMeta != null ) { + try { + RowMetaInterface row = transMeta.getPrevStepFields( stepMeta ); + + if ( row.size() == 0 ) { + MessageDialog.openError( shell, BaseMessages.getString( PKG, + 
"CassandraOutputData.Message.NoIncomingFields.Title" ), //$NON-NLS-1$ + BaseMessages.getString( PKG, "CassandraOutputData.Message.NoIncomingFields" ) ); //$NON-NLS-1$ + + return; + } + + choices = new String[row.size()]; + for ( int i = 0; i < row.size(); i++ ) { + ValueMetaInterface vm = row.getValueMeta( i ); + choices[i] = vm.getName(); + } + + EnterSelectionDialog dialog = + new EnterSelectionDialog( shell, choices, BaseMessages.getString( PKG, + "CassandraOutputDialog.SelectKeyFieldsDialog.Title" ), //$NON-NLS-1$ + BaseMessages.getString( PKG, "CassandraOutputDialog.SelectKeyFieldsDialog.Message" ) ); //$NON-NLS-1$ + dialog.setMulti( true ); + if ( !Const.isEmpty( m_keyFieldCombo.getText() ) ) { + String current = m_keyFieldCombo.getText(); + String[] parts = current.split( "," ); //$NON-NLS-1$ + int[] currentSelection = new int[parts.length]; + int count = 0; + for ( String s : parts ) { + int index = row.indexOfValue( s.trim() ); + if ( index >= 0 ) { + currentSelection[count++] = index; + } + } + + dialog.setSelectedNrs( currentSelection ); + } + + dialog.open(); + + int[] selected = dialog.getSelectionIndeces(); // SIC + if ( selected != null && selected.length > 0 ) { + StringBuilder newSelection = new StringBuilder(); + boolean first = true; + for ( int i : selected ) { + if ( first ) { + newSelection.append( choices[i] ); + first = false; + } else { + newSelection.append( "," ).append( choices[i] ); //$NON-NLS-1$ + } + } + + m_keyFieldCombo.setText( newSelection.toString() ); + } + } catch ( KettleException ex ) { + MessageDialog.openError( shell, BaseMessages.getString( PKG, + "CassandraOutputData.Message.NoIncomingFields.Title" ), BaseMessages //$NON-NLS-1$ + .getString( PKG, "CassandraOutputData.Message.NoIncomingFields" ) ); //$NON-NLS-1$ + } + } } } diff --git a/src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputMeta.java b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputMeta.java index 0d31b255e..30fafe7ff 100644 
--- a/src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputMeta.java +++ b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputMeta.java @@ -19,7 +19,6 @@ * limitations under the License. * ******************************************************************************/ - package org.pentaho.di.trans.steps.cassandrasstableoutput; import java.util.List; @@ -86,6 +85,11 @@ public class SSTableOutputMeta extends BaseStepMeta implements StepMetaInterface @Injection( name = "BUFFER_SIZE" ) protected String bufferSize = "16"; + /** + * Whether to use CQL version 3 + */ + protected boolean m_useCQL3 = false; + /** * Get the path the the yaml file * @@ -200,6 +204,24 @@ public void setBufferSize( String bufferSize ) { this.bufferSize = bufferSize; } + /** + * Set whether to use CQL version 3 is to be used for CQL IO mode + * + * @param cql3 true if CQL version 3 is to be used + */ + public void setUseCQL3( boolean cql3 ) { + m_useCQL3 = cql3; + } + + /** + * Get whether to use CQL version 3 is to be used for CQL IO mode + * + * @return true if CQL version 3 is to be used + */ + public boolean getUseCQL3() { + return m_useCQL3; + } + @Override public boolean supportsErrorHandling() { // enable define error handling option @@ -211,72 +233,89 @@ public String getXML() { StringBuffer retval = new StringBuffer(); if ( !Const.isEmpty( m_yamlPath ) ) { - retval.append( "\n " ).append( XMLHandler.addTagValue( "yaml_path", m_yamlPath ) ); + retval.append( "\n " ).append( + XMLHandler.addTagValue( "yaml_path", m_yamlPath ) ); } if ( !Const.isEmpty( directory ) ) { - retval.append( "\n " ).append( XMLHandler.addTagValue( "output_directory", directory ) ); - } - - if ( !Const.isEmpty( cassandraKeyspace ) ) { - retval.append( "\n " ).append( XMLHandler.addTagValue( "cassandra_keyspace", cassandraKeyspace ) ); + retval.append( "\n " ).append( + XMLHandler.addTagValue( "output_directory", directory ) ); } if ( !Const.isEmpty( cassandraKeyspace ) ) { - 
retval.append( "\n " ).append( XMLHandler.addTagValue( "cassandra_keyspace", cassandraKeyspace ) ); + retval.append( "\n " ).append( + XMLHandler.addTagValue( "cassandra_keyspace", cassandraKeyspace ) ); } if ( !Const.isEmpty( columnFamily ) ) { - retval.append( "\n " ).append( XMLHandler.addTagValue( "column_family", columnFamily ) ); + retval.append( "\n " ).append( + XMLHandler.addTagValue( "column_family", columnFamily ) ); } if ( !Const.isEmpty( keyField ) ) { - retval.append( "\n " ).append( XMLHandler.addTagValue( "key_field", keyField ) ); + retval.append( "\n " ).append( + XMLHandler.addTagValue( "key_field", keyField ) ); } if ( !Const.isEmpty( bufferSize ) ) { - retval.append( "\n " ).append( XMLHandler.addTagValue( "buffer_size_mb", bufferSize ) ); + retval.append( "\n " ).append( + XMLHandler.addTagValue( "buffer_size_mb", bufferSize ) ); } + retval.append( "\n " ).append( //$NON-NLS-1$ + XMLHandler.addTagValue( "use_cql3", m_useCQL3 ) ); //$NON-NLS-1$ + return retval.toString(); } - public void loadXML( Node stepnode, List databases, Map counters ) - throws KettleXMLException { + public void loadXML( Node stepnode, List databases, + Map counters ) throws KettleXMLException { m_yamlPath = XMLHandler.getTagValue( stepnode, "yaml_path" ); directory = XMLHandler.getTagValue( stepnode, "output_directory" ); cassandraKeyspace = XMLHandler.getTagValue( stepnode, "cassandra_keyspace" ); columnFamily = XMLHandler.getTagValue( stepnode, "column_family" ); keyField = XMLHandler.getTagValue( stepnode, "key_field" ); bufferSize = XMLHandler.getTagValue( stepnode, "buffer_size_mb" ); + + String useCQL3 = XMLHandler.getTagValue( stepnode, "use_cql3" ); //$NON-NLS-1$ + if ( !Const.isEmpty( useCQL3 ) ) { + m_useCQL3 = useCQL3.equalsIgnoreCase( "Y" ); //$NON-NLS-1$ + } } - public void readRep( Repository rep, ObjectId id_step, List databases, Map counters ) + public void readRep( Repository rep, ObjectId id_step, + List databases, Map counters ) throws KettleException 
{ m_yamlPath = rep.getStepAttributeString( id_step, 0, "yaml_path" ); directory = rep.getStepAttributeString( id_step, 0, "output_directory" ); - cassandraKeyspace = rep.getStepAttributeString( id_step, 0, "cassandra_keyspace" ); + cassandraKeyspace = rep.getStepAttributeString( id_step, 0, + "cassandra_keyspace" ); columnFamily = rep.getStepAttributeString( id_step, 0, "column_family" ); keyField = rep.getStepAttributeString( id_step, 0, "key_field" ); bufferSize = rep.getStepAttributeString( id_step, 0, "buffer_size_mb" ); + m_useCQL3 = rep.getStepAttributeBoolean( id_step, 0, "use_cql3" ); //$NON-NLS-1$ } - public void saveRep( Repository rep, ObjectId id_transformation, ObjectId id_step ) throws KettleException { + public void saveRep( Repository rep, ObjectId id_transformation, + ObjectId id_step ) throws KettleException { if ( !Const.isEmpty( m_yamlPath ) ) { rep.saveStepAttribute( id_transformation, id_step, "yaml_path", m_yamlPath ); } if ( !Const.isEmpty( directory ) ) { - rep.saveStepAttribute( id_transformation, id_step, "output_directory", directory ); + rep.saveStepAttribute( id_transformation, id_step, "output_directory", + directory ); } if ( !Const.isEmpty( cassandraKeyspace ) ) { - rep.saveStepAttribute( id_transformation, id_step, "cassandra_keyspace", cassandraKeyspace ); + rep.saveStepAttribute( id_transformation, id_step, "cassandra_keyspace", + cassandraKeyspace ); } if ( !Const.isEmpty( columnFamily ) ) { - rep.saveStepAttribute( id_transformation, id_step, "column_family", columnFamily ); + rep.saveStepAttribute( id_transformation, id_step, "column_family", + columnFamily ); } if ( !Const.isEmpty( keyField ) ) { @@ -284,40 +323,48 @@ public void saveRep( Repository rep, ObjectId id_transformation, ObjectId id_ste } if ( !Const.isEmpty( bufferSize ) ) { - rep.saveStepAttribute( id_transformation, id_step, "buffer_size_mb", bufferSize ); + rep.saveStepAttribute( id_transformation, id_step, "buffer_size_mb", + bufferSize ); } + 
rep.saveStepAttribute( id_transformation, id_step, 0, "use_cql3", m_useCQL3 ); //$NON-NLS-1$ } - public void check( List remarks, TransMeta transMeta, StepMeta stepMeta, RowMetaInterface prev, - String[] input, String[] output, RowMetaInterface info ) { + public void check( List remarks, TransMeta transMeta, + StepMeta stepMeta, RowMetaInterface prev, String[] input, + String[] output, RowMetaInterface info ) { CheckResult cr; if ( ( prev == null ) || ( prev.size() == 0 ) ) { - cr = new CheckResult( CheckResult.TYPE_RESULT_WARNING, "Not receiving any fields from previous steps!", stepMeta ); + cr = new CheckResult( CheckResult.TYPE_RESULT_WARNING, + "Not receiving any fields from previous steps!", stepMeta ); remarks.add( cr ); } else { - cr = - new CheckResult( CheckResult.TYPE_RESULT_OK, "Step is connected to previous one, receiving " + prev.size() - + " fields", stepMeta ); + cr = new CheckResult( CheckResult.TYPE_RESULT_OK, + "Step is connected to previous one, receiving " + prev.size() + + " fields", stepMeta ); remarks.add( cr ); } // See if we have input streams leading to this step! 
if ( input.length > 0 ) { - cr = new CheckResult( CheckResult.TYPE_RESULT_OK, "Step is receiving info from other steps.", stepMeta ); + cr = new CheckResult( CheckResult.TYPE_RESULT_OK, + "Step is receiving info from other steps.", stepMeta ); remarks.add( cr ); } else { - cr = new CheckResult( CheckResult.TYPE_RESULT_ERROR, "No input received from other steps!", stepMeta ); + cr = new CheckResult( CheckResult.TYPE_RESULT_ERROR, + "No input received from other steps!", stepMeta ); remarks.add( cr ); } } - public StepInterface getStep( StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, - TransMeta transMeta, Trans trans ) { + public StepInterface getStep( StepMeta stepMeta, + StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta, + Trans trans ) { - return new SSTableOutput( stepMeta, stepDataInterface, copyNr, transMeta, trans ); + return new SSTableOutput( stepMeta, stepDataInterface, copyNr, transMeta, + trans ); } public StepDataInterface getStepData() { diff --git a/src/org/pentaho/di/trans/steps/cassandrasstableoutput/messages/messages_en_US.properties b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/messages/messages_en_US.properties index b5d7e4b9d..74962cdbc 100644 --- a/src/org/pentaho/di/trans/steps/cassandrasstableoutput/messages/messages_en_US.properties +++ b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/messages/messages_en_US.properties @@ -9,8 +9,11 @@ SSTableOutputDialog.YAML.Button=Browse SSTableOutputDialog.Keyspace.Label=Keyspace SSTableOutputDialog.ColumnFamily.Label=Column family (table) SSTableOutputDialog.KeyField.Label=Incoming field to use as the key +SSTableOutputDialog.KeyFields.Label=Incoming fields to use as the key SSTableOutputDialog.GetFields.Button=Get fields +SSTableOutputDialog.SelectFields.Button=Select fields SSTableOutputDialog.BufferSize.Label=Buffer (MB) +SSTableOutputDialog.UseCQL3.Label=Use CQL version 3 SSTableOutputData.Message.NoIncomingFields=There doesn't seem to be any 
fields coming into this step SSTableOutputData.Message.NoIncomingFields.Title=No incoming fields available @@ -20,12 +23,18 @@ SSTableOutput.Error.FailedToProcessRow=Failed to process row SSTableOutput.Error.FailedToCloseWriter=Failed to close writer SSTableOutput.Error.NoPathToYAML=No path to the yaml file has been specified! SSTableOutput.Error.NoColumnFamilySpecified=No column family (table) has been specified! +SSTableOutput.Error.OutputDirDoesntExist=Output dir doesn't exist and cannot be created +SSTableOutput.Error.JVMExitProtection=Could not setup protection from JVM exit caused by invalid cassandra config. +SSTableOutput.Error.InvalidConfig=Cassandra config is invalid. Check log for details. +SSTableOutput.Error.WriterInitFailed=Writer initialization failed. SSTableOutput.Message.YAMLPath=Using yaml file: {0} +SSTableOutput.Message.DefaultBufferSize=Cannot determine buffer size to use, default settings will be used SSTableOutput.Injection.YAML_FILE_PATH=The path to the Cassandra YAML file. SSTableOutput.Injection.DIRECTORY=The directory to write output to. SSTableOutput.Injection.CASSANDRA_KEYSPACE=The Cassandra keyspace to use. SSTableOutput.Injection.COLUMN_FAMILY=The column family (table) to write to. SSTableOutput.Injection.KEY_FIELD=The incoming field to use as the key for inserts. -SSTableOutput.Injection.BUFFER_SIZE=The size of the write buffer specified in megabytes. \ No newline at end of file +SSTableOutput.Injection.BUFFER_SIZE=The size of the write buffer specified in megabytes. + diff --git a/src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/AbstractSSTableWriter.java b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/AbstractSSTableWriter.java new file mode 100644 index 000000000..1240740e5 --- /dev/null +++ b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/AbstractSSTableWriter.java @@ -0,0 +1,105 @@ +/*! 
****************************************************************************** + * + * Pentaho Data Integration + * + * Copyright (C) 2002-2014 by Pentaho : http://www.pentaho.com + * + ******************************************************************************* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package org.pentaho.di.trans.steps.cassandrasstableoutput.writer; + +import java.util.Map; + +public abstract class AbstractSSTableWriter { + private static final int DEFAULT_BUFFER_SIZE_MB = 16; + private int bufferSize = DEFAULT_BUFFER_SIZE_MB; + private String directory = System.getProperty( "java.io.tmpdir" ); + private String keyspace; + private String columnFamily; + private String keyField; + + public abstract void init() throws Exception; + + public abstract void processRow( Map record ) throws Exception; + + public abstract void close() throws Exception; + + protected String getDirectory() { + return directory; + } + + /** + * Set the directory to read the sstables from + * + * @param directory + * the directory to read the sstables from + */ + public void setDirectory( String directory ) { + this.directory = directory; + } + + protected String getKeyspace() { + return keyspace; + } + + /** + * Set the target keyspace + * + * @param keyspace + * the keyspace to use + */ + public void setKeyspace( String keyspace ) { + this.keyspace = keyspace; + } + + 
protected String getColumnFamily() { + return columnFamily; + } + + /** + * Set the column family (table) to load to. Note: it is assumed that this column family exists in the keyspace + * apriori. + * + * @param columnFamily + * the column family to load to. + */ + public void setColumnFamily( String columnFamily ) { + this.columnFamily = columnFamily; + } + + protected int getBufferSize() { + return bufferSize; + } + + /** + * Set the buffer size (Mb) to use. A new table file is written every time the buffer is full. + * + * @param bufferSize + * the size of the buffer to use + */ + public void setBufferSize( int bufferSize ) { + this.bufferSize = bufferSize; + } + + protected String getKeyField() { + return keyField; + } + + public void setKeyField( String keyField ) { + this.keyField = keyField; + } +} diff --git a/src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableWriter.java b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/CQL2SSTableWriter.java similarity index 57% rename from src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableWriter.java rename to src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/CQL2SSTableWriter.java index c874e9adc..e24498b63 100644 --- a/src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableWriter.java +++ b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/CQL2SSTableWriter.java @@ -20,187 +20,162 @@ * ******************************************************************************/ -package org.pentaho.di.trans.steps.cassandrasstableoutput; +package org.pentaho.di.trans.steps.cassandrasstableoutput.writer; /* * Adapted from DataStax DataImportExample * http://www.datastax.com/wp-content/uploads/2011/08/DataImportExample.java - * + * * Original Disclaimer: * This file is an example on how to use the Cassandra SSTableSimpleUnsortedWriter class to create * sstables from a csv input file. * While this has been tested to work, this program is provided "as is" with no guarantee. 
Moreover, * it's primary aim is toward simplicity rather than completness. In partical, don't use this as an * example to parse csv files at home. - * + * */ -import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + +import org.apache.cassandra.db.marshal.AsciiType; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.pentaho.di.core.Const; +import org.pentaho.di.core.exception.KettleException; +import org.pentaho.di.i18n.BaseMessages; import java.io.File; import java.nio.ByteBuffer; -import java.text.DateFormat; import java.util.Date; import java.util.Map; import java.util.Map.Entry; -import org.apache.cassandra.db.marshal.AsciiType; -import org.apache.cassandra.dht.RandomPartitioner; -import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter; -import org.apache.log4j.helpers.ISO8601DateFormat; -import org.pentaho.di.core.exception.KettleException; +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; /** * Outputs Cassandra SSTables (sorted-string tables) to a directory. 
- * + * * Adapted from DataStax DataImportExample http://www.datastax.com/wp-content/uploads/2011/08/DataImportExample.java - * + * * @author Rob Turner (robert{[at]}robertturner{[dot]}com{[dot]}au) */ -public class SSTableWriter { - - private static final DateFormat ISO8601 = ISO8601DateFormat.getInstance(); - private static final int DEFAULT_BUFFER_SIZE_MB = 16; - - private String directory = System.getProperty( "java.io.tmpdir" ); - private String keyspace; - private String columnFamily; +class CQL2SSTableWriter extends AbstractSSTableWriter { private String keyField; - private int bufferSize = DEFAULT_BUFFER_SIZE_MB; - + private String partitionerClassName; private SSTableSimpleUnsortedWriter writer; - - /** - * Set the directory to read the sstables from - * - * @param directory - * the directory to read the sstables from - */ - public void setDirectory( String directory ) { - this.directory = directory; - } - - /** - * Set the target keyspace - * - * @param keyspace - * the keyspace to use - */ - public void setKeyspace( String keyspace ) { - this.keyspace = keyspace; - } - - /** - * Set the column family (table) to load to. Note: it is assumed that this column family exists in the keyspace - * apriori. - * - * @param columnFamily - * the column family to load to. - */ - public void setColumnFamily( String columnFamily ) { - this.columnFamily = columnFamily; - } - - /** - * Set the key field name - * - * @param keyField - * the key field name - */ - public void setKeyField( String keyField ) { - this.keyField = keyField; - } - - /** - * Set the buffer size (Mb) to use. A new table file is written every time the buffer is full. - * - * @param bufferSize - * the size of the buffer to use - */ - public void setBufferSize( int bufferSize ) { - this.bufferSize = bufferSize; - } + protected static final Class PKG = CQL2SSTableWriter.class; /** * Initialization. 
Creates target directory if needed and establishes the writer - * + * * @throws Exception * if a problem occurs */ public void init() throws Exception { - File directory = new File( this.directory ); - - if ( !directory.exists() ) { - directory.mkdir(); - } try { + Class partitionerClass = Murmur3Partitioner.class; + if ( !Const.isEmpty( partitionerClassName ) ) { + try { + partitionerClass = Class.forName( partitionerClassName ); + } catch ( ClassNotFoundException cnfe ) { + // Use default partitioner + } + } + writer = - new SSTableSimpleUnsortedWriter( directory, new RandomPartitioner(), keyspace, columnFamily, - AsciiType.instance, null, bufferSize ); + getSsTableSimpleUnsortedWriter( new File( getDirectory() ), partitionerClass, getKeyspace(), getColumnFamily(), + getBufferSize() ); } catch ( Throwable t ) { - throw new KettleException( "Failed to create SSTableSimpleUnsortedWriter", t ); + throw new KettleException( BaseMessages.getString( PKG, "SSTableOutput.Error.WriterCreation" ), t ); } } + SSTableSimpleUnsortedWriter getSsTableSimpleUnsortedWriter( File file, Class partitionerClass, String keyspace, + String columnFamily, int bufferSize ) + throws InstantiationException, IllegalAccessException { + return new SSTableSimpleUnsortedWriter( file, (IPartitioner) partitionerClass.newInstance(), + keyspace, columnFamily, AsciiType.instance, null, bufferSize ); + } + /** * Process a row of data - * + * * @param record * a row of data as a Map of column names to values * @throws Exception * if a problem occurs */ public void processRow( Map record ) throws Exception { - // get UUID ByteBuffer uuid = valueToBytes( record.get( keyField ) ); // write record writer.newRow( uuid ); long timestamp = System.currentTimeMillis() * 1000; for ( Entry entry : record.entrySet() ) { - // get value - Object value = entry.getValue(); - if ( isNull( value ) ) { - continue; - } - // don't write the key as a column! 
- if ( entry.getKey().equals( keyField ) ) { - continue; + if ( !entry.getKey().equals( keyField ) ) { + writer.addColumn( bytes( entry.getKey() ), valueToBytes( entry.getValue() ), timestamp ); } + } + } - // write - writer.addColumn( bytes( entry.getKey() ), valueToBytes( value ), timestamp ); + /** + * Close the writer + * + * @throws Exception + * if a problem occurs + */ + public void close() throws Exception { + if ( writer != null ) { + writer.close(); } } - private static final ByteBuffer valueToBytes( Object val ) throws Exception { + /** + * Set the key field name + * + * @param keyField + * the key field name + */ + public void setKeyField( String keyField ) { + this.keyField = keyField; + } + + /** + * Set partitioner class name to be used. + * + * @param partitionerClassName + * class name + */ + public void setPartitionerClassName( String partitionerClassName ) { + this.partitionerClassName = partitionerClassName; + } + + static ByteBuffer valueToBytes( Object val ) throws Exception { + if ( val == null ) { + return ByteBufferUtil.EMPTY_BYTE_BUFFER; + } if ( val instanceof String ) { return bytes( (String) val ); } if ( val instanceof Integer ) { - return bytes( ( (Integer) val ).intValue() ); + return bytes( (Integer) val ); } if ( val instanceof Float ) { - return bytes( ( (Float) val ).floatValue() ); + return bytes( (Float) val ); } if ( val instanceof Boolean ) { // will return "true" or "false" return bytes( val.toString() ); } if ( val instanceof Date ) { - // use ISO 8601 date format - try { - return bytes( ISO8601.format( (Date) val ) ); - } catch ( ArrayIndexOutOfBoundsException e ) { - // something wrong with the date...
just convert to string - return bytes( val.toString() ); - } + return bytes( ( (Date) val ).getTime() ); } if ( val instanceof Long ) { return bytes( ( (Long) val ).longValue() ); } if ( val instanceof Double ) { - return bytes( ( (Double) val ).doubleValue() ); + return bytes( (Double) val ); } if ( val instanceof byte[] ) { @@ -210,27 +185,4 @@ private static final ByteBuffer valueToBytes( Object val ) throws Exception { // reduce to string return bytes( val.toString() ); } - - static final boolean isNull( Object val ) { - if ( val == null ) { - return true; - } - // empty strings are considered null in this context - if ( val instanceof String ) { - return "".equals( val ); - } - return false; - } - - /** - * Close the writer - * - * @throws Exception - * if a problem occurs - */ - public void close() throws Exception { - if ( writer != null ) { - writer.close(); - } - } } diff --git a/src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/CQL3SSTableWriter.java b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/CQL3SSTableWriter.java new file mode 100644 index 000000000..4ea0ff488 --- /dev/null +++ b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/CQL3SSTableWriter.java @@ -0,0 +1,95 @@ +/*! ****************************************************************************** + * + * Pentaho Data Integration + * + * Copyright (C) 2002-2014 by Pentaho : http://www.pentaho.com + * + ******************************************************************************* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package org.pentaho.di.trans.steps.cassandrasstableoutput.writer; + +import java.util.Arrays; +import java.util.Map; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.io.sstable.CQLSSTableWriter; +import org.pentaho.cassandra.CassandraUtils; +import org.pentaho.di.core.row.RowMetaInterface; +import org.pentaho.di.core.row.ValueMetaInterface; + +import com.google.common.base.Joiner; + +class CQL3SSTableWriter extends AbstractSSTableWriter { + private CQLSSTableWriter writer; + private RowMetaInterface rowMeta; + + @Override + public void init() throws Exception { + //Allow column family to be reloaded + purgeSchemaInstance(); + writer = getCQLSSTableWriter(); + } + + void purgeSchemaInstance() { + Schema.instance.purge( new CFMetaData( getKeyspace(), getColumnFamily(), null, null ) ); + } + + CQLSSTableWriter getCQLSSTableWriter() { + return CQLSSTableWriter.builder().inDirectory( getDirectory() ).forTable( buildCreateColumnFamilyCQLStatement() ) + .using( buildInsertCQLStatement() ).withBufferSizeInMB( getBufferSize() ).build(); + } + + @Override + public void processRow( Map record ) throws Exception { + writer.addRow( record ); + } + + @Override + public void close() throws Exception { + if ( writer != null ) { + writer.close(); + } + } + + public void setRowMeta( RowMetaInterface rowMeta ) { + this.rowMeta = rowMeta; + } + + String buildCreateColumnFamilyCQLStatement() { + StringBuilder tableColumnsSpecification = new StringBuilder(); + for ( ValueMetaInterface valueMeta : rowMeta.getValueMetaList() ) { + tableColumnsSpecification.append( CassandraUtils.cql3MixedCaseQuote( valueMeta.getName() ) ).append( " " ) + .append( CassandraUtils.getCQLTypeForValueMeta( valueMeta ) ).append( "," ); + } + + 
tableColumnsSpecification.append( "PRIMARY KEY (\"" ).append( getKeyField().replaceAll( ",", "\",\"" ) ).append( + "\")" ); + + return String.format( "CREATE TABLE %s.%s (%s);", getKeyspace(), getColumnFamily(), tableColumnsSpecification ); + } + + String buildInsertCQLStatement() { + Joiner columnsJoiner = Joiner.on( "\",\"" ).skipNulls(); + Joiner valuesJoiner = Joiner.on( "," ).skipNulls(); + String[] columnNames = rowMeta.getFieldNames(); + String[] valuePlaceholders = new String[columnNames.length]; + Arrays.fill( valuePlaceholders, "?" ); + return String.format( "INSERT INTO %s.%s (\"%s\") VALUES (%s);", getKeyspace(), getColumnFamily(), columnsJoiner + .join( columnNames ), valuesJoiner.join( valuePlaceholders ) ); + } +} diff --git a/src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/SSTableWriterBuilder.java b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/SSTableWriterBuilder.java new file mode 100644 index 000000000..9e0d942fe --- /dev/null +++ b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/SSTableWriterBuilder.java @@ -0,0 +1,156 @@ +/*! ****************************************************************************** + * + * Pentaho Data Integration + * + * Copyright (C) 2002-2014 by Pentaho : http://www.pentaho.com + * + ******************************************************************************* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + ******************************************************************************/ + +package org.pentaho.di.trans.steps.cassandrasstableoutput.writer; + +import org.apache.cassandra.config.YamlConfigurationLoader; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.pentaho.di.core.row.RowMetaInterface; + +/** + * Builder is used to create specific SSTableWriter depending mostly on CQL version + * + * @author Pavel Sakun + */ +public class SSTableWriterBuilder { + /** + * Path to cassandra YAML config + */ + private String configFilePath; + + /** + * CQL Version + */ + private int cqlVersion; + + /** + * The directory to output to + */ + private String directory; + + /** + * The keyspace to use + */ + private String keyspace; + + /** + * The name of the column family (table) to write to + */ + private String columnFamily; + + /** + * The key field used to determine unique keys (IDs) for rows + */ + private String keyField; + + /** + * Size (MB) of write buffer + */ + private int bufferSize; + + /** + * Input row meta + */ + private RowMetaInterface rowMeta; + + public SSTableWriterBuilder withConfig( String configFilePath ) { + if ( !configFilePath.startsWith( "file:" ) ) { + this.configFilePath = "file:" + configFilePath; + } else { + this.configFilePath = configFilePath; + } + return this; + } + + public SSTableWriterBuilder withDirectory( String outputDirectoryPath ) { + this.directory = outputDirectoryPath; + return this; + } + + public SSTableWriterBuilder withKeyspace( String keyspaceName ) { + this.keyspace = keyspaceName; + return this; + } + + public SSTableWriterBuilder withColumnFamily( String columnFamilyName ) { + this.columnFamily = columnFamilyName; + return this; + } + + public SSTableWriterBuilder withKeyField( String keyField ) { + this.keyField = keyField; + return this; + } + + public SSTableWriterBuilder withBufferSize( int bufferSize ) { + this.bufferSize = bufferSize; + return this; + } + + public 
SSTableWriterBuilder withRowMeta( RowMetaInterface rowMeta ) { + this.rowMeta = rowMeta; + return this; + } + + public SSTableWriterBuilder withCqlVersion( int cqlVersion ) { + this.cqlVersion = cqlVersion; + return this; + } + + public AbstractSSTableWriter build() throws Exception { + System.setProperty( "cassandra.config", configFilePath ); + AbstractSSTableWriter result; + + if ( cqlVersion == 3 ) { + CQL3SSTableWriter writer = getCql3SSTableWriter(); + + writer.setRowMeta( rowMeta ); + + result = writer; + } else { + CQL2SSTableWriter writer = getCql2SSTableWriter(); + + writer.setPartitionerClassName( getPartitionerClass() ); + + result = writer; + } + result.setDirectory( directory ); + result.setKeyspace( keyspace ); + result.setColumnFamily( columnFamily ); + result.setKeyField( keyField ); + result.setBufferSize( bufferSize ); + + return result; + } + + String getPartitionerClass() throws ConfigurationException { + return new YamlConfigurationLoader().loadConfig().partitioner; + } + + CQL2SSTableWriter getCql2SSTableWriter() { + return new CQL2SSTableWriter(); + } + + CQL3SSTableWriter getCql3SSTableWriter() { + return new CQL3SSTableWriter(); + } +} diff --git a/src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/messages/messages_en_US.properties b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/messages/messages_en_US.properties new file mode 100644 index 000000000..edcac809d --- /dev/null +++ b/src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/messages/messages_en_US.properties @@ -0,0 +1 @@ +CQL2SSTableWriter.Error.WriterCreation=Failed to create SSTableSimpleUnsortedWriter \ No newline at end of file diff --git a/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputIT.java b/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputIT.java new file mode 100644 index 000000000..d4430f521 --- /dev/null +++ b/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputIT.java 
@@ -0,0 +1,139 @@ +/*! + * Copyright 2015 Pentaho Corporation. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.pentaho.di.trans.steps.cassandrasstableoutput; + +import junit.framework.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.pentaho.di.core.RowSet; +import org.pentaho.di.core.exception.KettleException; +import org.pentaho.di.core.logging.LoggingObjectInterface; +import org.pentaho.di.core.row.RowMetaInterface; +import org.pentaho.di.core.row.ValueMetaInterface; +import org.pentaho.di.core.row.value.ValueMetaBase; +import org.pentaho.di.trans.steps.mock.StepMockHelper; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; + +public class SSTableOutputIT { + private static StepMockHelper helper; + private static AtomicInteger i; + + @BeforeClass + public static void setUp() throws KettleException { + //KettleEnvironment.init(); + helper = + new StepMockHelper( "SSTableOutputIT", SSTableOutputMeta.class, + SSTableOutputData.class ); + when( helper.logChannelInterfaceFactory.create( any(), any( LoggingObjectInterface.class ) ) ).thenReturn( + helper.logChannelInterface ); + when( helper.trans.isRunning() 
).thenReturn( true ); + } + + @Test + public void testCQLS2SSTableWriter() throws Exception { + SSTableOutput ssTableOutput = + new SSTableOutput( helper.stepMeta, helper.stepDataInterface, 0, helper.transMeta, helper.trans ); + ValueMetaInterface one = new ValueMetaBase( "key", ValueMetaBase.TYPE_INTEGER ); + ValueMetaInterface two = new ValueMetaBase( "two", ValueMetaBase.TYPE_STRING ); + List valueMetaList = new ArrayList( ); + valueMetaList.add( one ); + valueMetaList.add( two ); + String[] fieldNames = new String[] { "key", "two" }; + RowMetaInterface inputRowMeta = mock( RowMetaInterface.class ); + when( inputRowMeta.clone() ).thenReturn( inputRowMeta ); + when( inputRowMeta.size() ).thenReturn( 2 ); + when( inputRowMeta.getFieldNames() ).thenReturn( fieldNames ); + when( inputRowMeta.getValueMetaList() ).thenReturn( valueMetaList ); + RowSet rowset = helper.getMockInputRowSet( new Object[] { 1, "some" } ); + when( rowset.getRowMeta() ).thenReturn( inputRowMeta ); + ssTableOutput.getInputRowSets().add( rowset ); + SSTableOutputMeta meta = createStepMeta( false ); + ssTableOutput.init( meta, helper.initStepDataInterface ); + ssTableOutput.processRow( meta, helper.processRowsStepDataInterface ); + Assert.assertEquals( "Step init error.", 0, ssTableOutput.getErrors() ); + assertEquals( "org.pentaho.di.trans.steps.cassandrasstableoutput.writer.CQL2SSTableWriter", + ssTableOutput.writer.getClass().getName() ); + ssTableOutput.dispose( meta, helper.initStepDataInterface ); + Assert.assertEquals( "Step dispose error", 0, ssTableOutput.getErrors() ); + } + + @Test + public void testCQLS3SSTableWriter() throws Exception { + SSTableOutput ssTableOutput = + new SSTableOutput( helper.stepMeta, helper.stepDataInterface, 0, helper.transMeta, helper.trans ); + i = new AtomicInteger( 0 ); + ValueMetaInterface one = new ValueMetaBase( "key", ValueMetaBase.TYPE_INTEGER ); + ValueMetaInterface two = new ValueMetaBase( "two", ValueMetaBase.TYPE_STRING ); + List valueMetaList = 
new ArrayList( ); + valueMetaList.add( one ); + valueMetaList.add( two ); + String[] fieldNames = new String[] { "key", "two" }; + RowMetaInterface inputRowMeta = mock( RowMetaInterface.class ); + when( inputRowMeta.clone() ).thenReturn( inputRowMeta ); + when( inputRowMeta.size() ).thenReturn( 2 ); + when( inputRowMeta.getFieldNames() ).thenReturn( fieldNames ); + when( inputRowMeta.getValueMetaList() ).thenReturn( valueMetaList ); + when( inputRowMeta.indexOfValue( anyString() ) ).thenAnswer( new Answer() { + @Override public Integer answer( InvocationOnMock invocation ) throws Throwable { + return i.getAndIncrement(); + } + } ); + RowSet rowset = helper.getMockInputRowSet( new Object[] { 1L, "some" } ); + when( rowset.getRowMeta() ).thenReturn( inputRowMeta ); + ssTableOutput.getInputRowSets().add( rowset ); + SSTableOutputMeta meta = createStepMeta( true ); + ssTableOutput.init( meta, helper.initStepDataInterface ); + ssTableOutput.processRow( meta, helper.processRowsStepDataInterface ); + Assert.assertEquals( "Step init error.", 0, ssTableOutput.getErrors() ); + assertEquals( "org.pentaho.di.trans.steps.cassandrasstableoutput.writer.CQL3SSTableWriter", + ssTableOutput.writer.getClass().getName() ); + ssTableOutput.dispose( meta, helper.initStepDataInterface ); + Assert.assertEquals( "Step dispose error", 0, ssTableOutput.getErrors() ); + } + + private SSTableOutputMeta createStepMeta( Boolean v3 ) throws IOException { + File tempFile = File.createTempFile( getClass().getName(), ".tmp" ); + tempFile.deleteOnExit(); + + final SSTableOutputMeta meta = new SSTableOutputMeta(); + meta.setBufferSize( "1000" ); + meta.setDirectory( tempFile.getParentFile().toURI().toString() ); + meta.setCassandraKeyspace( "key" ); + meta.setYamlPath( getClass().getResource( "cassandra.yaml" ).getFile() ); + meta.setColumnFamilyName( "cfq" ); + if ( v3 ) { + meta.setKeyField( "key,two" ); + } else { + meta.setKeyField( "key" ); + + } + meta.setUseCQL3( v3 ); + + return meta; + } +} 
diff --git a/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputMetaTest.java b/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputMetaTest.java new file mode 100644 index 000000000..22a551465 --- /dev/null +++ b/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputMetaTest.java @@ -0,0 +1,117 @@ +/*! + * Copyright 2015 Pentaho Corporation. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.pentaho.di.trans.steps.cassandrasstableoutput; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class SSTableOutputMetaTest { + + @Test + public void testGetXMLUseCQL3() throws Exception { + SSTableOutputMeta ssTableOutputMeta = new SSTableOutputMeta(); + ssTableOutputMeta.setDefault(); + ssTableOutputMeta.setUseCQL3( true ); + assertTrue( "getXml() does not cover setUseCQL3() ", + ssTableOutputMeta.getXML().contains( "Y" ) ); + + } + + @Test + public void testGetXMLUseCQL2() throws Exception { + SSTableOutputMeta ssTableOutputMeta = new SSTableOutputMeta(); + ssTableOutputMeta.setDefault(); + ssTableOutputMeta.setUseCQL3( false ); + assertTrue( "getXml() does not cover setUseCQL3() ", + ssTableOutputMeta.getXML().contains( "N" ) ); + } + + @Test + public void testGetXMLKeyField() throws Exception { + SSTableOutputMeta ssTableOutputMeta = new SSTableOutputMeta(); + ssTableOutputMeta.setDefault(); + ssTableOutputMeta.setKeyField( "some_key" ); + 
assertTrue( "getXml() does not cover setKeyField() ", + ssTableOutputMeta.getXML().contains( "some_key" ) ); + + } + + @Test + public void testGetXMLColumnFamilyName() throws Exception { + SSTableOutputMeta ssTableOutputMeta = new SSTableOutputMeta(); + ssTableOutputMeta.setDefault(); + ssTableOutputMeta.setColumnFamilyName( "someColumnFamilyName" ); + assertTrue( "getXml() does not cover setColumnFamilyName() ", + ssTableOutputMeta.getXML().contains( "someColumnFamilyName" ) ); + } + + @Test + public void testGetBufferSize() throws Exception { + SSTableOutputMeta ssTableOutputMeta = new SSTableOutputMeta(); + ssTableOutputMeta.setDefault(); + ssTableOutputMeta.setBufferSize( "some_buffer_size" ); + assertTrue( "getXml() does not cover setBufferSize() ", + ssTableOutputMeta.getXML().contains( "some_buffer_size" ) ); + + } + + @Test + public void testGetXMLCassandraKeyspace() throws Exception { + SSTableOutputMeta ssTableOutputMeta = new SSTableOutputMeta(); + ssTableOutputMeta.setDefault(); + ssTableOutputMeta.setCassandraKeyspace( "someCassandraKeyspace" ); + assertTrue( "getXml() does not cover setCassandraKeyspace() ", + ssTableOutputMeta.getXML().contains( "someCassandraKeyspace" ) ); + } + + @Test + public void testGetXMLYamlPath() throws Exception { + SSTableOutputMeta ssTableOutputMeta = new SSTableOutputMeta(); + ssTableOutputMeta.setDefault(); + ssTableOutputMeta.setYamlPath( "some_YamlPath" ); + assertTrue( "getXml() does not cover setYamlPath() ", + ssTableOutputMeta.getXML().contains( "some_YamlPath" ) ); + + } + + @Test + public void testGetXMLDirectory() throws Exception { + SSTableOutputMeta ssTableOutputMeta = new SSTableOutputMeta(); + ssTableOutputMeta.setDefault(); + ssTableOutputMeta.setDirectory( "someDirectory" ); + assertTrue( "getXml() does not cover setDirectory() ", + ssTableOutputMeta.getXML().contains( "someDirectory" ) ); + } + + @Test + public void testGetXMLDefault() throws Exception { + SSTableOutputMeta ssTableOutputMeta = new 
SSTableOutputMeta(); + ssTableOutputMeta.setDefault(); + String xml = ssTableOutputMeta.getXML(); + assertTrue( "getXml() does not cover defaults ", + xml.contains( "N" ) ); + assertTrue( "getXml() does not cover defaults ", + xml.contains( "16" ) ); + String defDirectory = + ( "" + System.getProperty( "java.io.tmpdir" ) + "" ).replace( ":", ":" ) + .replace( "\\", "\" ); + assertTrue( "getXml() does not cover defaults ", + xml.contains( defDirectory ) ); + } +} diff --git a/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputTest.java b/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputTest.java new file mode 100644 index 000000000..5d81b7d6a --- /dev/null +++ b/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutputTest.java @@ -0,0 +1,60 @@ +/*! + * Copyright 2015 Pentaho Corporation. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.pentaho.di.trans.steps.cassandrasstableoutput; + +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; +import org.pentaho.di.core.exception.KettleException; +import org.pentaho.di.core.logging.LoggingObjectInterface; +import org.pentaho.di.trans.steps.mock.StepMockHelper; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +public class SSTableOutputTest { + private static StepMockHelper helper; + private static final SecurityManager sm = System.getSecurityManager(); + + @BeforeClass + public static void setUp() throws KettleException { + //KettleEnvironment.init(); + helper = + new StepMockHelper( "SSTableOutputIT", SSTableOutputMeta.class, + SSTableOutputData.class ); + when( helper.logChannelInterfaceFactory.create( any(), any( LoggingObjectInterface.class ) ) ).thenReturn( + helper.logChannelInterface ); + when( helper.trans.isRunning() ).thenReturn( true ); + } + + @After + public void tearDown() throws Exception { + // Restore original security manager if needed + if ( System.getSecurityManager() != sm ) { + System.setSecurityManager( sm ); + } + } + + @Test( expected = SecurityException.class ) + public void testDisableSystemExit() throws Exception { + SSTableOutput ssTableOutput = + new SSTableOutput( helper.stepMeta, helper.stepDataInterface, 0, helper.transMeta, helper.trans ); + ssTableOutput.disableSystemExit( sm, helper.logChannelInterface ); + System.exit( 1 ); + } +} diff --git a/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/cassandra.yaml b/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/cassandra.yaml new file mode 100644 index 000000000..68df92229 --- /dev/null +++ b/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/cassandra.yaml @@ -0,0 +1,87 @@ +cluster_name: 'Test Cluster' +num_tokens: 256 +hinted_handoff_enabled: true +max_hint_window_in_ms: 10800000 +hinted_handoff_throttle_in_kb: 1024 +max_hints_delivery_threads: 2 
+batchlog_replay_throttle_in_kb: 1024 +authenticator: AllowAllAuthenticator +authorizer: AllowAllAuthorizer +endpoint_snitch: SimpleSnitch +permissions_validity_in_ms: 2000 +partitioner: org.apache.cassandra.dht.Murmur3Partitioner +data_file_directories: + - /usr/local/var/lib/cassandra/data + + +commitlog_directory: /usr/local/var/lib/cassandra/commitlog +disk_failure_policy: stop +commit_failure_policy: stop +key_cache_size_in_mb: +key_cache_save_period: 14400 +row_cache_size_in_mb: 0 +row_cache_save_period: 0 +saved_caches_directory: /usr/local/var/lib/cassandra/saved_caches +commitlog_sync: periodic +commitlog_sync_period_in_ms: 10000 +commitlog_segment_size_in_mb: 32 +seed_provider: + + + + + - class_name: org.apache.cassandra.locator.SimpleSeedProvider + parameters: + + + - seeds: "127.0.0.1" +concurrent_reads: 32 +concurrent_writes: 32 +memtable_flush_queue_size: 4 +trickle_fsync: false +trickle_fsync_interval_in_kb: 10240 +storage_port: 7000 +ssl_storage_port: 7001 +listen_address: localhost +start_native_transport: true +native_transport_port: 9042 +start_rpc: true +rpc_address: localhost +rpc_port: 9160 +rpc_keepalive: true +rpc_server_type: sync +thrift_framed_transport_size_in_mb: 15 +incremental_backups: false +snapshot_before_compaction: false +auto_snapshot: true +tombstone_warn_threshold: 1000 +tombstone_failure_threshold: 100000 +column_index_size_in_kb: 64 +in_memory_compaction_limit_in_mb: 64 +multithreaded_compaction: false +compaction_throughput_mb_per_sec: 16 +compaction_preheat_key_cache: true +read_request_timeout_in_ms: 5000 +range_request_timeout_in_ms: 10000 +write_request_timeout_in_ms: 2000 +cas_contention_timeout_in_ms: 1000 +truncate_request_timeout_in_ms: 60000 +request_timeout_in_ms: 10000 +cross_node_timeout: false +dynamic_snitch_update_interval_in_ms: 100 +dynamic_snitch_reset_interval_in_ms: 600000 +dynamic_snitch_badness_threshold: 0.1 +request_scheduler: org.apache.cassandra.scheduler.NoScheduler +server_encryption_options: + 
internode_encryption: none + keystore: conf/.keystore + keystore_password: cassandra + truststore: conf/.truststore + truststore_password: cassandra +client_encryption_options: + enabled: false + keystore: conf/.keystore + keystore_password: cassandra +# internode_compression: all +inter_dc_tcp_nodelay: false +preheat_kernel_page_cache: false \ No newline at end of file diff --git a/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/AbstractSSTableWriterTest.java b/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/AbstractSSTableWriterTest.java new file mode 100644 index 000000000..3f998451a --- /dev/null +++ b/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/AbstractSSTableWriterTest.java @@ -0,0 +1,79 @@ +/*! + * Copyright 2015 Pentaho Corporation. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.pentaho.di.trans.steps.cassandrasstableoutput.writer; + +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class AbstractSSTableWriterTest extends AbstractSSTableWriter { + + @Test + public void testGetDirectory() throws Exception { + AbstractSSTableWriter writer = new AbstractSSTableWriterTest(); + assertEquals( System.getProperty( "java.io.tmpdir" ), writer.getDirectory() ); + writer.setDirectory( "some_dir" ); + assertEquals( "some_dir", writer.getDirectory() ); + } + + @Test + public void testGetKeyspace() throws Exception { + AbstractSSTableWriter writer = new AbstractSSTableWriterTest(); + assertEquals( null, writer.getKeyspace() ); + writer.setKeyspace( "some_keyspace" ); + assertEquals( "some_keyspace", writer.getKeyspace() ); + } + + @Test + public void testGetColumnFamily() throws Exception { + AbstractSSTableWriter writer = new AbstractSSTableWriterTest(); + assertEquals( null, writer.getColumnFamily() ); + writer.setColumnFamily( "some_col" ); + assertEquals( "some_col", writer.getColumnFamily() ); + } + + @Test + public void testGetBufferSize() throws Exception { + AbstractSSTableWriter writer = new AbstractSSTableWriterTest(); + assertEquals( 16, writer.getBufferSize() ); + writer.setBufferSize( 10 ); + assertEquals( 10, writer.getBufferSize() ); + } + + @Test + public void testGetKeyField() throws Exception { + AbstractSSTableWriter writer = new AbstractSSTableWriterTest(); + assertEquals( null, writer.getKeyField() ); + writer.setKeyField( "some_keyField" ); + assertEquals( "some_keyField", writer.getKeyField() ); + } + + @Override public void init() throws Exception { + + } + + @Override public void processRow( Map record ) throws Exception { + + } + + @Override public void close() throws Exception { + + } +} diff --git a/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/CQL2SSTableWriterTest.java 
b/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/CQL2SSTableWriterTest.java new file mode 100644 index 000000000..6a70cfd12 --- /dev/null +++ b/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/CQL2SSTableWriterTest.java @@ -0,0 +1,193 @@ +/*! + * Copyright 2015 Pentaho Corporation. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.pentaho.di.trans.steps.cassandrasstableoutput.writer; + +import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +public class CQL2SSTableWriterTest { + + public static final String KEY_FIELD = "KEY_FIELD"; + public static final String PARTITIONER_CLASSNAME = "java.lang.Integer"; + public static final String COLUMN_FAMILY = "COLUMN_FAMILY"; + public static final String KEY_SPACE = "KEY_SPACE"; + public static 
final String DIRECTORY_PATH = "directory_path"; + public static final int BUFFER_SIZE = 10; + public static final AtomicBoolean checker = new AtomicBoolean( true ); + public static final AtomicReference result = new AtomicReference( new HashMap>() ); + public static final AtomicReference workingKey = new AtomicReference( null ); + + class CQL2SSTableWriterStub extends CQL2SSTableWriter { + @Override SSTableSimpleUnsortedWriter getSsTableSimpleUnsortedWriter( File file, + Class partitionerClass, + String keyspace, + String columnFamily, + int bufferSize ) + throws InstantiationException, IllegalAccessException { + assertEquals( DIRECTORY_PATH, file.getName() ); + assertEquals( KEY_SPACE, keyspace ); + assertEquals( COLUMN_FAMILY, columnFamily ); + assertEquals( BUFFER_SIZE, bufferSize ); + assertEquals( PARTITIONER_CLASSNAME, partitionerClass.getCanonicalName() ); + SSTableSimpleUnsortedWriter ssWriter = mock( SSTableSimpleUnsortedWriter.class ); + try { + doAnswer( new Answer() { + @Override public Object answer( InvocationOnMock invocation ) throws Throwable { + checker.set( false ); + return null; + } + } ).when( ssWriter ).close(); + } catch ( IOException e ) { + fail( e.toString() ); + } + try { + doAnswer( new Answer() { + @Override public Object answer( InvocationOnMock invocation ) throws Throwable { + ByteBuffer arg = (ByteBuffer) invocation.getArguments()[ 0 ]; + Map> obj; + Map> objClone; + do { + obj = (Map>) result.get(); + objClone = new HashMap>( obj ); + List cols = objClone.get( arg ); + if ( cols == null ) { + objClone.put( arg, new ArrayList() ); + } else { + throw new IOException( "Such key is already used" ); + } + } while ( !result.compareAndSet( obj, objClone ) ); + workingKey.set( arg ); + return null; + } + } ).when( ssWriter ).newRow( (ByteBuffer) anyObject() ); + } catch ( IOException e ) { + fail( e.toString() ); + } + + + doAnswer( new Answer() { + @Override public Object answer( InvocationOnMock invocation ) throws Throwable { + 
ByteBuffer name = (ByteBuffer) invocation.getArguments()[ 0 ]; + ByteBuffer value = (ByteBuffer) invocation.getArguments()[ 1 ]; + Object timestamp = invocation.getArguments()[ 2 ]; + ByteBuffer currentKey = (ByteBuffer) workingKey.get(); + assertNotNull( "adding columns without calling new row", currentKey ); + assertFalse( "key is the same as column", currentKey.equals( value ) ); + Map> obj; + Map> objClone; + do { + obj = (Map>) result.get(); + objClone = new HashMap>( obj ); + List cols = objClone.get( currentKey ); + assertNotNull( "adding columns without calling new row", cols ); + cols.add( value ); + objClone.put( currentKey, cols ); + } while ( !result.compareAndSet( obj, objClone ) ); + + return null; + } + } ).when( ssWriter ).addColumn( (ByteBuffer) anyObject(), (ByteBuffer) anyObject(), anyLong() ); + + return ssWriter; + } + } + + @Test + public void testInit() throws Exception { + CQL2SSTableWriter writer = getCql2SSTableWriter(); + writer.init(); + } + + private CQL2SSTableWriter getCql2SSTableWriter() { + CQL2SSTableWriter writer = new CQL2SSTableWriterStub(); + writer.setKeyField( KEY_FIELD ); + writer.setPartitionerClassName( PARTITIONER_CLASSNAME ); + writer.setBufferSize( BUFFER_SIZE ); + writer.setColumnFamily( COLUMN_FAMILY ); + writer.setKeyspace( KEY_SPACE ); + writer.setDirectory( DIRECTORY_PATH ); + return writer; + } + + @Test + public void testProcessRow() throws Exception { + CQL2SSTableWriter writer = getCql2SSTableWriter(); + writer.init(); + Map input = new HashMap(); + input.put( KEY_FIELD, 1 ); + input.put( "someColumn", "someColumnValue" ); + writer.processRow( input ); + Map> expected = new HashMap>(); + List col = new ArrayList(); + col.add( CQL2SSTableWriter.valueToBytes( "someColumnValue" ) ); + expected.put( CQL2SSTableWriter.valueToBytes( 1 ), col ); + assertEquals( expected, result.get() ); + } + + @Test + public void testClose() throws Exception { + CQL2SSTableWriter writer = getCql2SSTableWriter(); + writer.init(); + 
checker.set( true ); + writer.close(); + assertFalse( checker.get() ); + } + + @Test + public void testBytes() throws Exception { + assertEquals( ByteBuffer.wrap( new byte[] { } ), CQL2SSTableWriter.valueToBytes( null ) ); + assertEquals( ByteBuffer.wrap( new byte[] { } ), CQL2SSTableWriter.valueToBytes( "" ) ); + assertEquals( ByteBuffer.wrap( new byte[] { 115, 116, 114, 105, 110, 103 } ), + CQL2SSTableWriter.valueToBytes( "string" ) ); + assertEquals( ByteBuffer.wrap( new byte[] { 0, 0, 0, 0 } ), CQL2SSTableWriter.valueToBytes( 0 ) ); + assertEquals( ByteBuffer.wrap( new byte[] { 0, 0, 1, 15 } ), CQL2SSTableWriter.valueToBytes( 271 ) ); + assertEquals( ByteBuffer.wrap( new byte[] { 0, 0, 0, 0 } ), CQL2SSTableWriter.valueToBytes( 0F ) ); + assertEquals( ByteBuffer.wrap( new byte[] { 65, 96, 0, 0 } ), CQL2SSTableWriter.valueToBytes( 14F ) ); + assertEquals( ByteBuffer.wrap( new byte[] { 116, 114, 117, 101 } ), CQL2SSTableWriter.valueToBytes( true ) ); + assertEquals( ByteBuffer.wrap( new byte[] { 102, 97, 108, 115, 101 } ), CQL2SSTableWriter.valueToBytes( false ) ); + assertEquals( ByteBuffer.wrap( new byte[] { 0, 0, 0, 0, 0, 0, 0, 0 } ), CQL2SSTableWriter.valueToBytes( 0L ) ); + assertEquals( ByteBuffer.wrap( new byte[] { 0, 0, 0, 0, 0, 0, 0, 16 } ), CQL2SSTableWriter.valueToBytes( 16L ) ); + assertEquals( ByteBuffer.wrap( new byte[] { 0, 0, 0, 0, 0, 0, 0, 0 } ), CQL2SSTableWriter.valueToBytes( 0.0 ) ); + assertEquals( ByteBuffer.wrap( new byte[] { 64, 54, -128, 0, 0, 0, 0, 0 } ), + CQL2SSTableWriter.valueToBytes( 22.5 ) ); + assertEquals( ByteBuffer.wrap( new byte[] { 0, 0, 0, 0, 7, 91, -51, 21 } ), + CQL2SSTableWriter.valueToBytes( new Date( 123456789 ) ) ); + assertEquals( ByteBuffer.wrap( new byte[] { } ), CQL2SSTableWriter.valueToBytes( new byte[] { } ) ); + assertEquals( ByteBuffer.wrap( new byte[] { 0, 0, 0, 0 } ), + CQL2SSTableWriter.valueToBytes( new byte[] { 0, 0, 0, 0 } ) ); + } +} diff --git 
a/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/CQL3SSTableWriterTest.java b/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/CQL3SSTableWriterTest.java new file mode 100644 index 000000000..7dd42245f --- /dev/null +++ b/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/CQL3SSTableWriterTest.java @@ -0,0 +1,144 @@ +/*! + * Copyright 2015 Pentaho Corporation. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.pentaho.di.trans.steps.cassandrasstableoutput.writer; + +import org.apache.cassandra.io.sstable.CQLSSTableWriter; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.pentaho.di.core.row.RowMetaInterface; +import org.pentaho.di.core.row.ValueMetaInterface; +import org.pentaho.di.core.row.value.ValueMetaBase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.*; + +public class CQL3SSTableWriterTest { + + public static final String KEY_FIELD = "KEY_FIELD"; + public static final String COLUMN_FAMILY = "COLUMN_FAMILY"; + public static final String KEY_SPACE = "KEY_SPACE"; + public static final String DIRECTORY_PATH = "directory_path"; + public 
static final int BUFFER_SIZE = 10; + public static final AtomicBoolean checker = new AtomicBoolean( true ); + public static final String COLUMN = "someColumn"; + + class CQL3SSTableWriterStub extends CQL3SSTableWriter { + @Override CQLSSTableWriter getCQLSSTableWriter() { + assertEquals( DIRECTORY_PATH, getDirectory() ); + assertEquals( KEY_SPACE, getKeyspace() ); + assertEquals( COLUMN_FAMILY, getColumnFamily() ); + assertEquals( BUFFER_SIZE, getBufferSize() ); + assertEquals( KEY_FIELD, getKeyField() ); + CQLSSTableWriter ssWriter = mock( CQLSSTableWriter.class ); + try { + doAnswer( new Answer() { + @Override public Object answer( InvocationOnMock invocation ) throws Throwable { + checker.set( false ); + return null; + } + } ).when( ssWriter ).close(); + } catch ( IOException e ) { + fail( e.toString() ); + } + + try { + doAnswer( new Answer() { + @Override public Object answer( InvocationOnMock invocation ) throws Throwable { + checker.set( true ); + return null; + } + } ).when( ssWriter ).addRow( (Map) anyObject() ); + } catch ( Exception e ) { + fail( e.toString() ); + } + + return ssWriter; + } + } + + @Test + public void testInit() throws Exception { + CQL3SSTableWriter writer = getCql3SSTableWriter(); + writer.init(); + } + + private CQL3SSTableWriter getCql3SSTableWriter() { + CQL3SSTableWriter writer = new CQL3SSTableWriterStub(); + writer.setKeyField( KEY_FIELD ); + RowMetaInterface rmi = mock( RowMetaInterface.class ); + ValueMetaInterface one = new ValueMetaBase( KEY_FIELD, ValueMetaBase.TYPE_INTEGER ); + ValueMetaInterface two = new ValueMetaBase( COLUMN, ValueMetaBase.TYPE_STRING ); + List valueMetaList = new ArrayList(); + valueMetaList.add( one ); + valueMetaList.add( two ); + String[] fieldNames = new String[] { "key", "two" }; + doReturn( valueMetaList ).when( rmi ).getValueMetaList(); + doReturn( fieldNames ).when( rmi ).getFieldNames(); + writer.setRowMeta( rmi ); + writer.setBufferSize( BUFFER_SIZE ); + writer.setColumnFamily( COLUMN_FAMILY 
); + writer.setKeyspace( KEY_SPACE ); + writer.setDirectory( DIRECTORY_PATH ); + return writer; + } + + @Test + public void testProcessRow() throws Exception { + CQL3SSTableWriter writer = getCql3SSTableWriter(); + writer.init(); + Map input = new HashMap(); + input.put( KEY_FIELD, 1 ); + input.put( COLUMN, "someColumnValue" ); + checker.set( false ); + writer.processRow( input ); + assertTrue( checker.get() ); + } + + @Test + public void testClose() throws Exception { + CQL3SSTableWriter writer = getCql3SSTableWriter(); + writer.init(); + checker.set( true ); + writer.close(); + assertFalse( checker.get() ); + } + + @Test + public void testBuildCreateColumnFamilyCQLStatement() throws Exception { + CQL3SSTableWriter writer = getCql3SSTableWriter(); + writer.init(); + assertEquals( "CREATE TABLE KEY_SPACE.COLUMN_FAMILY (\"KEY_FIELD\" bigint,\"someColumn\" varchar,PRIMARY KEY " + + "(\"KEY_FIELD\"));", writer.buildCreateColumnFamilyCQLStatement() ); + } + + @Test + public void testBuildInsertCQLStatement() throws Exception { + CQL3SSTableWriter writer = getCql3SSTableWriter(); + writer.init(); + assertEquals( "INSERT INTO KEY_SPACE.COLUMN_FAMILY (\"key\",\"two\") VALUES (?,?);", writer.buildInsertCQLStatement() ); + } +} diff --git a/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/SSTableWriterBuilderTest.java b/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/SSTableWriterBuilderTest.java new file mode 100644 index 000000000..e400dc699 --- /dev/null +++ b/test-src/org/pentaho/di/trans/steps/cassandrasstableoutput/writer/SSTableWriterBuilderTest.java @@ -0,0 +1,128 @@ +/*! + * Copyright 2015 Pentaho Corporation. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.pentaho.di.trans.steps.cassandrasstableoutput.writer; + +import org.apache.cassandra.exceptions.ConfigurationException; +import org.junit.Test; +import org.pentaho.di.core.row.RowMetaInterface; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +public class SSTableWriterBuilderTest extends SSTableWriterBuilder { + + public static final String KEY_FIELD = "some_key"; + public static final int BUFFER_SIZE = 10; + public static final String COLUMN_FAMILY = "some_column_family"; + public static final String DIR = "some_dir"; + public static final String CONF_PATH = "some_conf_path"; + public static final String KEYSPACE = "some_keyspace"; + public static final String PARTIONER_CLASS_NAME = "PartionerClassName"; + public static final RowMetaInterface ROW_META = mock( RowMetaInterface.class ); + + class CQL2SSTableWriterStub extends CQL2SSTableWriter { + public CQL2SSTableWriterStub() { + assertEquals( "file:" + CONF_PATH, System.getProperty( "cassandra.config" ) ); + } + + @Override public void setKeyField( String keyField ) { + assertEquals( KEY_FIELD, keyField ); + } + + @Override public void setPartitionerClassName( String partitionerClassName ) { + assertEquals( PARTIONER_CLASS_NAME, partitionerClassName ); + } + + @Override public void setDirectory( String directory ) { + assertEquals( DIR, directory ); + } + + @Override public void setKeyspace( String keyspace ) { + assertEquals( KEYSPACE, keyspace ); + } + + @Override public void setColumnFamily( String columnFamily ) { + 
assertEquals( COLUMN_FAMILY, columnFamily ); + } + + @Override public void setBufferSize( int bufferSize ) { + assertEquals( BUFFER_SIZE, bufferSize ); + } + } + + class CQL3SSTableWriterStub extends CQL3SSTableWriter { + public CQL3SSTableWriterStub() { + assertEquals( "file:" + CONF_PATH, System.getProperty( "cassandra.config" ) ); + } + + @Override public void setKeyField( String keyField ) { + assertEquals( KEY_FIELD, keyField ); + } + + @Override public void setDirectory( String directory ) { + assertEquals( DIR, directory ); + } + + @Override public void setKeyspace( String keyspace ) { + assertEquals( KEYSPACE, keyspace ); + } + + @Override public void setColumnFamily( String columnFamily ) { + assertEquals( COLUMN_FAMILY, columnFamily ); + } + + @Override public void setBufferSize( int bufferSize ) { + assertEquals( BUFFER_SIZE, bufferSize ); + } + + @Override public void setRowMeta( RowMetaInterface rowMeta ) { + assertEquals( ROW_META, rowMeta ); + } + } + + @Override String getPartitionerClass() throws ConfigurationException { + return PARTIONER_CLASS_NAME; + } + + @Override CQL2SSTableWriter getCql2SSTableWriter() { + return new CQL2SSTableWriterStub(); + } + + @Override CQL3SSTableWriter getCql3SSTableWriter() { + return new CQL3SSTableWriterStub(); + } + + @Test + public void testBuild2() throws Exception { + SSTableWriterBuilder ssTableWriterBuilder = new SSTableWriterBuilderTest(); + ssTableWriterBuilder = ssTableWriterBuilder.withConfig( CONF_PATH ).withBufferSize( BUFFER_SIZE ) + .withColumnFamily( COLUMN_FAMILY ).withCqlVersion( 2 ).withDirectory( DIR ) + .withKeyField( KEY_FIELD ).withKeyspace( KEYSPACE ).withRowMeta( ROW_META ); + ssTableWriterBuilder.build(); + } + + @Test + public void testBuild3() throws Exception { + SSTableWriterBuilder ssTableWriterBuilder = new SSTableWriterBuilderTest(); + + ssTableWriterBuilder = ssTableWriterBuilder.withConfig( CONF_PATH ).withBufferSize( BUFFER_SIZE ) + .withColumnFamily( COLUMN_FAMILY 
).withCqlVersion( 3 ).withDirectory( DIR ) + .withKeyField( KEY_FIELD ).withKeyspace( KEYSPACE ).withRowMeta( ROW_META ); + ssTableWriterBuilder.build(); + } +} diff --git a/test-src/org/pentaho/di/trans/steps/mock/StepMockHelper.java b/test-src/org/pentaho/di/trans/steps/mock/StepMockHelper.java new file mode 100644 index 000000000..c3f787a8f --- /dev/null +++ b/test-src/org/pentaho/di/trans/steps/mock/StepMockHelper.java @@ -0,0 +1,156 @@ +/*! + * Copyright 2015 Pentaho Corporation. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.pentaho.di.trans.steps.mock; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.pentaho.di.core.RowSet; +import org.pentaho.di.core.logging.KettleLogStore; +import org.pentaho.di.core.logging.LogChannel; +import org.pentaho.di.core.logging.LogChannelInterface; +import org.pentaho.di.core.logging.LogChannelInterfaceFactory; +import org.pentaho.di.core.logging.LogLevel; +import org.pentaho.di.core.logging.LogMessageInterface; +import org.pentaho.di.core.logging.LoggingObjectInterface; +import org.pentaho.di.trans.Trans; +import org.pentaho.di.trans.TransMeta; +import org.pentaho.di.trans.step.StepDataInterface; +import org.pentaho.di.trans.step.StepMeta; +import org.pentaho.di.trans.step.StepMetaInterface; + + +/** + * Copied from kettle-engine tests. 
Should be deleted after introducing pentaho-common-tests project + */ +public class StepMockHelper { + public final StepMeta stepMeta; + public final Data stepDataInterface; + public final TransMeta transMeta; + public final Trans trans; + public final Meta initStepMetaInterface; + public final Data initStepDataInterface; + public final Meta processRowsStepMetaInterface; + public final Data processRowsStepDataInterface; + public final LogChannelInterface logChannelInterface; + public final LogChannelInterfaceFactory logChannelInterfaceFactory; + public final LogChannelInterfaceFactory originalLogChannelInterfaceFactory; + + public StepMockHelper( String stepName, Class stepMetaClass, Class stepDataClass ) { + originalLogChannelInterfaceFactory = KettleLogStore.getLogChannelInterfaceFactory(); + logChannelInterfaceFactory = mock( LogChannelInterfaceFactory.class ); + logChannelInterface = mock( LogChannelInterface.class ); + KettleLogStore.setLogChannelInterfaceFactory( logChannelInterfaceFactory ); + stepMeta = mock( StepMeta.class ); + when( stepMeta.getName() ).thenReturn( stepName ); + stepDataInterface = mock( stepDataClass ); + transMeta = mock( TransMeta.class ); + when( transMeta.findStep( stepName ) ).thenReturn( stepMeta ); + trans = mock( Trans.class ); + initStepMetaInterface = mock( stepMetaClass ); + initStepDataInterface = mock( stepDataClass ); + processRowsStepDataInterface = mock( stepDataClass ); + processRowsStepMetaInterface = mock( stepMetaClass ); + } + + public RowSet getMockInputRowSet( Object[]... rows ) { + return getMockInputRowSet( asList( rows ) ); + } + + public RowSet getMockInputRowSet( final List rows ) { + final AtomicInteger index = new AtomicInteger( 0 ); + RowSet rowSet = mock( RowSet.class, Mockito.RETURNS_MOCKS ); + Answer answer = new Answer() { + @Override + public Object[] answer( InvocationOnMock invocation ) throws Throwable { + int i = index.getAndIncrement(); + return i < rows.size() ? 
rows.get( i ) : null; + } + }; + when( rowSet.getRowWait( anyLong(), any( TimeUnit.class ) ) ).thenAnswer( answer ); + when( rowSet.getRow() ).thenAnswer( answer ); + when( rowSet.isDone() ).thenAnswer( new Answer() { + + @Override + public Boolean answer( InvocationOnMock invocation ) throws Throwable { + return index.get() >= rows.size(); + } + } ); + return rowSet; + } + + public static List asList( Object[]... objects ) { + List result = new ArrayList(); + Collections.addAll( result, objects ); + return result; + } + + public void cleanUp() { + KettleLogStore.setLogChannelInterfaceFactory( originalLogChannelInterfaceFactory ); + } + + /** + * In case you need to use log methods during the tests + * use redirectLog method after creating new StepMockHelper object. + * Examples: + * stepMockHelper.redirectLog( System.out, LogLevel.ROWLEVEL ); + * stepMockHelper.redirectLog( new FileOutputStream("log.txt"), LogLevel.BASIC ); + */ + public void redirectLog( final OutputStream out, LogLevel channelLogLevel ) { + final LogChannel log = spy( new LogChannel( this.getClass().getName(), true ) ); + log.setLogLevel( channelLogLevel ); + when( logChannelInterfaceFactory.create( any(), any( LoggingObjectInterface.class ) ) ).thenReturn( log ); + doAnswer( new Answer() { + @Override + public Object answer( InvocationOnMock invocation ) throws Throwable { + Object[] args = invocation.getArguments(); + + LogLevel logLevel = (LogLevel) args[1]; + LogLevel channelLogLevel = log.getLogLevel(); + + if ( !logLevel.isVisible( channelLogLevel ) ) { + return null; // not for our eyes. + } + if ( channelLogLevel.getLevel() >= logLevel.getLevel() ) { + LogMessageInterface logMessage = (LogMessageInterface) args[0]; + out.write( logMessage.getMessage().getBytes() ); + out.write( '\n' ); + out.write( '\r' ); + out.flush(); + return true; + } + return false; + } + } ).when( log ).println( (LogMessageInterface) anyObject(), (LogLevel) anyObject() ); + } +}