Tyk-pump (stdout)

So you would also have to download the source for the pump and modify it. The change is not only in the configuration file but also in the logic that handles the parameters. I have hacked together a version that decodes Base64 and filters the logs down to the exact fields to show in JSON. The filtering does not work for the text format. All you need to do is:

  1. Clone/download pump source
  2. Replace stdout.go file with the code below
  3. Compile and build a new custom pump
  4. Link the new custom pump with your analytics and you should be ready to go
package pumps

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"strings"

	"github.com/TykTechnologies/tyk-pump/analytics"
	"github.com/mitchellh/mapstructure"
)

var (
	// stdOutPrefix is the value of the "prefix" log field that Init attaches
	// to this pump's logger.
	stdOutPrefix     = "stdout-pump"
	// stdOutDefaultENV is the environment prefix Init hands to
	// processPumpEnvVars; it is built from the package-level
	// PUMPS_ENV_PREFIX and PUMPS_ENV_META_PREFIX values.
	stdOutDefaultENV = PUMPS_ENV_PREFIX + "_STDOUT" + PUMPS_ENV_META_PREFIX
)

// StdOutPump is a pump that emits each analytics record as a log entry
// through its logger (see WriteData).
type StdOutPump struct {
	CommonPumpConfig
	// conf holds the decoded pump settings; it is populated by Init.
	conf *StdOutConf
}

// StdOutConf holds the settings of the stdout pump.
//
// @PumpConf StdOut
type StdOutConf struct {
	// Environment variable prefix for this pump; returned by GetEnvPrefix.
	EnvPrefix string `mapstructure:"meta_env_prefix"`
	// Format of the analytics logs. Only the exact value `json` selects JSON output, in which
	// case each record is marshalled to a JSON string before being logged. Any other value
	// (including empty) logs the record itself under the configured field name.
	Format string `json:"format" mapstructure:"format"`
	// Name of the log field the analytics record is stored under. Init defaults it to
	// `tyk-analytics-record` when left empty.
	LogFieldName string `json:"log_field_name" mapstructure:"log_field_name"`
	// Define which analytics fields appear in the JSON output (only used when `format` is
	// `json`). Unknown names are skipped. When the list is empty every field is emitted.
	// Available fields: `["method", "host", "path", "raw_path", "content_length", "user_agent",
	// "response_code", "api_key", "time_stamp", "api_version", "api_name", "api_id", "org_id",
	// "oauth_id", "raw_request", "request_time", "raw_response", "ip_address", "geo", "network",
	// "latency", "tags", "alias", "track_path"]`.
	Fields []string `json:"fields" mapstructure:"fields"`
	// Tag prefixes to leave out of the `tags` field of the JSON output: a tag is dropped when
	// it starts with one of these prefixes. When the list is empty no tag is removed.
	IgnoreTagPrefixList []string `json:"ignore_tag_prefix_list" mapstructure:"ignore_tag_prefix_list"`
	// When true, the `raw_request` and `raw_response` values are Base64-decoded before the
	// record is logged.
	DecodeBase64 bool `json:"decode_base64" mapstructure:"decode_base64"`
}

// GetName returns the display name of this pump.
func (s *StdOutPump) GetName() string {
	const pumpName = "Stdout Pump"
	return pumpName
}

// GetEnvPrefix returns the environment prefix stored in the pump's
// configuration. The configuration must already be set (Init does this).
func (s *StdOutPump) GetEnvPrefix() string {
	cfg := s.conf
	return cfg.EnvPrefix
}

// New returns a fresh, zero-valued StdOutPump; the receiver is not used.
func (s *StdOutPump) New() Pump {
	return &StdOutPump{}
}

// Init prepares the pump for use: it attaches a prefixed logger, decodes
// config into a new StdOutConf, hands the configuration to processPumpEnvVars
// (presumably to apply environment overrides; confirm against that helper)
// and falls back to the default log field name when none is configured.
//
// A decoding failure is reported through the logger at Fatal level; the
// function itself always returns nil.
func (s *StdOutPump) Init(config interface{}) error {
	s.log = log.WithField("prefix", stdOutPrefix)
	s.conf = &StdOutConf{}

	if decodeErr := mapstructure.Decode(config, &s.conf); decodeErr != nil {
		s.log.Fatal("Failed to decode configuration: ", decodeErr)
	}

	processPumpEnvVars(s, s.log, s.conf, stdOutDefaultENV)

	// Records are nested under this field name unless the user picked one.
	if len(s.conf.LogFieldName) == 0 {
		s.conf.LogFieldName = "tyk-analytics-record"
	}

	s.log.Info(s.GetName() + " Initialized")
	return nil
}

