Skip to content

Commit

Permalink
feat(cdp): Masking destinations (#24266)
Browse files Browse the repository at this point in the history
benjackwhite authored Aug 14, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 39467be commit 524ebff
Showing 25 changed files with 809 additions and 151 deletions.
10 changes: 10 additions & 0 deletions docker-compose.base.yml
Original file line number Diff line number Diff line change
@@ -64,6 +64,16 @@ services:
timeout: 10s
retries: 10

redis7:
image: redis:7.2-alpine
restart: on-failure
command: redis-server --maxmemory-policy allkeys-lru --maxmemory 200mb
healthcheck:
test: ['CMD', 'redis-cli', 'ping']
interval: 3s
timeout: 10s
retries: 10

clickhouse:
#
# Note: please keep the default version in sync across
6 changes: 6 additions & 0 deletions docker-compose.dev.yml
Original file line number Diff line number Diff line change
@@ -37,6 +37,12 @@ services:
service: redis
ports:
- '6379:6379'
redis7:
extends:
file: docker-compose.base.yml
service: redis7
ports:
- '6479:6379'

flower:
extends:
121 changes: 117 additions & 4 deletions frontend/src/scenes/pipeline/hogfunctions/HogFunctionConfiguration.tsx
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import {
LemonDropdown,
LemonInput,
LemonLabel,
LemonSelect,
LemonSwitch,
LemonTextArea,
Link,
@@ -256,7 +257,15 @@ export function HogFunctionConfiguration({ templateId, id }: { templateId?: stri
</div>

<div className="border bg-bg-light rounded p-3 space-y-2">
<LemonField name="filters" label="Filters by events and actions" className="gap-2">
<LemonField
name="filters"
label="Filters"
help={
<>
This destination will be triggered if <b>any of</b> the above filters match.
</>
}
>
{({ value, onChange }) => (
<>
<TestAccountFilterSwitch
@@ -302,9 +311,113 @@ export function HogFunctionConfiguration({ templateId, id }: { templateId?: stri
)}
</LemonField>

<p className="italic text-muted-alt">
This destination will be triggered if <b>any of</b> the above filters match.
</p>
<LemonField name="masking" label="Trigger options">
{({ value, onChange }) => (
<div className="flex items-center gap-1 flex-wrap">
<LemonSelect
options={[
{
value: null,
label: 'Run every time',
},
{
value: 'all',
label: 'Run once per interval',
},
{
value: '{person.uuid}',
label: 'Run once per person per interval',
},
]}
value={value?.hash ?? null}
onChange={(val) =>
onChange({
hash: val,
ttl: value?.ttl ?? 60 * 30,
})
}
/>
{configuration.masking?.hash ? (
<>
<div className="flex items-center gap-1 flex-wrap">
<span>of</span>
<LemonSelect
value={value?.ttl}
onChange={(val) => onChange({ ...value, ttl: val })}
options={[
{
value: 5 * 60,
label: '5 minutes',
},
{
value: 15 * 60,
label: '15 minutes',
},
{
value: 30 * 60,
label: '30 minutes',
},
{
value: 60 * 60,
label: '1 hour',
},
{
value: 2 * 60 * 60,
label: '2 hours',
},
{
value: 4 * 60 * 60,
label: '4 hours',
},
{
value: 8 * 60 * 60,
label: '8 hours',
},
{
value: 12 * 60 * 60,
label: '12 hours',
},
{
value: 24 * 60 * 60,
label: '24 hours',
},
]}
/>
</div>
<div className="flex items-center gap-1 flex-wrap">
<span>or until</span>
<LemonSelect
value={value?.threshold}
onChange={(val) => onChange({ ...value, threshold: val })}
options={[
{
value: null,
label: 'Not set',
},
{
value: 1000,
label: '1000 events',
},
{
value: 10000,
label: '10,000 events',
},
{
value: 100000,
label: '100,000 events',
},
{
value: 1000000,
label: '1,000,000 events',
},
]}
/>
</div>
</>
) : null}
</div>
)}
</LemonField>
</div>
<div className="relative border bg-bg-light rounded p-3 space-y-2">
<LemonLabel>Expected volume</LemonLabel>
Original file line number Diff line number Diff line change
@@ -125,7 +125,8 @@ export function sanitizeConfiguration(data: HogFunctionConfigurationType): HogFu
...data,
filters: data.filters ? sanitizeFilters(data.filters) : null,
inputs: sanitizedInputs,
icon_url: data.icon_url?.replace('&temp=true', ''), // Remove temp=true so it doesn't try and suggest new options next time
masking: data.masking?.hash ? data.masking : null,
icon_url: data.icon_url,
}

return payload
8 changes: 8 additions & 0 deletions frontend/src/types.ts
Original file line number Diff line number Diff line change
@@ -4312,6 +4312,13 @@ export type HogFunctionInputType = {
bytecode?: any
}

/** Masking ("trigger options") configuration for a hog function, as edited in the destination form. */
export type HogFunctionFiltersMasking = {
// Length of the masking interval in seconds (the form offers 5 minutes up to 24 hours)
ttl: number | null
// Optional event count after which the function may run again before the interval ends
threshold?: number | null
// Expression used to group invocations, e.g. 'all' or '{person.uuid}'
hash: string
// NOTE(review): presumably the compiled form of `hash` supplied by the backend - confirm
bytecode?: any
}

export type HogFunctionType = {
id: string
icon_url?: string
@@ -4325,6 +4332,7 @@ export type HogFunctionType = {

inputs_schema?: HogFunctionInputSchemaType[]
inputs?: Record<string, HogFunctionInputType>
masking?: HogFunctionFiltersMasking | null
filters?: HogConfigFilters | null
template?: HogFunctionTemplateType
status?: HogFunctionStatus
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0455_alter_externaldatasource_source_type
posthog: 0456_hogfunction_masking
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
38 changes: 32 additions & 6 deletions plugin-server/src/cdp/cdp-consumers.ts
Original file line number Diff line number Diff line change
@@ -25,7 +25,9 @@ import { AsyncFunctionExecutor } from './async-function-executor'
import { GroupsManager } from './groups-manager'
import { HogExecutor } from './hog-executor'
import { HogFunctionManager } from './hog-function-manager'
import { HogMasker } from './hog-masker'
import { HogWatcher, HogWatcherState } from './hog-watcher'
import { CdpRedis, createCdpRedisPool } from './redis'
import {
CdpOverflowMessage,
HogFunctionAsyncFunctionResponse,
@@ -84,9 +86,11 @@ abstract class CdpConsumerBase {
asyncFunctionExecutor: AsyncFunctionExecutor
hogExecutor: HogExecutor
hogWatcher: HogWatcher
hogMasker: HogMasker
groupsManager: GroupsManager
isStopping = false
messagesToProduce: HogFunctionMessageToProduce[] = []
redis: CdpRedis

protected kafkaProducer?: KafkaProducerWrapper
protected abstract name: string
@@ -96,10 +100,12 @@ abstract class CdpConsumerBase {
protected heartbeat = () => {}

constructor(protected hub: Hub) {
this.redis = createCdpRedisPool(hub)
this.hogFunctionManager = new HogFunctionManager(hub.postgres, hub)
this.hogWatcher = new HogWatcher(hub, (id, state) => {
this.hogWatcher = new HogWatcher(hub, this.redis, (id, state) => {
void this.captureInternalPostHogEvent(id, 'hog function state changed', { state })
})
this.hogMasker = new HogMasker(this.redis)
this.hogExecutor = new HogExecutor(this.hogFunctionManager)
const rustyHook = this.hub?.rustyHook ?? new RustyHook(this.hub)
this.asyncFunctionExecutor = new AsyncFunctionExecutor(this.hub, rustyHook)
@@ -350,9 +356,7 @@ abstract class CdpConsumerBase {

const states = await this.hogWatcher.getStates(possibleInvocations.map((x) => x.hogFunction.id))

const overflowGlobalsAndFunctions: Record<string, HogFunctionOverflowedGlobals> = {}

const invocations = possibleInvocations.filter((item) => {
const notDisabledInvocations = possibleInvocations.filter((item) => {
const state = states[item.hogFunction.id].state
if (state >= HogWatcherState.disabledForPeriod) {
this.produceAppMetric({
@@ -368,6 +372,29 @@ abstract class CdpConsumerBase {
return false
}

return true
})

// Now we can filter by masking configs
const { masked, notMasked: notMaskedInvocations } = await this.hogMasker.filterByMasking(
notDisabledInvocations
)

masked.forEach((item) => {
this.produceAppMetric({
team_id: item.globals.project.id,
app_source_id: item.hogFunction.id,
metric_kind: 'other',
metric_name: 'masked',
count: 1,
})
})

const overflowGlobalsAndFunctions: Record<string, HogFunctionOverflowedGlobals> = {}

const notOverflowedInvocations = notMaskedInvocations.filter((item) => {
const state = states[item.hogFunction.id].state

if (state === HogWatcherState.degraded) {
const key = `${item.globals.project.id}-${item.globals.event.uuid}`
overflowGlobalsAndFunctions[key] = overflowGlobalsAndFunctions[key] || {
@@ -377,7 +404,6 @@ abstract class CdpConsumerBase {

overflowGlobalsAndFunctions[key].hogFunctionIds.push(item.hogFunction.id)
counterFunctionInvocation.inc({ outcome: 'overflowed' }, 1)

return false
}

@@ -396,7 +422,7 @@ abstract class CdpConsumerBase {
})

const results = (
await this.runManyWithHeartbeat(invocations, (item) =>
await this.runManyWithHeartbeat(notOverflowedInvocations, (item) =>
this.hogExecutor.executeFunction(item.globals, item.hogFunction)
)
).filter((x) => !!x) as HogFunctionInvocationResult[]
2 changes: 1 addition & 1 deletion plugin-server/src/cdp/hog-executor.ts
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ import { convertToHogFunctionFilterGlobal } from './utils'
const MAX_ASYNC_STEPS = 2
const MAX_HOG_LOGS = 10
const MAX_LOG_LENGTH = 10000
const DEFAULT_TIMEOUT_MS = 100
export const DEFAULT_TIMEOUT_MS = 100

const hogExecutionDuration = new Histogram({
name: 'cdp_hog_function_execution_duration_ms',
12 changes: 11 additions & 1 deletion plugin-server/src/cdp/hog-function-manager.ts
Original file line number Diff line number Diff line change
@@ -11,7 +11,17 @@ type HogFunctionCache = {
teams: Record<Team['id'], HogFunctionType['id'][] | undefined>
}

const HOG_FUNCTION_FIELDS = ['id', 'team_id', 'name', 'enabled', 'inputs', 'inputs_schema', 'filters', 'bytecode']
// Hog function fields used by this module; `masking` carries the masking configuration.
// NOTE(review): presumably these are the columns selected when loading functions - confirm against the query
const HOG_FUNCTION_FIELDS = [
'id',
'team_id',
'name',
'enabled',
'inputs',
'inputs_schema',
'filters',
'bytecode',
'masking',
]

export class HogFunctionManager {
private started: boolean
130 changes: 130 additions & 0 deletions plugin-server/src/cdp/hog-masker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { exec } from '@posthog/hogvm'
import { createHash } from 'crypto'

import { CdpRedis } from './redis'
import { HogFunctionInvocationGlobals, HogFunctionType } from './types'

// Redis key namespace for masking state - a separate prefix is used when NODE_ENV is 'test'
export const BASE_REDIS_KEY = process.env.NODE_ENV == 'test' ? '@posthog-test/hog-masker' : '@posthog/hog-masker'
// Prefix of the per-mask counters - the function id and the mask hash are appended to it
const REDIS_KEY_TOKENS = `${BASE_REDIS_KEY}/mask`

// NOTE: These are controlled via the api so are more of a sanity fallback
// Both bounds are in seconds as the clamped value is passed to the redis EXPIRE command
const MASKER_MAX_TTL = 60 * 60 * 24
const MASKER_MIN_TTL = 1

/** Per-batch bookkeeping for one (hog function, hash) pair. */
type MaskContext = {
hogFunctionId: string
// Hex MD5 digest of the stringified result of the function's masking bytecode
hash: string
// Number of invocations in the current batch that share this mask
increment: number
// Expiry applied to the redis counter, clamped between MASKER_MIN_TTL and MASKER_MAX_TTL
ttl: number
// How many invocations sharing this mask may still run in the current batch
allowedExecutions: number
// Optional counter interval at which an additional execution is allowed
threshold: number | null
}

/** A candidate invocation: the globals for one event plus the function to run against them. */
type HogInvocationContext = {
globals: HogFunctionInvocationGlobals
hogFunction: HogFunctionType
}

/** An invocation together with the mask it counts against (only set when the function has masking). */
type HogInvocationContextWithMasker = HogInvocationContext & {
masker?: MaskContext
}

/**
 * HogMasker
 *
 * Responsible for determining if a function is "masked" or not based on the function configuration.
 *
 * Masking is evaluated per batch: every invocation whose function has a masking config is grouped by
 * (function id, hash of the masking expression result) and each group is counted in redis.
 */
export class HogMasker {
    constructor(private redis: CdpRedis) {}

    /**
     * Splits the given invocations into those that should run and those that are masked.
     *
     * For each mask the redis counter is incremented by the number of invocations in this batch and given an
     * expiry of the mask's ttl if it does not already have one. An invocation is allowed when:
     *  - the counter was previously 0 (first invocation of the interval), or
     *  - the mask has a threshold and the counter crossed a multiple of it (one execution per crossing).
     * Allowed executions are handed out in input order; everything else with a mask is reported as masked.
     * Invocations whose function has no masking config are always returned as not masked.
     *
     * The input invocation objects are returned as-is and are never modified.
     *
     * If redis is unavailable, or an individual counter could not be read, the affected invocations are
     * masked (fail closed). Errors thrown while executing the masking bytecode are not caught here.
     *
     * @param invocations candidate invocations for the current batch
     * @returns the same invocation objects partitioned into `masked` and `notMasked`
     */
    public async filterByMasking(invocations: HogInvocationContext[]): Promise<{
        masked: HogInvocationContext[]
        notMasked: HogInvocationContext[]
    }> {
        // Shallow copies so that attaching the masker does not leak a property onto the caller's objects
        const invocationsWithMasker: HogInvocationContextWithMasker[] = invocations.map((item) => ({ ...item }))
        const masks: Record<string, MaskContext> = {}

        // We find all functions that have a mask and group the invocations by function and hash
        invocationsWithMasker.forEach((item) => {
            const masking = item.hogFunction.masking
            if (!masking) {
                return
            }

            // TODO: Catch errors
            const value = exec(masking.bytecode, {
                globals: item.globals,
                timeout: 50,
                maxAsyncSteps: 0,
            })
            // What to do if it is null....
            // An MD5 hex digest is always 32 characters so it can be used directly as the key suffix
            const hash = createHash('md5').update(String(value.result)).digest('hex')
            const hashKey = `${item.hogFunction.id}:${hash}`

            masks[hashKey] = masks[hashKey] || {
                hash,
                hogFunctionId: item.hogFunction.id,
                increment: 0,
                ttl: Math.max(MASKER_MIN_TTL, Math.min(MASKER_MAX_TTL, masking.ttl ?? MASKER_MAX_TTL)),
                threshold: masking.threshold,
                allowedExecutions: 0,
            }

            masks[hashKey].increment++
            item.masker = masks[hashKey]
        })

        // Snapshot once so the pipeline commands and their replies are matched up by the same ordering
        const maskList = Object.values(masks)

        if (maskList.length === 0) {
            return { masked: [], notMasked: invocations }
        }

        const result = await this.redis.usePipeline({ name: 'masker', failOpen: true }, (pipeline) => {
            maskList.forEach(({ hogFunctionId, hash, increment, ttl }) => {
                const key = `${REDIS_KEY_TOKENS}/${hogFunctionId}/${hash}`
                pipeline.incrby(key, increment)
                // NX: only set the expiry if there is none yet, so the interval starts at the first invocation
                // @ts-expect-error - NX is not typed in ioredis
                pipeline.expire(key, ttl, 'NX')
            })
        })

        maskList.forEach((masker, index) => {
            // Each mask queued two commands (INCRBY then EXPIRE) so its INCRBY reply sits at index * 2
            const reply = result ? result[index * 2] : null
            const newValue: number | null = reply && !reply[0] ? reply[1] ?? null : null
            if (newValue === null) {
                // We fail closed here as with a masking config the typical case will be not to send
                return
            }

            const oldValue = newValue - masker.increment

            // Simplest case - the previous value was 0
            masker.allowedExecutions = oldValue === 0 ? 1 : 0

            if (masker.threshold) {
                // TRICKY: We minus 1 to account for the "first" execution
                const thresholdsPasses =
                    Math.floor((newValue - 1) / masker.threshold) - Math.floor((oldValue - 1) / masker.threshold)

                if (thresholdsPasses) {
                    masker.allowedExecutions = thresholdsPasses
                }
            }
        })

        const masked: HogInvocationContext[] = []
        const notMasked: HogInvocationContext[] = []

        invocationsWithMasker.forEach(({ masker }, index) => {
            // Hand back the caller's original object rather than our annotated copy
            const original = invocations[index]

            if (!masker) {
                notMasked.push(original)
            } else if (masker.allowedExecutions > 0) {
                masker.allowedExecutions--
                notMasked.push(original)
            } else {
                masked.push(original)
            }
        })

        return { masked, notMasked }
    }
}
136 changes: 12 additions & 124 deletions plugin-server/src/cdp/hog-watcher.ts
Original file line number Diff line number Diff line change
@@ -1,70 +1,13 @@
import { captureException } from '@sentry/node'
import { Pipeline, Redis } from 'ioredis'

import { Hub } from '../types'
import { timeoutGuard } from '../utils/db/utils'
import { now } from '../utils/now'
import { status } from '../utils/status'
import { UUIDT } from '../utils/utils'
import { CdpRedis } from './redis'
import { HogFunctionInvocationResult, HogFunctionType } from './types'

export const BASE_REDIS_KEY = process.env.NODE_ENV == 'test' ? '@posthog-test/hog-watcher' : '@posthog/hog-watcher'
const REDIS_KEY_TOKENS = `${BASE_REDIS_KEY}/tokens`
const REDIS_KEY_DISABLED = `${BASE_REDIS_KEY}/disabled`
const REDIS_KEY_DISABLED_HISTORY = `${BASE_REDIS_KEY}/disabled_history`
const REDIS_TIMEOUT_SECONDS = 5

// NOTE: We ideally would have this in a file but the current build step doesn't handle anything other than .ts files
const LUA_TOKEN_BUCKET = `
local key = KEYS[1]
local now = ARGV[1]
local cost = ARGV[2]
local poolMax = ARGV[3]
local fillRate = ARGV[4]
local expiry = ARGV[5]
local before = redis.call('hget', key, 'ts')
-- If we don't have a timestamp then we set it to now and fill up the bucket
if before == false then
local ret = poolMax - cost
redis.call('hset', key, 'ts', now)
redis.call('hset', key, 'pool', ret)
redis.call('expire', key, expiry)
return ret
end
-- We update the timestamp if it has changed
local timeDiffSeconds = now - before
if timeDiffSeconds > 0 then
redis.call('hset', key, 'ts', now)
else
timeDiffSeconds = 0
end
-- Calculate how much should be refilled in the bucket and add it
local owedTokens = timeDiffSeconds * fillRate
local currentTokens = redis.call('hget', key, 'pool')
if currentTokens == false then
currentTokens = poolMax
end
currentTokens = math.min(currentTokens + owedTokens, poolMax)
-- Remove the cost and return the new number of tokens
if currentTokens - cost >= 0 then
currentTokens = currentTokens - cost
else
currentTokens = -1
end
redis.call('hset', key, 'pool', currentTokens)
redis.call('expire', key, expiry)
-- Finally return the value - if it's negative then we've hit the limit
return currentTokens
`

export enum HogWatcherState {
healthy = 1,
@@ -79,19 +22,12 @@ export type HogWatcherFunctionState = {
rating: number
}

type WithCheckRateLimit<T> = {
checkRateLimit: (key: string, now: number, cost: number, poolMax: number, fillRate: number, expiry: number) => T
}

type HogWatcherRedisClientPipeline = Pipeline & WithCheckRateLimit<number>

type HogWatcherRedisClient = Omit<Redis, 'pipeline'> &
WithCheckRateLimit<Promise<number>> & {
pipeline: () => HogWatcherRedisClientPipeline
}

export class HogWatcher {
constructor(private hub: Hub, private onStateChange: (id: HogFunctionType['id'], state: HogWatcherState) => void) {}
constructor(
private hub: Hub,
private redis: CdpRedis,
private onStateChange: (id: HogFunctionType['id'], state: HogWatcherState) => void
) {}

private rateLimitArgs(id: HogFunctionType['id'], cost: number) {
const nowSeconds = Math.round(now() / 1000)
@@ -105,32 +41,6 @@ export class HogWatcher {
] as const
}

private async runRedis<T>(fn: (client: HogWatcherRedisClient) => Promise<T>): Promise<T | null> {
// We want all of this to fail open in the issue of redis being unavailable - we'd rather have the function continue
const client = await this.hub.redisPool.acquire()

client.defineCommand('checkRateLimit', {
numberOfKeys: 1,
lua: LUA_TOKEN_BUCKET,
})

const timeout = timeoutGuard(
`Redis call delayed. Waiting over ${REDIS_TIMEOUT_SECONDS} seconds.`,
undefined,
REDIS_TIMEOUT_SECONDS * 1000
)
try {
return await fn(client as HogWatcherRedisClient)
} catch (e) {
status.error('HogWatcher Redis error', e)
captureException(e)
return null
} finally {
clearTimeout(timeout)
await this.hub.redisPool.release(client)
}
}

public tokensToFunctionState(tokens?: number | null, stateOverride?: HogWatcherState): HogWatcherFunctionState {
tokens = tokens ?? this.hub.CDP_WATCHER_BUCKET_SIZE
const rating = tokens / this.hub.CDP_WATCHER_BUCKET_SIZE
@@ -151,16 +61,12 @@ export class HogWatcher {
): Promise<Record<HogFunctionType['id'], HogWatcherFunctionState>> {
const idsSet = new Set(ids)

const res = await this.runRedis(async (client) => {
const pipeline = client.pipeline()

const res = await this.redis.usePipeline({ name: 'getStates' }, (pipeline) => {
for (const id of idsSet) {
pipeline.checkRateLimit(...this.rateLimitArgs(id, 0))
pipeline.get(`${REDIS_KEY_DISABLED}/${id}`)
pipeline.ttl(`${REDIS_KEY_DISABLED}/${id}`)
}

return pipeline.exec()
})

return Array.from(idsSet).reduce((acc, id, index) => {
@@ -189,9 +95,7 @@ export class HogWatcher {
}

public async forceStateChange(id: HogFunctionType['id'], state: HogWatcherState): Promise<void> {
await this.runRedis(async (client) => {
const pipeline = client.pipeline()

await this.redis.usePipeline({ name: 'forceStateChange' }, (pipeline) => {
const newScore =
state === HogWatcherState.healthy
? this.hub.CDP_WATCHER_BUCKET_SIZE
@@ -211,8 +115,6 @@ export class HogWatcher {
} else {
pipeline.del(`${REDIS_KEY_DISABLED}/${id}`)
}

await pipeline.exec()
})

this.onStateChange(id, state)
@@ -243,14 +145,10 @@ export class HogWatcher {
costs[result.invocation.hogFunctionId] = cost
})

const res = await this.runRedis(async (client) => {
const pipeline = client.pipeline()

const res = await this.redis.usePipeline({ name: 'checkRateLimits' }, (pipeline) => {
Object.entries(costs).forEach(([id, change]) => {
pipeline.checkRateLimit(...this.rateLimitArgs(id, change))
})

return await pipeline.exec()
})

// TRICKY: the above part is straight forward - below is more complex as we do multiple calls to ensure
@@ -264,9 +162,7 @@ export class HogWatcher {
if (disabledFunctionIds.length) {
// Mark them all as disabled in redis

const results = await this.runRedis(async (client) => {
const pipeline = client.pipeline()

const results = await this.redis.usePipeline({ name: 'markDisabled' }, (pipeline) => {
disabledFunctionIds.forEach((id) => {
pipeline.set(
`${REDIS_KEY_DISABLED}/${id}`,
@@ -276,8 +172,6 @@ export class HogWatcher {
'NX'
)
})

return pipeline.exec()
})

const functionsTempDisabled = disabledFunctionIds.filter((_, index) =>
@@ -289,16 +183,13 @@ export class HogWatcher {
}

// We store the history as a zset - we can then use it to determine if we should disable indefinitely
const historyResults = await this.runRedis(async (client) => {
const pipeline = client.pipeline()
const historyResults = await this.redis.usePipeline({ name: 'addTempDisabled' }, (pipeline) => {
functionsTempDisabled.forEach((id) => {
const key = `${REDIS_KEY_DISABLED_HISTORY}/${id}`
pipeline.zadd(key, now(), new UUIDT().toString())
pipeline.zrange(key, 0, -1)
pipeline.expire(key, this.hub.CDP_WATCHER_TTL)
})

return await pipeline.exec()
})

const functionsToDisablePermanently = functionsTempDisabled.filter((_, index) => {
@@ -307,15 +198,12 @@ export class HogWatcher {
})

if (functionsToDisablePermanently.length) {
await this.runRedis(async (client) => {
const pipeline = client.pipeline()
await this.redis.usePipeline({ name: 'disablePermanently' }, (pipeline) => {
functionsToDisablePermanently.forEach((id) => {
const key = `${REDIS_KEY_DISABLED}/${id}`
pipeline.set(key, '1')
pipeline.del(`${REDIS_KEY_DISABLED_HISTORY}/${id}`)
})

return await pipeline.exec()
})
}

152 changes: 152 additions & 0 deletions plugin-server/src/cdp/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Shared Redis access for the CDP services: a pooled client with the custom `checkRateLimit` Lua command registered, plus helpers for running a callback or a pipeline against it

import { captureException } from '@sentry/node'
import { createPool } from 'generic-pool'
import { Pipeline, Redis } from 'ioredis'

import { PluginsServerConfig } from '../types'
import { timeoutGuard } from '../utils/db/utils'
import { status } from '../utils/status'
import { createRedisClient } from '../utils/utils'

// Signature of the custom `checkRateLimit` command - the arguments map to KEYS[1] and ARGV[1..5] of the Lua script
type WithCheckRateLimit<T> = {
checkRateLimit: (key: string, now: number, cost: number, poolMax: number, fillRate: number, expiry: number) => T
}

// An ioredis pipeline that can also queue the custom `checkRateLimit` command
export type CdpRedisClientPipeline = Pipeline & WithCheckRateLimit<number>

// An ioredis client with the custom `checkRateLimit` command whose pipelines expose it too
export type CdpRedisClient = Omit<Redis, 'pipeline'> &
WithCheckRateLimit<Promise<number>> & {
pipeline: () => CdpRedisClientPipeline
}

export type CdpRedisOptions = {
// Label for the call, included in the delay and failure log messages
name: string
// Forwarded to the timeout guard that warns when the call is slow
timeout?: number
// When true a failed call is reported and resolves to null instead of throwing
failOpen?: boolean
}

export type CdpRedis = {
// Runs the callback with a pooled client and returns its result (or null when failing open)
useClient: <T>(options: CdpRedisOptions, callback: (client: CdpRedisClient) => Promise<T>) => Promise<T | null>
// Lets the callback queue commands on a pipeline, executes it and returns the [error, reply] pairs
usePipeline: (
options: CdpRedisOptions,
callback: (pipeline: CdpRedisClientPipeline) => void
) => Promise<Array<[Error | null, any]> | null>
}

// NOTE: We ideally would have this in a file but the current build step doesn't handle anything other than .ts files
//
// Token bucket stored as a redis hash at KEYS[1] with two fields: 'ts' (last update time) and 'pool' (tokens left).
// ARGV: now, cost, poolMax, fillRate, expiry.
//  - If the hash has no 'ts' yet the bucket starts full and `poolMax - cost` is stored and returned.
//  - Otherwise the pool is refilled by `fillRate` per unit of elapsed time (capped at poolMax) and the cost is removed.
//  - If the cost cannot be covered the pool is set to -1, which is what the script returns to signal the limit was hit.
// The key's expiry is refreshed to `expiry` on every call.
const LUA_TOKEN_BUCKET = `
local key = KEYS[1]
local now = ARGV[1]
local cost = ARGV[2]
local poolMax = ARGV[3]
local fillRate = ARGV[4]
local expiry = ARGV[5]
local before = redis.call('hget', key, 'ts')
-- If we don't have a timestamp then we set it to now and fill up the bucket
if before == false then
local ret = poolMax - cost
redis.call('hset', key, 'ts', now)
redis.call('hset', key, 'pool', ret)
redis.call('expire', key, expiry)
return ret
end
-- We update the timestamp if it has changed
local timeDiffSeconds = now - before
if timeDiffSeconds > 0 then
redis.call('hset', key, 'ts', now)
else
timeDiffSeconds = 0
end
-- Calculate how much should be refilled in the bucket and add it
local owedTokens = timeDiffSeconds * fillRate
local currentTokens = redis.call('hget', key, 'pool')
if currentTokens == false then
currentTokens = poolMax
end
currentTokens = math.min(currentTokens + owedTokens, poolMax)
-- Remove the cost and return the new number of tokens
if currentTokens - cost >= 0 then
currentTokens = currentTokens - cost
else
currentTokens = -1
end
redis.call('hset', key, 'pool', currentTokens)
redis.call('expire', key, expiry)
-- Finally return the value - if it's negative then we've hit the limit
return currentTokens
`

/**
 * Creates the pooled redis access used by the CDP services.
 *
 * Every pooled client connects using the CDP_REDIS_* settings and has the `checkRateLimit` Lua command
 * registered. The pool size is bounded by REDIS_POOL_MIN_SIZE / REDIS_POOL_MAX_SIZE.
 *
 * @param config plugin server config providing the redis connection and pool settings
 * @returns helpers to run a callback (`useClient`) or a pipeline (`usePipeline`) against a pooled client
 */
export const createCdpRedisPool = (config: PluginsServerConfig): CdpRedis => {
    const pool = createPool<CdpRedisClient>(
        {
            create: async () => {
                const client = await createRedisClient(config.CDP_REDIS_HOST, {
                    port: config.CDP_REDIS_PORT,
                    password: config.CDP_REDIS_PASSWORD,
                })

                client.defineCommand('checkRateLimit', {
                    numberOfKeys: 1,
                    lua: LUA_TOKEN_BUCKET,
                })

                return client as CdpRedisClient
            },
            destroy: async (client) => {
                await client.quit()
            },
        },
        {
            min: config.REDIS_POOL_MIN_SIZE,
            max: config.REDIS_POOL_MAX_SIZE,
            autostart: true,
        }
    )

    /**
     * Acquires a client, runs the callback with it and always hands the client back to the pool.
     * With `failOpen` any failure (including failing to acquire a client) is reported and null is returned;
     * otherwise the error is rethrown.
     */
    const useClient: CdpRedis['useClient'] = async (options, callback) => {
        // NOTE(review): assumes options.timeout is in milliseconds and that timeoutGuard defaults to 30s - confirm
        const waitDescription = options.timeout !== undefined ? `${options.timeout / 1000} seconds` : '30 seconds'
        const timeout = timeoutGuard(
            `Redis call ${options.name} delayed. Waiting over ${waitDescription}.`,
            undefined,
            options.timeout
        )
        let client: CdpRedisClient | undefined

        try {
            // Acquire inside the try so a pool failure still clears the guard and respects failOpen
            client = await pool.acquire()
            return await callback(client)
        } catch (e) {
            if (options.failOpen) {
                // We log the error and return null
                captureException(e)
                status.error(`Redis call ${options.name} failed`, e)
                return null
            }
            throw e
        } finally {
            // Clear the guard first so it cannot be skipped if releasing the client fails
            clearTimeout(timeout)
            if (client) {
                await pool.release(client)
            }
        }
    }

    /** Builds a pipeline on a pooled client, lets the callback queue commands and executes it. */
    const usePipeline: CdpRedis['usePipeline'] = async (options, callback) => {
        return useClient(options, async (client) => {
            const pipeline = client.pipeline()
            callback(pipeline)
            return pipeline.exec()
        })
    }

    return {
        useClient,
        usePipeline,
    }
}
10 changes: 8 additions & 2 deletions plugin-server/src/cdp/types.ts
Original file line number Diff line number Diff line change
@@ -32,12 +32,17 @@ export interface HogFunctionFilterAction extends HogFunctionFilterBase {

export type HogFunctionFilter = HogFunctionFilterEvent | HogFunctionFilterAction

/** Masking configuration of a hog function as consumed by the plugin server. */
export type HogFunctionFiltersMasking = {
// Expiry in seconds for the masking counter (null falls back to the masker's maximum)
ttl: number | null
// Source expression of the mask, e.g. 'all' or '{person.uuid}'
hash: string
// Bytecode executed against the invocation globals to produce the value that gets hashed
bytecode: HogBytecode
// Optional counter interval at which an additional execution is allowed
threshold: number | null
}

export interface HogFunctionFilters {
events?: HogFunctionFilterEvent[]
actions?: HogFunctionFilterAction[]
filter_test_accounts?: boolean
// Loaded at run time from Team model
filter_test_accounts_bytecode?: boolean
bytecode?: HogBytecode
}

@@ -231,6 +236,7 @@ export type HogFunctionType = {
inputs_schema?: HogFunctionInputSchemaType[]
inputs?: Record<string, HogFunctionInputType>
filters?: HogFunctionFilters | null
masking?: HogFunctionFiltersMasking | null
depends_on_integration_ids?: Set<IntegrationType['id']>
}

3 changes: 3 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
@@ -187,6 +187,9 @@ export function getDefaultConfig(): PluginsServerConfig {
CDP_WATCHER_REFILL_RATE: 10,
CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: 3,
CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: '',
CDP_REDIS_PASSWORD: '',
CDP_REDIS_HOST: '',
CDP_REDIS_PORT: 6479,
}
}

5 changes: 4 additions & 1 deletion plugin-server/src/types.ts
Original file line number Diff line number Diff line change
@@ -107,6 +107,9 @@ export type CdpConfig = {
CDP_WATCHER_DISABLED_TEMPORARY_TTL: number // How long a function should be temporarily disabled for
CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: number // How many times a function can be disabled before it is disabled permanently
CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: string
CDP_REDIS_HOST: string
CDP_REDIS_PORT: number
CDP_REDIS_PASSWORD: string
}

export interface PluginsServerConfig extends CdpConfig {
@@ -1235,6 +1238,6 @@ export type AppMetric2Type = {
app_source_id: string
instance_id?: string
metric_kind: 'failure' | 'success' | 'other'
metric_name: 'succeeded' | 'failed' | 'filtered' | 'disabled_temporarily' | 'disabled_permanently'
metric_name: 'succeeded' | 'failed' | 'filtered' | 'disabled_temporarily' | 'disabled_permanently' | 'masked'
count: number
}
19 changes: 19 additions & 0 deletions plugin-server/tests/cdp/examples.ts
Original file line number Diff line number Diff line change
@@ -415,3 +415,22 @@ export const HOG_FILTERS_EXAMPLES: Record<string, Pick<HogFunctionType, 'filters
},
},
}

// Example masking configs for tests:
//  - `all`: a constant hash, so every invocation of the function shares one mask
//  - `person`: hashes on the person's uuid, so each person gets their own mask
export const HOG_MASK_EXAMPLES: Record<string, Pick<HogFunctionType, 'masking'>> = {
all: {
masking: {
ttl: 30,
hash: 'all',
bytecode: ['_h', 32, 'all'],
threshold: null,
},
},
person: {
masking: {
ttl: 30,
hash: '{person.uuid}',
bytecode: ['_h', 32, 'uuid', 32, 'person', 1, 2],
threshold: null,
},
},
}
12 changes: 12 additions & 0 deletions plugin-server/tests/cdp/helpers/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { CdpRedis } from '../../../src/cdp/redis'

/**
 * Test helper that removes every redis key starting with the given prefix.
 *
 * Looks the keys up with a `<prefix>*` pattern and deletes them in a single pipeline.
 */
export async function deleteKeysWithPrefix(redis: CdpRedis, prefix: string) {
    await redis.useClient({ name: 'delete-keys' }, async (client) => {
        const matchingKeys = await client.keys(`${prefix}*`)
        const batch = client.pipeline()

        for (const matchingKey of matchingKeys) {
            batch.del(matchingKey)
        }

        await batch.exec()
    })
}
1 change: 1 addition & 0 deletions plugin-server/tests/cdp/hog-function-manager.test.ts
Original file line number Diff line number Diff line change
@@ -113,6 +113,7 @@ describe('HogFunctionManager', () => {
value: integrations[0].id,
},
},
masking: null,
depends_on_integration_ids: new Set([integrations[0].id]),
},
])
208 changes: 208 additions & 0 deletions plugin-server/tests/cdp/hog-masker.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
jest.mock('../../src/utils/now', () => {
return {
now: jest.fn(() => Date.now()),
}
})
import { BASE_REDIS_KEY, HogMasker } from '../../src/cdp/hog-masker'
import { CdpRedis, createCdpRedisPool } from '../../src/cdp/redis'
import { HogFunctionType } from '../../src/cdp/types'
import { Hub } from '../../src/types'
import { createHub } from '../../src/utils/db/hub'
import { delay } from '../../src/utils/utils'
import { HOG_MASK_EXAMPLES } from './examples'
import { createHogExecutionGlobals, createHogFunction } from './fixtures'
import { deleteKeysWithPrefix } from './helpers/redis'

const mockNow: jest.Mock = require('../../src/utils/now').now as any

describe('HogMasker', () => {
describe('integration', () => {
let now: number
let hub: Hub
let closeHub: () => Promise<void>
let masker: HogMasker
let redis: CdpRedis

beforeEach(async () => {
;[hub, closeHub] = await createHub()

now = 1720000000000
mockNow.mockReturnValue(now)

redis = createCdpRedisPool(hub)
await deleteKeysWithPrefix(redis, BASE_REDIS_KEY)

masker = new HogMasker(redis)
})

const advanceTime = (ms: number) => {
now += ms
mockNow.mockReturnValue(now)
}

const reallyAdvanceTime = async (ms: number) => {
advanceTime(ms)
await delay(ms)
}

afterEach(async () => {
await closeHub()
jest.clearAllMocks()
})

it('should return all functions without masks', async () => {
const normalFunction = createHogFunction({})
const invocations = [{ globals: createHogExecutionGlobals(), hogFunction: normalFunction }]
const res = await masker.filterByMasking(invocations)

expect(res.notMasked).toHaveLength(1)
expect(res.masked).toEqual([])
})

it('should only allow one invocation call when masked for one function', async () => {
const functionWithAllMasking = createHogFunction({
...HOG_MASK_EXAMPLES.all,
})
const globals1 = createHogExecutionGlobals({ event: { uuid: '1' } as any })
const globals2 = createHogExecutionGlobals({ event: { uuid: '2' } as any })
const globals3 = createHogExecutionGlobals({ event: { uuid: '3' } as any })
const invocations = [
{ globals: globals1, hogFunction: functionWithAllMasking },
{ globals: globals2, hogFunction: functionWithAllMasking },
{ globals: globals3, hogFunction: functionWithAllMasking },
]

const res = await masker.filterByMasking(invocations)
expect(res.notMasked).toHaveLength(1)
expect(res.masked).toHaveLength(2)
expect(res.notMasked[0].globals).toEqual(globals1)
expect(res.masked[0].globals).toEqual(globals2)
expect(res.masked[1].globals).toEqual(globals3)

const res2 = await masker.filterByMasking(invocations)
expect(res2.notMasked).toHaveLength(0)
expect(res2.masked).toHaveLength(3)
})

// The same globals are sent to two separately created masking functions plus one
// function without overrides: every function gets its first call, and on the
// repeat only the function without masking is still let through.
it('allow multiple functions for the same globals', async () => {
    const maskedA = createHogFunction({
        ...HOG_MASK_EXAMPLES.all,
    })
    const maskedB = createHogFunction({
        ...HOG_MASK_EXAMPLES.all,
    })
    const unmasked = createHogFunction({})
    const sharedGlobals = createHogExecutionGlobals()
    const invocations = [maskedA, maskedB, unmasked].map((hogFunction) => ({
        globals: sharedGlobals,
        hogFunction,
    }))

    const firstPass = await masker.filterByMasking(invocations)
    expect(firstPass.notMasked).toHaveLength(3)
    expect(firstPass.masked).toHaveLength(0)

    const secondPass = await masker.filterByMasking(invocations)
    expect(secondPass.notMasked).toHaveLength(1)
    expect(secondPass.masked).toHaveLength(2)
    expect(secondPass.notMasked[0].hogFunction).toEqual(unmasked)
    expect(secondPass.masked[0].hogFunction).toEqual(maskedA)
    expect(secondPass.masked[1].hogFunction).toEqual(maskedB)
})

// Time- and count-based masking behaviour: re-allowing after the ttl, masking with a
// custom hash, and periodic re-allowing once a threshold of invocations is reached.
describe('ttl', () => {
let hogFunctionPerson: HogFunctionType
let hogFunctionAll: HogFunctionType

// Rebuild both functions before every test, overriding the example ttl with 1.
// Some tests below mutate these objects (threshold / ttl) for their own scenario.
beforeEach(() => {
hogFunctionPerson = createHogFunction({
masking: {
...HOG_MASK_EXAMPLES.person.masking!,
ttl: 1,
},
})

hogFunctionAll = createHogFunction({
masking: {
...HOG_MASK_EXAMPLES.all.masking!,
ttl: 1,
},
})
})
// NOTE(review): waiting 1000ms for `ttl: 1` suggests the ttl is in seconds - confirm.
it('should re-allow after the ttl expires', async () => {
const invocations = [{ globals: createHogExecutionGlobals(), hogFunction: hogFunctionAll }]
// First call is allowed, repeats are masked...
expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(1)
expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0)
expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0)
// ...until time has advanced, after which exactly one more call is allowed.
await reallyAdvanceTime(1000)
expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(1)
expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0)
})

// Two persons x two functions in a single batch.
// NOTE(review): presumably the person-hash function is allowed once per person while the
// "all" function is allowed only once overall, which would explain 3 allowed / 1 masked -
// confirm against HOG_MASK_EXAMPLES.
it('should mask with custom hog hash', async () => {
const globalsPerson1 = createHogExecutionGlobals({ person: { uuid: '1' } as any })
const globalsPerson2 = createHogExecutionGlobals({ person: { uuid: '2' } as any })

const invocations = [
{ globals: globalsPerson1, hogFunction: hogFunctionPerson },
{ globals: globalsPerson1, hogFunction: hogFunctionAll },
{ globals: globalsPerson2, hogFunction: hogFunctionPerson },
{ globals: globalsPerson2, hogFunction: hogFunctionAll },
]
const res = await masker.filterByMasking(invocations)
expect(res.masked.length).toEqual(1)
expect(res.notMasked.length).toEqual(3)
// Repeating the identical batch: everything is now masked.
const res2 = await masker.filterByMasking(invocations)
expect(res2.masked.length).toEqual(4)
expect(res2.notMasked.length).toEqual(0)
})

// With threshold = 5 the assertions expect a repeating pattern of one allowed
// invocation followed by four masked ones.
it('should mask until threshold passed', async () => {
hogFunctionAll.masking!.threshold = 5

const invocations = [{ globals: createHogExecutionGlobals(), hogFunction: hogFunctionAll }]
// First one goes through
expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(1)

// Next 4 should be masked
expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0)
expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0)
expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0)
expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0)
// Now we have hit the threshold so it should not be masked
expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(1)
// Next 4 should be masked
expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0)
expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0)
expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0)
expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0)
// Again the Nth one shouldn't be masked
expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(1)
})

// Same threshold logic, but with the invocations arriving together in one call.
it('should mask threshold based in a batch', async () => {
hogFunctionAll.masking!.threshold = 5
// NOTE(review): ttl is raised to 10 here, presumably so the mask cannot expire mid-test - confirm.
hogFunctionAll.masking!.ttl = 10

// If we have 10 invocations in a batch then we should have 2 invocations that are not masked
// (Array.fill places the same invocation object in all 10 slots.)
expect(
(
await masker.filterByMasking(
Array(10).fill({ globals: createHogExecutionGlobals(), hogFunction: hogFunctionAll })
)
).notMasked
).toHaveLength(2)

// Next one should cross the threshold
expect(
(
await masker.filterByMasking([
{ globals: createHogExecutionGlobals(), hogFunction: hogFunctionAll },
])
).notMasked
).toHaveLength(1)
})
})
})
})
19 changes: 11 additions & 8 deletions plugin-server/tests/cdp/hog-watcher.test.ts
Original file line number Diff line number Diff line change
@@ -4,11 +4,12 @@ jest.mock('../../src/utils/now', () => {
}
})
import { BASE_REDIS_KEY, HogWatcher, HogWatcherState } from '../../src/cdp/hog-watcher'
import { CdpRedis, createCdpRedisPool } from '../../src/cdp/redis'
import { HogFunctionInvocationResult } from '../../src/cdp/types'
import { Hub } from '../../src/types'
import { createHub } from '../../src/utils/db/hub'
import { delay } from '../../src/utils/utils'
import { deleteKeysWithPrefix } from '../helpers/redis'
import { deleteKeysWithPrefix } from './helpers/redis'

const mockNow: jest.Mock = require('../../src/utils/now').now as any

@@ -44,6 +45,7 @@ describe('HogWatcher', () => {
let closeHub: () => Promise<void>
let watcher: HogWatcher
let mockStateChangeCallback: jest.Mock
let redis: CdpRedis

beforeEach(async () => {
;[hub, closeHub] = await createHub()
@@ -52,16 +54,22 @@ describe('HogWatcher', () => {
mockNow.mockReturnValue(now)
mockStateChangeCallback = jest.fn()

await deleteKeysWithPrefix(hub.redisPool, BASE_REDIS_KEY)
redis = createCdpRedisPool(hub)
await deleteKeysWithPrefix(redis, BASE_REDIS_KEY)

watcher = new HogWatcher(hub, mockStateChangeCallback)
watcher = new HogWatcher(hub, redis, mockStateChangeCallback)
})

// Moves the mocked clock forward by `ms`: updates the shared `now` value and makes
// the mocked `now()` return it. No real time passes.
const advanceTime = (ms: number) => {
    const advanced = now + ms
    now = advanced
    mockNow.mockReturnValue(advanced)
}

// Advances the mocked clock by `ms` and then awaits `delay(ms)`
// (presumably a real-time wait of the same length - see utils).
const reallyAdvanceTime = async (ms: number): Promise<void> => {
    const waitMs = ms
    advanceTime(waitMs)
    await delay(waitMs)
}

afterEach(async () => {
jest.useRealTimers()
await closeHub()
@@ -253,11 +261,6 @@ describe('HogWatcher', () => {
hub.CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT = 3
})

// Advances the mocked clock by `ms` and then awaits `delay(ms)`
// (presumably a real-time wait of the same length - see utils).
const reallyAdvanceTime = async (ms: number): Promise<void> => {
    const waitMs = ms
    advanceTime(waitMs)
    await delay(waitMs)
}

it('count the number of times it has been disabled', async () => {
// Trigger the temporary disabled state 3 times
for (let i = 0; i < 2; i++) {
20 changes: 18 additions & 2 deletions posthog/api/hog_function.py
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@

from posthog.cdp.services.icons import CDPIconsService
from posthog.cdp.templates import HOG_FUNCTION_TEMPLATES_BY_ID
from posthog.cdp.validation import compile_hog, validate_inputs, validate_inputs_schema
from posthog.cdp.validation import compile_hog, generate_template_bytecode, validate_inputs, validate_inputs_schema
from posthog.constants import AvailableFeature
from posthog.models.hog_functions.hog_function import HogFunction, HogFunctionState
from posthog.permissions import PostHogFeatureFlagPermission
@@ -56,9 +56,24 @@ class Meta:
read_only_fields = fields


# Validates the `masking` config of a hog function: a ttl, an optional threshold,
# the hash template, and the bytecode generated from that template.
class HogFunctionMaskingSerializer(serializers.Serializer):
# ttl is bounded to 60 .. 60 * 60 * 24 (the upper bound is the "24 hours" noted below, i.e. seconds).
ttl = serializers.IntegerField(
required=True, min_value=60, max_value=60 * 60 * 24
) # NOTE: 24 hours max for now - we might increase this later
# Optional and nullable; no bounds are enforced here.
threshold = serializers.IntegerField(required=False, allow_null=True)
hash = serializers.CharField(required=True)
# Accepted on input, but always overwritten in validate() below.
bytecode = serializers.JSONField(required=False, allow_null=True)

def validate(self, attrs):
# Always regenerate the bytecode from the submitted hash, replacing any client-supplied value.
attrs["bytecode"] = generate_template_bytecode(attrs["hash"])

return super().validate(attrs)


class HogFunctionSerializer(HogFunctionMinimalSerializer):
template = HogFunctionTemplateSerializer(read_only=True)
status = HogFunctionStatusSerializer(read_only=True, required=False, allow_null=True)
masking = HogFunctionMaskingSerializer(required=False, allow_null=True)

class Meta:
model = HogFunction
@@ -76,6 +91,7 @@ class Meta:
"inputs_schema",
"inputs",
"filters",
"masking",
"icon_url",
"template",
"template_id",
@@ -156,7 +172,7 @@ def validate(self, attrs):
if "hog" in attrs:
attrs["bytecode"] = compile_hog(attrs["hog"])

return attrs
return super().validate(attrs)

def to_representation(self, data):
data = super().to_representation(data)
4 changes: 4 additions & 0 deletions posthog/api/test/__snapshots__/test_decide.ambr
Original file line number Diff line number Diff line change
@@ -264,6 +264,7 @@
"posthog_hogfunction"."inputs_schema",
"posthog_hogfunction"."inputs",
"posthog_hogfunction"."filters",
"posthog_hogfunction"."masking",
"posthog_hogfunction"."template_id",
"posthog_team"."id",
"posthog_team"."uuid",
@@ -384,6 +385,7 @@
"posthog_hogfunction"."inputs_schema",
"posthog_hogfunction"."inputs",
"posthog_hogfunction"."filters",
"posthog_hogfunction"."masking",
"posthog_hogfunction"."template_id",
"posthog_team"."id",
"posthog_team"."uuid",
@@ -630,6 +632,7 @@
"posthog_hogfunction"."inputs_schema",
"posthog_hogfunction"."inputs",
"posthog_hogfunction"."filters",
"posthog_hogfunction"."masking",
"posthog_hogfunction"."template_id",
"posthog_team"."id",
"posthog_team"."uuid",
@@ -995,6 +998,7 @@
"posthog_hogfunction"."inputs_schema",
"posthog_hogfunction"."inputs",
"posthog_hogfunction"."filters",
"posthog_hogfunction"."masking",
"posthog_hogfunction"."template_id",
"posthog_team"."id",
"posthog_team"."uuid",
21 changes: 21 additions & 0 deletions posthog/api/test/test_hog_function.py
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@
from typing import Any, Optional
from unittest.mock import ANY, patch

from inline_snapshot import snapshot
from rest_framework import status

from posthog.constants import AvailableFeature
@@ -167,6 +168,7 @@ def test_create_hog_function(self, *args):
"filters": {"bytecode": ["_h", 29]},
"icon_url": None,
"template": None,
"masking": None,
"status": {"rating": 0, "state": 0, "tokens": 0},
}

@@ -480,6 +482,25 @@ def test_generates_filters_bytecode(self, *args):
],
}

@patch("posthog.permissions.posthoganalytics.feature_enabled", return_value=True)
def test_saves_masking_config(self, *args):
    """Creating a function with a masking config echoes the config back with generated bytecode."""
    masking_config = {"ttl": 60, "threshold": 20, "hash": "{person.properties.email}"}
    payload = {**EXAMPLE_FULL, "masking": masking_config}

    response = self.client.post(f"/api/projects/{self.team.id}/hog_functions/", data=payload)

    assert response.status_code == status.HTTP_201_CREATED, response.json()
    saved_masking = response.json()["masking"]
    assert saved_masking == snapshot(
        {
            "ttl": 60,
            "threshold": 20,
            "hash": "{person.properties.email}",
            "bytecode": ["_h", 32, "email", 32, "properties", 32, "person", 1, 3],
        }
    )

@patch("posthog.permissions.posthoganalytics.feature_enabled", return_value=True)
def test_loads_status_when_enabled_and_available(self, *args):
with patch("posthog.plugins.plugin_server_api.requests.get") as mock_get:
17 changes: 17 additions & 0 deletions posthog/migrations/0456_hogfunction_masking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Generated by Django 4.2.14 on 2024-08-09 09:44

from django.db import migrations, models


# Adds the `masking` field to the `hogfunction` model as a JSONField (null=True, blank=True).
class Migration(migrations.Migration):
dependencies = [
("posthog", "0455_alter_externaldatasource_source_type"),
]

operations = [
migrations.AddField(
model_name="hogfunction",
name="masking",
field=models.JSONField(blank=True, null=True),
),
]
1 change: 1 addition & 0 deletions posthog/models/hog_functions/hog_function.py
Original file line number Diff line number Diff line change
@@ -46,6 +46,7 @@ class HogFunction(UUIDModel):
inputs_schema: models.JSONField = models.JSONField(null=True)
inputs: models.JSONField = models.JSONField(null=True)
filters: models.JSONField = models.JSONField(null=True, blank=True)
masking: models.JSONField = models.JSONField(null=True, blank=True)
template_id: models.CharField = models.CharField(max_length=400, null=True, blank=True)

@property

0 comments on commit 524ebff

Please sign in to comment.