Drools stream integration
Published: 2019-06-27


This post describes how to integrate a customer-provided Drools package into a data stream application.

Packaging:

If the customer provides a Maven project, make sure its pom file contains the following:

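    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>org.drools</groupId>
          <artifactId>drools-bom</artifactId>
          <type>pom</type>
          <version>xxx</version>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>

    <dependencies>
      <dependency>
        <groupId>org.kie</groupId>
        <artifactId>kie-api</artifactId>
      </dependency>
      <dependency>
        <groupId>org.drools</groupId>
        <artifactId>drools-compiler</artifactId>
        <scope>runtime</scope>
      </dependency>
      <!-- other dependencies -->
    </dependencies>

    <build>
      <plugins>
        <plugin>
          <groupId>org.kie</groupId>
          <artifactId>kie-maven-plugin</artifactId>
          <version>xxx</version>
          <extensions>true</extensions>
        </plugin>
      </plugins>
    </build>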

In addition, a kmodule.xml file must be added to the src\main\resources\META-INF folder. At a minimum, it must declare a default stateless ksession, which is mandatory.
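A minimal kmodule.xml that satisfies this requirement might look like the sketch below. The kbase and ksession names are placeholders chosen for illustration, not names taken from the project, and the namespace shown is the one used by Drools 7.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<kmodule xmlns="http://www.drools.org/xsd/kmodule">
  <!-- "rules" and "defaultStatelessSession" are hypothetical names -->
  <kbase name="rules" default="true">
    <!-- default="true" lets newStatelessKieSession() be called without a session name -->
    <ksession name="defaultStatelessSession" type="stateless" default="true"/>
  </kbase>
</kmodule>
```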

Rule files can be placed under main/resources as usual.

The command to build the jar is still mvn package. The resulting jar, however, is laid out a bit differently:

[Screenshot: contents of the generated jar]

Note that META-INF contains a knowledge base cache file and the kmodule file, and that the two rule files from main/resources have been moved to the root of the jar.

What if the customer does not provide a Maven project? The best strategy is probably to create one ourselves. If source code is provided, we import it into that project; otherwise, the customer-provided jar could be added as a Maven dependency.

Note that KIE modules were only introduced in Drools 6, so this approach will probably not work for Drools 5 and below. Also, the Drools integration in streamtau uses the latest version, 7.2.1, so whether earlier 6.x versions are fully compatible remains an open question.

Invocation:

Load rules:
First create a KieServices singleton instance.
private final KieServices kieServices = KieServices.Factory.get();

Load the drools package into system:

    protected DroolsDataHolder doLoadDroolsModule(DroolsLoadParam droolsLoadParam) {
        DroolsParameters origParams = droolsLoadParam.getDroolsParam();
        String moduleName = origParams.getModuleName();
        try {
            // Fetch the packaged module and build a container from it.
            InputStream is = droolsDataLoader.getDroolsModuleAsStream(droolsLoadParam);
            KieContainer curContainer = DroolsUtils.buildContainer(kieServices, is);
            return new DroolsDataHolder(curContainer);
        }
        catch (Exception ex) {
            logger.error("Error loading drools " + moduleName, ex);
        }
        // Loading failed; callers must handle a null holder.
        return null;
    }

DroolsDataLoader is an interface designed to load a Drools package as a stream (from either the file system or a RESTful interface).

DroolsUtils is the utility class that builds a KieContainer from a stream.

    public static KieContainer buildContainer(KieServices kieServices, InputStream stream) throws Exception {
        // Wrap the raw stream so the KIE repository can read it as a module.
        Resource wrapped = kieServices.getResources().newInputStreamResource(stream);
        KieModule curModule = kieServices.getRepository().addKieModule(wrapped);
        ReleaseId releaseId = curModule.getReleaseId();
        logger.info("Release id generated for module: {}", releaseId);
        KieContainer kContainer = kieServices.newKieContainer(releaseId, DroolsUtils.class.getClassLoader());
        return kContainer;
    }

The returned DroolsDataHolder is merely a wrapper around KieContainer:

    public class DroolsDataHolder {
        private final KieContainer kieContainer;

        public DroolsDataHolder(KieContainer kieContainer) {
            this.kieContainer = kieContainer;
        }

        public KieContainer getKieContainer() {
            return kieContainer;
        }

        public void destroy() {
            kieContainer.dispose();
        }
    }

The loaded DroolsDataHolder is cached until the rules change, which triggers a reload:

    public DroolsDataHolder getOrLoadDroolsModule(DroolsLoadParam droolsLoadParam) {
        DroolsParameters origParams = droolsLoadParam.getDroolsParam();
        String moduleName = origParams.getModuleName();
        dataLock.readLock().lock();
        try {
            // Fast path: the module is already cached.
            DroolsDataHolder curHolder = containers.get(moduleName);
            if (curHolder != null) {
                return curHolder;
            }
            // Cache miss: a read lock cannot be upgraded, so release it before taking the write lock.
            dataLock.readLock().unlock();
            dataLock.writeLock().lock();
            try {
                return doUpdateDroolsModule(droolsLoadParam);
            }
            finally {
                // Downgrade: re-acquire the read lock before releasing the write lock,
                // so the outer finally always has a read lock to release.
                dataLock.readLock().lock();
                dataLock.writeLock().unlock();
            }
        }
        finally {
            dataLock.readLock().unlock();
        }
    }
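The locking pattern above can be tried in isolation. The sketch below is a simplified, hypothetical stand-in: ModuleCache, its loader function, and invalidate are illustrative names, not part of the streamtau code. It releases the read lock before taking the write lock instead of downgrading, and it re-checks the cache under the write lock so that two threads that miss at the same time load the module only once.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

/** Hypothetical cache; V plays the role of DroolsDataHolder. */
final class ModuleCache<V> {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, V> entries = new HashMap<>();
    private final Function<String, V> loader;

    ModuleCache(Function<String, V> loader) {
        this.loader = loader;
    }

    /** Returns the cached value, loading it at most once per name. */
    V getOrLoad(String name) {
        lock.readLock().lock();
        try {
            V cached = entries.get(name);
            if (cached != null) {
                return cached;
            }
        } finally {
            lock.readLock().unlock();
        }
        lock.writeLock().lock();
        try {
            // Re-check: another thread may have loaded the entry while we waited.
            V cached = entries.get(name);
            if (cached == null) {
                cached = loader.apply(name);
                if (cached != null) {
                    entries.put(name, cached);
                }
            }
            return cached;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Drops an entry (for example after a rule change) and returns it so the caller can dispose of it. */
    V invalidate(String name) {
        lock.writeLock().lock();
        try {
            return entries.remove(name);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

A caller would construct the cache with a loader that wraps the module loading step, call getOrLoad on every evaluation, and call invalidate when the rules change.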

Invoke the drools module:

In a stream environment, only stateless Drools knowledge sessions are supported for now. The main reason is that the stream is executed in a distributed environment: sessions are created on multiple JVMs, so it is hard to share all facts globally. Evaluating a rule is quite simple and consists of 3 steps:

  1. convert stream data to rule input pojo

    public Class<?> getRulePojoClass(DroolsLoadParam droolsLoadParam, String inputPojoClassName) {
        DroolsParameters origParams = droolsLoadParam.getDroolsParam();
        String moduleName = origParams.getModuleName();
        DroolsDataHolder curDataHolder = this.getOrLoadDroolsModule(droolsLoadParam);
        if (curDataHolder == null) {
            throw new IllegalArgumentException("No drools module found by name: " + moduleName);
        }
        try {
            // Resolve the class through the container's own class loader.
            ClassLoader cl = curDataHolder.getKieContainer().getClassLoader();
            Class<?> inputPojoClass = cl.loadClass(inputPojoClassName);
            return inputPojoClass;
        }
        catch (Exception e) {
            throw RtException.from(e);
        }
    }

A nice property of a Drools module is that it provides a self-contained class loading environment, so third-party jar dependencies are unlikely to conflict with the outside runtime environment. The flip side is that when we build an input event for the Drools engine, we must use the KieContainer's class loader to find the input event class referenced in the rules.

  2. build a stateless kie session and invoke the rule

    public List evaluate(DroolsLoadParam droolsLoadParam, List facts) {
        if (logger.isDebugEnabled()) {
            logger.debug("Start evaluating drools, input is: {}, module name is: {}", facts,
                    droolsLoadParam.getDroolsParam().getModuleName());
        }
        DroolsParameters origParams = droolsLoadParam.getDroolsParam();
        String moduleName = origParams.getModuleName();
        DroolsDataHolder curDataHolder = this.getOrLoadDroolsModule(droolsLoadParam);
        if (curDataHolder == null) {
            throw new IllegalArgumentException("No drools module found by name: " + moduleName);
        }
        // A stateless session is created per call; the rules modify the facts in place.
        StatelessKieSession curSession = curDataHolder.getKieContainer().newStatelessKieSession();
        curSession.execute(facts);
        return facts;
    }

  3. convert rule evaluation result back to stream data
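Step 1 can be illustrated without any Drools dependency. The sketch below is an assumption-laden illustration, not the streamtau implementation: PojoBinder and OrderFact are hypothetical names, and a plain Map stands in for a stream record. It loads the fact class by name through a given class loader (in the real integration, the one returned by the KieContainer) and fills it through its public setters.

```java
import java.lang.reflect.Method;
import java.util.Map;

/** Hypothetical helper: turns a stream record (a Map here) into a rule input pojo. */
final class PojoBinder {
    private PojoBinder() {
    }

    /** Loads className through the given loader and populates it via matching setters. */
    static Object bind(ClassLoader loader, String className, Map<String, Object> fields) {
        try {
            Class<?> type = loader.loadClass(className);
            Object target = type.getDeclaredConstructor().newInstance();
            for (Map.Entry<String, Object> field : fields.entrySet()) {
                String key = field.getKey();
                if (key.isEmpty()) {
                    continue; // no setter can match an empty field name
                }
                String setter = "set" + Character.toUpperCase(key.charAt(0)) + key.substring(1);
                for (Method m : type.getMethods()) {
                    if (m.getName().equals(setter) && m.getParameterCount() == 1) {
                        m.invoke(target, field.getValue());
                        break;
                    }
                }
            }
            return target;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot build rule input " + className, e);
        }
    }
}

/** Hypothetical fact class, standing in for a class packaged inside the rule jar. */
class OrderFact {
    private String id;
    private Integer amount;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Integer getAmount() { return amount; }
    public void setAmount(Integer amount) { this.amount = amount; }
}
```

Fields without a matching setter are silently skipped here; a stricter mapping could reject them instead.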

Under the hood:

[Figure: Drools class relations]

[Figure: Drools package loading]

Things to note:

Drools packages can be large, and the current approach caches every loaded package in memory, so loading time and memory consumption may become a scalability bottleneck. A better approach would be to build a standalone rule server that manages the rules and exposes a REST API to the stream application.

Finding input metadata for a rule: it is possible to find out the Java class of each rule variable. This is useful as a hint for mapping stream data to rule input.

    public static Map<String, Class<?>> getRuleInputMeta(KieBase kieBase, String rulePkgName, String ruleName) {
        RuleImpl r = (RuleImpl)kieBase.getRule(rulePkgName, ruleName);
        List<RuleConditionElement> elements = r.getLhs().getChildren();
        Pattern curPattern = null;
        String curId = null;
        ObjectType curObjType = null;
        Map<String, Class<?>> result = new HashMap<String, Class<?>>();
        for (RuleConditionElement nextElem : elements) {
            // Only patterns bind a variable to a fact type.
            if (nextElem instanceof Pattern) {
                curPattern = (Pattern)nextElem;
                curObjType = curPattern.getObjectType();
                curId = curPattern.getDeclaration().getIdentifier();
                result.put(curId, curObjType.getValueType().getClassType());
            }
        }
        return result;
    }

Maven shade plugin and drools jar:

To use the Drools Java API, multiple jars need to be included as Maven dependencies.

However, what is special about the Drools jars is that each one (e.g. drools-core.jar, kie-internal.jar) contains its own kie.conf file. By default, the maven shade plugin lets these kie.conf files overwrite each other, which causes a runtime error when the shaded jar is deployed to Flink. The mitigation is to configure the maven shade plugin so that the content of each kie.conf is appended to the combined file instead of overwriting it.

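    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                <resource>META-INF/kie.conf</resource>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>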
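One way to check the result is to list every copy of a resource that a class loader can see. The helper below is a hypothetical diagnostic (KieConfInspector is an illustrative name and not part of Drools or the shade plugin). Run against the unshaded classpath it should return one entry per Drools jar; run against the shaded jar it should return a single entry whose content can then be compared with the originals.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

/** Hypothetical diagnostic: reads every copy of a classpath resource. */
final class KieConfInspector {
    private KieConfInspector() {
    }

    /** Returns the text of each resource with the given name visible to the loader. */
    static List<String> readAll(ClassLoader loader, String resourceName) {
        List<String> contents = new ArrayList<>();
        try {
            Enumeration<URL> urls = loader.getResources(resourceName);
            while (urls.hasMoreElements()) {
                try (InputStream in = urls.nextElement().openStream()) {
                    contents.add(new String(in.readAllBytes(), StandardCharsets.UTF_8));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return contents;
    }
}
```

For the case discussed here, the resource name to pass would be META-INF/kie.conf.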

Reposted from: https://blog.51cto.com/shadowisper/2293807