// FilterTags returns the entries of filteredTags that do not start with any of
// the prefixes in ignoreTagPrefixList.
//
// The input slice is never modified: when something has to be filtered the
// result is built in a new slice, because the tags usually share their backing
// array with the analytics record they came from. When either argument is
// empty there is nothing to filter and filteredTags is returned as is.
func FilterTags(filteredTags []string, ignoreTagPrefixList []string) []string {
	if len(filteredTags) == 0 || len(ignoreTagPrefixList) == 0 {
		return filteredTags
	}

	kept := make([]string, 0, len(filteredTags))
	for _, tag := range filteredTags {
		ignored := false
		for _, prefix := range ignoreTagPrefixList {
			if strings.HasPrefix(tag, prefix) {
				ignored = true
				break
			}
		}
		if !ignored {
			kept = append(kept, tag)
		}
	}

	return kept
}

// GetMapping converts an analytics record into a map keyed by the field names
// used in the pump configuration.
//
// fields selects which keys end up in the result; names that do not match a
// known key are skipped. When fields is empty every known key is returned.
//
// ignoreTagPrefix lists tag prefixes to drop from the "tags" value (see
// FilterTags). The filter is applied whenever "tags" is part of the result,
// including the case where fields is empty and all keys are returned.
func GetMapping(record analytics.AnalyticsRecord, fields []string, ignoreTagPrefix []string) map[string]interface{} {
	mapping := map[string]interface{}{
		"method":         record.Method,
		"host":           record.Host,
		"path":           record.Path,
		"raw_path":       record.RawPath,
		"content_length": record.ContentLength,
		"user_agent":     record.UserAgent,
		"response_code":  record.ResponseCode,
		"api_key":        record.APIKey,
		"time_stamp":     record.TimeStamp,
		"api_version":    record.APIVersion,
		"api_name":       record.APIName,
		"api_id":         record.APIID,
		"org_id":         record.OrgID,
		"oauth_id":       record.OauthID,
		"raw_request":    record.RawRequest,
		"request_time":   record.RequestTime,
		"raw_response":   record.RawResponse,
		"ip_address":     record.IPAddress,
		"geo":            record.Geo,
		"network":        record.Network,
		"latency":        record.Latency,
		"tags":           record.Tags,
		"alias":          record.Alias,
		"track_path":     record.TrackPath,
	}

	filterTags := len(ignoreTagPrefix) > 0

	// No explicit selection: return everything, still honouring the tag filter.
	if len(fields) == 0 {
		if filterTags {
			mapping["tags"] = FilterTags(record.Tags, ignoreTagPrefix)
		}
		return mapping
	}

	tags := record.Tags
	selected := make(map[string]interface{}, len(fields))
	for _, field := range fields {
		value, known := mapping[field]
		if !known {
			// Configured field does not exist on the record; ignore it.
			continue
		}

		if field == "tags" && filterTags {
			tags = FilterTags(tags, ignoreTagPrefix)
			value = tags
		}

		selected[field] = value
	}

	return selected
}

// WriteData logs every analytics record in data through the pump's logger.
//
// For each element:
//   - when DecodeBase64 is set, RawRequest and RawResponse are Base64-decoded;
//     a value that is not valid Base64 is kept unchanged and a warning is
//     logged, instead of being replaced by a truncated or empty string;
//   - when Format is "json", the record is reduced with GetMapping, marshalled
//     and logged as a JSON string under LogFieldName; otherwise the record
//     itself is logged under LogFieldName.
//
// Elements that are not analytics.AnalyticsRecord values are skipped with a
// warning rather than causing a panic. Processing stops early, returning nil,
// once ctx is done. A JSON marshalling error aborts the batch and is returned.
func (s *StdOutPump) WriteData(ctx context.Context, data []interface{}) error {
	s.log.Debug("Attempting to write ", len(data), " records...")

	// decodeOrKeep returns the decoded form of encoded, or encoded itself when
	// it cannot be decoded, so a malformed payload is never lost.
	decodeOrKeep := func(fieldName, encoded string) string {
		raw, err := base64.StdEncoding.DecodeString(encoded)
		if err != nil {
			s.log.Warn("Failed to decode base64 ", fieldName, ", keeping original value: ", err)
			return encoded
		}
		return string(raw)
	}

	// data is all the analytics being written.
	for _, v := range data {
		select {
		case <-ctx.Done():
			return nil
		default:
		}

		decoded, ok := v.(analytics.AnalyticsRecord)
		if !ok {
			s.log.Warn("Skipping record of unexpected type")
			continue
		}

		if s.conf.DecodeBase64 {
			decoded.RawRequest = decodeOrKeep("raw_request", decoded.RawRequest)
			decoded.RawResponse = decodeOrKeep("raw_response", decoded.RawResponse)
		}

		if s.conf.Format != "json" {
			s.log.WithField(s.conf.LogFieldName, decoded).Info()
			continue
		}

		mapping := GetMapping(decoded, s.conf.Fields, s.conf.IgnoreTagPrefixList)
		payload, err := json.Marshal(mapping)
		if err != nil {
			return err
		}

		s.log.WithField(s.conf.LogFieldName, string(payload)).Info()
	}

	s.log.Info("Purged ", len(data), " records...")

	return nil
}

It is worth noting that ignore_tag_prefix_list only handles the tags. What you really want is to specify the fields that show up in the logs. You can reverse the logic by modifying the code so that it removes unwanted fields instead.

1 Like