[root@WH0PRDBRP00AP0013 ogg]# cd AdapterExamples/big-data/kafka/[root@WH0PRDBRP00AP0013 kafka]# lscustom_kafka_producer.properties kafka.props rkafka.prm
bootstrap.servers=localhost:9092 //kafka服务器的地址acks=1compression.type=gzipreconnect.backoff.ms=1000value.serializer=org.apache.kafka.common.serialization.ByteArraySerializerkey.serializer=org.apache.kafka.common.serialization.ByteArraySerializer# 100KB per partitionbatch.size=102400linger.ms=10000
gg.handlerlist = kafkahandlergg.handler.kafkahandler.type = kafkagg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.propertiesgg.handler.kafkahandler.TopicName =oggtopicgg.handler.kafkahandler.format =avro_op #有多种模式可以选xml,delimitedtext,json,avro_row,avro_opgg.handler.kafkahandler.SchemaTopicName=mySchemaTopicgg.handler.kafkahandler.BlockingSend =false #阻塞模式gg.handler.kafkahandler.includeTokens=falsegg.handler.kafkahandler.mode =tx #可选OP/TX,OP每次ORACLE的操作(I D U)都会当作生产者消费者的记录刷新一次,TX按事务提交刷新#gg.handler.kafkahandler.maxGroupSize =100, 1Mb#gg.handler.kafkahandler.minGroupSize =50, 500Kbgoldengate.userexit.timestamp=utcgoldengate.userexit.writers=javawriterjavawriter.stats.display=TRUEjavawriter.stats.full=TRUEgg.log=log4jgg.log.level=INFOgg.report.time=30secgg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/*:javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
REPLICAT rkafkaTARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.propsREPORTCOUNT EVERY 1 MINUTES, RATEGROUPTRANSOPS 10000MAP XDGL.BUSINESS_CONTRACT , TARGET XDGL.BUSINESS_CONTRACT;
原文:http://www.cnblogs.com/skyrim/p/7456187.html