Securing Your Pulsar Cluster with Vault - Chris Kellogg


1. Securing Your Pulsar Cluster with Vault
@cckellogg #PulsarSummit

2. Chris Kellogg
• Software Engineer at Splunk
• Apache Pulsar contributor and Apache Heron committer
• You can find me at: cckellogg

3. Agenda
• Vault Overview
• Why Pulsar and Vault
• Pulsar Authentication/Authorization Model
• Creating Custom Plugins
• Packaging Custom Plugins
• Kubernetes Integration
• Demo

4. What is Vault?
"Vault is a tool for securely accessing secrets. A secret is anything that you want to tightly control access to, such as API keys, passwords, or certificates. Vault provides a unified interface to any secret, while providing tight access control and recording a detailed audit log."

5. Vault Features
• Secret Management
• Authentication and Identity
• Data Encryption

6. Why Vault
• Single source to manage secrets and tokens
• Dynamic and revocable tokens and secrets
• Audit tracking for secrets and tokens
• Merges identities across providers - LDAP, Okta, Kubernetes, AWS, GCP
• Cloud friendly

7. Why Pulsar and Vault
• No more forever tokens
• Revocable tokens
• Secure secret management for functions and connectors
• Supports authenticating against many trusted sources of identity - LDAP, Okta, Kubernetes, AWS, GCP, GitHub
• Central location for all security

8. Pulsar Security

9. Default is No Security
• Produce and consume from any topic
• Modify any tenant, namespace, topic or function
• Function/Connector secrets stored as plain text in configs
• No auditing of actions

10. Pulsar Security Features
• TLS encryption for traffic
• Authentication - validate identity
• Authorization - can the user perform an action
• Data encryption between producers and consumers

11. Pulsar Authentication
• Responsible for determining identity of clients
• Plugin System
• Built-in Plugins
  - TLS
  - JWT
  - Athenz
  - Kerberos

12. Pulsar Authorization
• Determines if a client has permission to perform an action
• Plugin System
• Built-in Plugin
  - Role-based system backed by ZooKeeper
  - SuperUsers
  - Tenant Admins
  - Actions: produce/consume/functions
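The role-based model on this slide can be sketched roughly as follows. This is not Pulsar's built-in provider: the class `RoleAuthorizer`, its in-memory maps (standing in for the ZooKeeper-backed policies), and the rule that tenant admins pass every check are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified role check; not Pulsar's actual authorization provider. */
class RoleAuthorizer {
    enum Action { PRODUCE, CONSUME, FUNCTIONS }

    private final Set<String> superUsers;
    private final Map<String, Set<String>> tenantAdmins;          // tenant -> admin roles
    private final Map<String, Map<String, Set<Action>>> grants;   // namespace -> role -> actions

    RoleAuthorizer(Set<String> superUsers,
                   Map<String, Set<String>> tenantAdmins,
                   Map<String, Map<String, Set<Action>>> grants) {
        this.superUsers = superUsers;
        this.tenantAdmins = tenantAdmins;
        this.grants = grants;
    }

    /** Arguments other than role are expected to be non-null. */
    boolean canPerform(String role, String tenant, String namespace, Action action) {
        if (role == null) {
            return false;                       // unauthenticated clients get nothing
        }
        if (superUsers.contains(role)) {
            return true;                        // superusers may do anything
        }
        Set<String> admins = tenantAdmins.get(tenant);
        if (admins != null && admins.contains(role)) {
            return true;                        // assumption: tenant admins pass all checks
        }
        Map<String, Set<Action>> byRole = grants.get(namespace);
        if (byRole == null) {
            return false;
        }
        Set<Action> allowed = byRole.get(role);
        return allowed != null && allowed.contains(action);
    }
}
```

A role that is neither a superuser nor a tenant admin is allowed only the actions explicitly granted to it on the namespace.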

13. Developing Auth Plugin

14. Building Plugins Best Practices
• Minimize third-party dependencies
• Use your own executor and threads for remote requests
• Cache responses
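The last two practices can be combined in one small helper. The sketch below is an illustration only, using just the JDK: `CachingLookup` is a hypothetical class, the `remoteCall` function stands in for whatever remote request the plugin makes (for example a Vault lookup), and the TTL-based expiry is an assumed policy.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical helper: caches remote results and runs remote calls on its own threads. */
class CachingLookup<K, V> implements AutoCloseable {
    private static final class Entry<V> {
        final V value;
        final long expiresAtNanos;
        Entry(V value, long expiresAtNanos) {
            this.value = value;
            this.expiresAtNanos = expiresAtNanos;
        }
    }

    private final ConcurrentHashMap<K, Entry<V>> cache = new ConcurrentHashMap<>();
    private final ExecutorService executor;
    private final Function<K, V> remoteCall;
    private final long ttlNanos;

    CachingLookup(Function<K, V> remoteCall, long ttlMillis, int threads) {
        this.remoteCall = remoteCall;
        this.ttlNanos = TimeUnit.MILLISECONDS.toNanos(ttlMillis);
        // Dedicated daemon threads, so slow remote calls never occupy the host's I/O threads.
        this.executor = Executors.newFixedThreadPool(threads, r -> {
            Thread t = new Thread(r, "plugin-remote-lookup");
            t.setDaemon(true);
            return t;
        });
    }

    /** Returns a cached value if it is still fresh, otherwise fetches it asynchronously. */
    CompletableFuture<V> get(K key) {
        Entry<V> cached = cache.get(key);
        if (cached != null && System.nanoTime() - cached.expiresAtNanos < 0) {
            return CompletableFuture.completedFuture(cached.value);
        }
        // Concurrent misses for the same key may each trigger a remote call; fine for a sketch.
        return CompletableFuture.supplyAsync(() -> {
            V value = remoteCall.apply(key);
            cache.put(key, new Entry<>(value, System.nanoTime() + ttlNanos));
            return value;
        }, executor);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

A short TTL keeps revoked tokens from being honored for long, at the cost of more remote calls; the right value depends on the deployment.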

15. Vault Authentication Plugin
    public class VaultAuthenticationProvider implements AuthenticationProvider {
        @Override
        public void initialize(ServiceConfiguration config) throws IOException {
        }
        @Override
        public String getAuthMethodName() {
            return "token";
        }
        @Override
        public boolean authenticateHttpRequest(HttpServletRequest req, HttpServletResponse resp) throws Exception {
            throw new AuthenticationException("Not supported");
        }
        @Override
        public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
            // Implement code to authenticate the client with Vault
        }
        @Override
        public AuthenticationState newAuthState(AuthData authData, SocketAddress remoteAddress, SSLSession sslSession)
                throws AuthenticationException {
            // Implement code to authenticate the client with Vault.
            // Used in binary connections for challenges.
        }
    }

16. Configuring Auth Plugin
broker.conf
    ### --- Authentication --- ###
    # Enable authentication
    authenticationEnabled=true
    # Authentication provider name list, which is a comma-separated list of class names
    authenticationProviders=org.apache.pulsar.vault.authentication.VaultAuthenticationProvider
    # Interval of time for checking for expired authentication credentials
    authenticationRefreshCheckSeconds=60

17. Pulsar Vault Authentication
[Diagram: Client, Broker, Authentication Service, Vault Authentication Provider]
1. Client request with Vault token
2. Authenticate client
3. Token passed to Vault for authentication
4. Vault token info returned
5. Return user identity
6. Return result to client
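Step 3 of the flow above could be sketched as a request to Vault's token `lookup-self` HTTP endpoint, which takes the token in the `X-Vault-Token` header. Whether the plugin from the talk uses this endpoint is an assumption; the class `VaultTokenLookup`, the method name, and the 5-second timeout are hypothetical. The sketch only builds the request and leaves sending it and parsing the response to the caller.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

/** Hypothetical helper that builds the token lookup request; it does not send it. */
class VaultTokenLookup {

    /**
     * Builds a GET request asking Vault to describe the given client token.
     * vaultAddress is e.g. "http://localhost:8200" (the address used elsewhere in these slides).
     */
    static HttpRequest lookupSelfRequest(String vaultAddress, String clientToken) {
        if (clientToken == null || clientToken.isBlank()) {
            throw new IllegalArgumentException("missing Vault token");
        }
        // Tolerate a trailing slash in the configured address.
        String base = vaultAddress.endsWith("/")
                ? vaultAddress.substring(0, vaultAddress.length() - 1)
                : vaultAddress;
        return HttpRequest.newBuilder(URI.create(base + "/v1/auth/token/lookup-self"))
                .header("X-Vault-Token", clientToken)
                .timeout(Duration.ofSeconds(5))   // assumed timeout; tune for your deployment
                .GET()
                .build();
    }
}
```

The request would be sent with `java.net.http.HttpClient`, ideally from the plugin's own executor, and a non-200 response treated as an authentication failure.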

18. Developing Function Plugins

19. Pulsar Secret Plugins
Secrets Provider
• Runs in the instance
• Provides secrets through the function context API
Secrets Configurator
• Runs on the server (Broker or Function Worker)
• Determines the Secrets Provider the instance should use

20. SecretsProvider - Client Side Plugin
    public interface SecretsProvider {
        // Initialize the SecretsProvider.
        default void init(Map<String, String> config) {}
        // Fetches a secret
        String provideSecret(String secretName, Object pathToSecret);
    }
Example code
    public class MySecretFunction implements Function<String, Void> {
        @Override
        public Void process(String input, Context context) throws Exception {
            final String password = context.getSecret("password");
            context.getLogger().info("read secret password=" + password);
            return null;
        }
    }
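To show how `context.getSecret("password")` might reach `provideSecret`, here is a minimal sketch. The `SecretsProvider` interface is copied locally from the slide so the example compiles on its own; `InMemorySecretsProvider` and `DemoContext` are hypothetical stand-ins, and the assumption is that the context looks up the function's configured path for the secret name and hands both to the provider.

```java
import java.util.HashMap;
import java.util.Map;

// Local copy of the interface from the slide, so the sketch compiles without Pulsar.
interface SecretsProvider {
    default void init(Map<String, String> config) {}
    String provideSecret(String secretName, Object pathToSecret);
}

/** Hypothetical provider backed by an in-memory map; a real one would call Vault. */
class InMemorySecretsProvider implements SecretsProvider {
    private final Map<String, String> store = new HashMap<>();

    @Override
    public void init(Map<String, String> config) {
        store.putAll(config);   // assumption for the demo: config holds path -> secret value
    }

    @Override
    public String provideSecret(String secretName, Object pathToSecret) {
        return pathToSecret == null ? null : store.get(pathToSecret.toString());
    }
}

/** Hypothetical stand-in for the function context. */
class DemoContext {
    private final Map<String, Object> secretsMap;   // secret name -> provider-specific path
    private final SecretsProvider provider;

    DemoContext(Map<String, Object> secretsMap, SecretsProvider provider) {
        this.secretsMap = secretsMap;
        this.provider = provider;
    }

    /** Resolves the configured path for the name, then delegates to the provider. */
    String getSecret(String secretName) {
        if (!secretsMap.containsKey(secretName)) {
            return null;
        }
        return provider.provideSecret(secretName, secretsMap.get(secretName));
    }
}
```

The function code only ever sees the secret name; where the value lives is decided by the function's secrets configuration and the provider.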

21. SecretsProviderConfigurator - Server Side Plugin
    public interface SecretsProviderConfigurator {
        default void init(Map<String, String> config) {}
        void configureKubernetesRuntimeSecretsProvider(V1PodSpec ps, String container, Function.FunctionDetails details);
        void configureProcessRuntimeSecretsProvider(ProcessBuilder pb, Function.FunctionDetails details);
        Type getSecretObjectType();
        default void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String ns, Function.FunctionDetails details) {}
        String getSecretsProviderClassName(Function.FunctionDetails details);
        Map<String, String> getSecretsProviderConfig(Function.FunctionDetails details);
    }
Highlighted methods are used to set up secrets plugins on the instances

22. Configuring Secret Plugins - Secrets Configurator
functions_worker.yml
    ########################
    # Secrets
    ########################
    secretsProviderConfiguratorClassName: org.apache.pulsar.vault.secrets.VaultSecretsProviderConfigurator
    secretsProviderConfiguratorConfig:
      vaultAddress: http://localhost:8200
      tokenPath: /etc/auth/token

23. Configuring Secret Plugins - Secrets Provider
    public class VaultSecretsProviderConfigurator implements SecretsProviderConfigurator {
        @Override
        public String getSecretsProviderClassName(Function.FunctionDetails details) {
            // Only configure a provider when the function declares secrets
            if (!isEmpty(details.getSecretsMap())) {
                if (Function.FunctionDetails.Runtime.JAVA == details.getRuntime()) {
                    return "org.apache.pulsar.vault.secrets.VaultSecretsProvider";
                } else if (Function.FunctionDetails.Runtime.PYTHON == details.getRuntime()) {
                    return "python_secret_provider";
                }
            }
            return null;
        }
        @Override
        public Map<String, String> getSecretsProviderConfig(Function.FunctionDetails details) {
            final Map<String, String> secrets = new HashMap<>();
            secrets.put("vaultAddress", "http://localhost:8200");
            secrets.put("tokenPath", "/var/auth/token");
            return secrets;
        }
    }

24. Vault Secret Provider
[Diagram: User Code and Vault Secret Provider inside a Java Function Instance]
    final String password = context.getSecret("password");
1. Request secret from code
2. Secret request with token
3. Secret returned to plugin
4. Return secret value

25. Pulsar Kubernetes Plugins
Kubernetes Manifest Customizer
• Runs on the server (Broker or Function Worker)
• Enables customization of the K8s function specs
Kubernetes Function Auth Provider
• Runs on the server (Broker or Function Worker)
• Determines the auth params passed to the instances

26. KubernetesManifestCustomizer - Server Side Plugin
    public interface KubernetesManifestCustomizer extends RuntimeCustomizer {
        default V1StatefulSet customizeStatefulSet(Function.FunctionDetails funcDetails, V1StatefulSet statefulSet) {
            return statefulSet;
        }
        default V1Service customizeService(Function.FunctionDetails funcDetails, V1Service service) {
            return service;
        }
        default String customizeNamespace(Function.FunctionDetails funcDetails, String currentNamespace) {
            return currentNamespace;
        }
    }

27. KubernetesFunctionAuthProvider - Server Side Plugin
    public interface KubernetesFunctionAuthProvider extends FunctionAuthProvider {
        default void configureAuthDataStatefulSet(V1StatefulSet sts, Optional<FunctionAuthData> o) {}
        // Configures the client auth for the function instances
        void configureAuthenticationConfig(AuthenticationConfig config, Optional<FunctionAuthData> o);
        // The returned Optional<FunctionAuthData> is used in configureAuthenticationConfig
        Optional<FunctionAuthData> cacheAuthData(Function.FunctionDetails details, AuthenticationDataSource s)
                throws Exception;
        // The returned Optional<FunctionAuthData> is used in configureAuthenticationConfig
        Optional<FunctionAuthData> updateAuthData(Function.FunctionDetails details, Optional<FunctionAuthData> o,
                AuthenticationDataSource s) throws Exception;
        default void cleanUpAuthData(Function.FunctionDetails details, Optional<FunctionAuthData> o) throws Exception {}
    }

28. Vault Secret Provider with Vault Agent
[Diagram: User Code and Vault Secret Provider inside a Java Function Instance]
    final String password = context.getSecret("password");
1. Request secret from code
2. Read secret from file
3. Return secret value
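With Vault Agent writing secrets to disk, step 2 reduces to a file read. The sketch below is one possible shape, not the plugin from the talk: `FileSecretsProvider`, the `secretsDir` config key, its `/vault/secrets` default, and the one-file-per-secret-name layout are all assumptions. The `SecretsProvider` interface is copied locally from slide 20 so the example compiles on its own.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

// Local copy of the interface from slide 20, so the sketch compiles without Pulsar.
interface SecretsProvider {
    default void init(Map<String, String> config) {}
    String provideSecret(String secretName, Object pathToSecret);
}

/** Hypothetical provider that reads secrets rendered to files by Vault Agent. */
class FileSecretsProvider implements SecretsProvider {
    private Path secretsDir;

    @Override
    public void init(Map<String, String> config) {
        // "secretsDir" and its default are assumptions, not part of the talk.
        String dir = config.getOrDefault("secretsDir", "/vault/secrets");
        secretsDir = Paths.get(dir).toAbsolutePath().normalize();
    }

    @Override
    public String provideSecret(String secretName, Object pathToSecret) {
        // Assumed layout: one file per secret, named after the secret.
        Path file = secretsDir.resolve(secretName).normalize();
        if (!file.startsWith(secretsDir)) {
            throw new IllegalArgumentException("secret name escapes secrets directory: " + secretName);
        }
        try {
            // Trim the trailing newline that templated files usually end with.
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        } catch (IOException e) {
            throw new UncheckedIOException("cannot read secret " + secretName, e);
        }
    }
}
```

In this variant the function instance never holds a Vault token; token handling and renewal stay with the agent.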

29. Packaging Plugins
Where do my plugins go?
    pulsar/
        lib/
            authentication.jar
            secret-configurator.jar
            kubernetes-plugins.jar
        instances/
            deps/
                secret-provider.jar
