0

I am working on blockchain transaction anomaly detection system and testing various models. Currently I am stuck on a LSTM autoencoder. I have preprocessed transaction data from ethereum network (used Robust scaler, removed string features and left only numerical columns). This is fragment of my code:

def create_sequences(data, seq_length):
    sequences = []
    for i in range(len(data) - seq_length + 1):
        sequences.append(data[i:i + seq_length])
    return np.array(sequences)


def build_autoencoder(input_dim, seq_length):
    inputs = Input(shape=(seq_length, input_dim))

    encoded = LSTM(64, activation="relu", return_sequences=True, kernel_regularizer=regularizers.l1_l2(l1=0.001, l2=0.001))(inputs)
    encoded = Dropout(0.2)(encoded)
    encoded = LSTM(32, activation="relu", return_sequences=False, kernel_regularizer=regularizers.l1_l2(l1=0.001, l2=0.001))(encoded)
    encoded = Dense(16, activation="relu", kernel_regularizer=regularizers.l1_l2(l1=0.001, l2=0.001))(encoded)  
    encoded = Dropout(0.2)(encoded)
    repeated = RepeatVector(seq_length)(encoded)

    decoded = LSTM(64, activation="relu", return_sequences=True, kernel_regularizer=regularizers.l1_l2(l1=0.001, l2=0.001))(repeated)
    decoded = Dropout(0.2)(decoded)
    decoded = LSTM(input_dim, activation="sigmoid", return_sequences=True)(decoded)

    autoencoder = Model(inputs, decoded)
    autoencoder.compile(optimizer="adam", loss="mse")
    return autoencoder


input_dim = None
autoencoder = None

class DataGenerator(tf.keras.utils.Sequence):
    def __init__(self, conn, features_table_name, seq_length, batch_size, partition_size):
        # Some initialization

    def _load_data(self):
        # Some data loading (athena query)

    def _create_sequences(self, data):
        sequences = []
        for i in range(len(data) - self.seq_length + 1):
            sequences.append(data[i:i + self.seq_length])
        return np.array(sequences)

    def __len__(self):
        if self.data is None:
            return 0
        total_sequences = len(self.data) - self.seq_length + 1
        return max(1, int(np.ceil(total_sequences / self.batch_size)))

    def __getitem__(self, index):
        if self.data is None:
            raise StopIteration
        
        # Calculate start and end of the batch
        start_idx = index * self.batch_size
        end_idx = start_idx + self.batch_size
        sequences = self._create_sequences(self.data)
        batch_data = sequences[start_idx:end_idx]
        return batch_data, batch_data

    def on_epoch_end(self):
        self.data = self._load_data()
        if self.data is None:
            raise StopIteration

seq_length = 50
batch_size = 64
epochs = 10
partition_size = 50000

generator = DataGenerator(conn, features_table_name, seq_length, batch_size, partition_size)

input_dim = generator[0][0].shape[-1]
autoencoder = build_autoencoder(input_dim, seq_length)

steps_per_epoch = len(generator)
autoencoder.fit(generator, epochs=epochs, steps_per_epoch=steps_per_epoch, verbose=1)

train_mse_list = []

for i in range(len(generator)):
    batch_data, _ = generator[i]
    reconstructions = autoencoder.predict(batch_data)
    batch_mse = np.mean(np.mean(np.square(batch_data - reconstructions), axis=-1), axis=-1)
    train_mse_list.extend(batch_mse)

train_mse = np.array(train_mse_list)
threshold = np.percentile(train_mse, 99)

print(f"Threshold: {threshold}")

test_data = test_df.drop(columns=['label']).to_numpy(dtype=float)
test_sequences = create_sequences(test_data, seq_length)

test_reconstructions = autoencoder.predict(test_sequences)
test_mse = np.mean(np.mean(np.square(test_sequences - test_reconstructions), axis=-1), axis=-1)
anomalies = test_mse > threshold
test_labels = test_df["label"].values[seq_length-1:]  

tn, fp, fn, tp = confusion_matrix(test_labels, anomalies).ravel()

specificity = tn / (tn + fp)
recall = recall_score(test_labels, anomalies)
f1 = f1_score(test_labels, anomalies)
accuracy = accuracy_score(test_labels, anomalies)

print(f"Specificity: {specificity:.2f}, Sensitivity: {recall:.2f}, F1-Score: {f1:.2f}, Accuracy: {accuracy:.2f}")

cm = confusion_matrix(test_labels, anomalies)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["Negative", "Positive"])

plt.figure(figsize=(6, 6))
disp.plot(cmap="Blues", colorbar=True)
plt.title("Confusion Matrix")
plt.show()

And these are results I get: Specificity: 1.00, Sensitivity: 0.00, F1-Score: 0.00, Accuracy: 0.78 Confusion matrix

It looks like my trained model is always predicting 'False' or always 'True'. As you can see in the code above - I am using generator in order to work on huge amount of data, L1 and L2 reguralizers (feature selection). Do you see anything I can do to improve predicting of my model? Am I doing something wrong?

1 Answer 1

0

You should compare the distribution of reconstruction errors on train vs test. If these are different, then your threshold (estimated from p99 of the training distribution), will lead to very bad decisions.

If this is the case, you need to identify why your distributions are so different, and try to fix the underlying problem.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.